Can't add task to the function that returns pipeline (Airflow, DAG)How do I return multiple values from a function?Airflow dynamic DAG and Task IdsAirflow: how to delete a DAG?Dynamic task generation in an Airflow DAGStatus of Airflow task within the dagAirflow not loading dags in /usr/local/airflow/dagsHow to add manual tasks in an Apache Airflow DagHow to validate airflow DAG with customer operator?Add downstream task to every task without downstream in a DAG in Airflow 1.9Running airflow tasks/dags in parallel

Is there a scenario where a gnoll flesh gnawer can move at least 45 feet during its Rampage bonus action?

How to deal with my team leader who keeps calling me about project updates even though I am on leave for personal reasons?

What are the benefits and disadvantages if a creature has multiple tails, e.g., Kyuubi or Nekomata?

Is this a Sherman, and if so what model?

Was there a trial by combat between a man and a dog in medieval France?

Would Taiwan and China's dispute be solved if Taiwan gave up being the Republic of China?

Is it really necessary to have a four hour meeting in Sprint planning?

Is there any reason nowadays to use a neon indicator lamp instead of an LED?

What is the meaning of word 'crack' in chapter 33 of A Game of Thrones?

Can this word order be rearranged?

How can I repair this gas leak on my new range? Teflon tape isn't working

I reverse the source code, you negate the output!

An Algorithm Which Schedules Your Life

How do I deal with too many NPCs in my campaign?

Where Does VDD+0.3V Input Limit Come From on IC chips?

What is the lowest voltage that a microcontroller can successfully read on the analog pin?

How can an attacker use robots.txt?

Do all creatures have souls?

Can the U.S. president make military decisions without consulting anyone?

How to ask a man to not take up more than one seat on public transport while avoiding conflict?

Hiking with a mule or two?

As an employer, can I compel my employees to vote?

Leaving a job that I just took based on false promise of a raise. What do I tell future interviewers?

Is it more effective to add yeast before or after kneading?



Can't add task to the function that returns pipeline (Airflow, DAG)


How do I return multiple values from a function?Airflow dynamic DAG and Task IdsAirflow: how to delete a DAG?Dynamic task generation in an Airflow DAGStatus of Airflow task within the dagAirflow not loading dags in /usr/local/airflow/dagsHow to add manual tasks in an Apache Airflow DagHow to validate airflow DAG with customer operator?Add downstream task to every task without downstream in a DAG in Airflow 1.9Running airflow tasks/dags in parallel






.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty,.everyoneloves__bot-mid-leaderboard:empty margin-bottom:0;








0















I have a function which we mainly use to add to the end of each DAG. Let's call it finalize and it looks like the following Common class (of course, in reality it's doing useful stuff). And what we usually do is almost for each DAG to the end we add the code like



task_1 >> task_2 >> ... task_n >> common.finalize



and as a result, we get



task_1 >> task_2 >> ... task_n >> a >> b >> c.



So far all is good. However, now for one of the DAGs I want to add a task after the finalize. I don't want to touch the finalize function and the code



task_1 >> task_2 >> ... task_n >> common.finalize >> task_new



doesn't help me, because the task_new starts immediately after a. But I want the task_new to be executed after c.
Any suggestions?
Thanks in advance.



class Common(object):
def __init__(self, dag):
self.dag = dag

@property
def finalize(self):
a = BashOperator(
task_id='echo_dag',
bash_command='echo "dag"',
dag=self.dag)

b = BashOperator(
task_id='echo_has_completed',
bash_command='echo "has completed"',
dag=self.dag)

c = BashOperator(
task_id='echo_successfully',
bash_command='echo "successfully"',
dag=self.dag)

a >> b >> c
return a









share|improve this question






























    0















    I have a function which we mainly use to add to the end of each DAG. Let's call it finalize and it looks like the following Common class (of course, in reality it's doing useful stuff). And what we usually do is almost for each DAG to the end we add the code like



    task_1 >> task_2 >> ... task_n >> common.finalize



    and as a result, we get



    task_1 >> task_2 >> ... task_n >> a >> b >> c.



    So far all is good. However, now for one of the DAGs I want to add a task after the finalize. I don't want to touch the finalize function and the code



    task_1 >> task_2 >> ... task_n >> common.finalize >> task_new



    doesn't help me, because the task_new starts immediately after a. But I want the task_new to be executed after c.
    Any suggestions?
    Thanks in advance.



    class Common(object):
    def __init__(self, dag):
    self.dag = dag

    @property
    def finalize(self):
    a = BashOperator(
    task_id='echo_dag',
    bash_command='echo "dag"',
    dag=self.dag)

    b = BashOperator(
    task_id='echo_has_completed',
    bash_command='echo "has completed"',
    dag=self.dag)

    c = BashOperator(
    task_id='echo_successfully',
    bash_command='echo "successfully"',
    dag=self.dag)

    a >> b >> c
    return a









    share|improve this question


























      0












      0








      0








      I have a function which we mainly use to add to the end of each DAG. Let's call it finalize and it looks like the following Common class (of course, in reality it's doing useful stuff). And what we usually do is almost for each DAG to the end we add the code like



      task_1 >> task_2 >> ... task_n >> common.finalize



      and as a result, we get



      task_1 >> task_2 >> ... task_n >> a >> b >> c.



      So far all is good. However, now for one of the DAGs I want to add a task after the finalize. I don't want to touch the finalize function and the code



      task_1 >> task_2 >> ... task_n >> common.finalize >> task_new



      doesn't help me, because the task_new starts immediately after a. But I want the task_new to be executed after c.
      Any suggestions?
      Thanks in advance.



      class Common(object):
      def __init__(self, dag):
      self.dag = dag

      @property
      def finalize(self):
      a = BashOperator(
      task_id='echo_dag',
      bash_command='echo "dag"',
      dag=self.dag)

      b = BashOperator(
      task_id='echo_has_completed',
      bash_command='echo "has completed"',
      dag=self.dag)

      c = BashOperator(
      task_id='echo_successfully',
      bash_command='echo "successfully"',
      dag=self.dag)

      a >> b >> c
      return a









      share|improve this question














      I have a function which we mainly use to add to the end of each DAG. Let's call it finalize and it looks like the following Common class (of course, in reality it's doing useful stuff). And what we usually do is almost for each DAG to the end we add the code like



      task_1 >> task_2 >> ... task_n >> common.finalize



      and as a result, we get



      task_1 >> task_2 >> ... task_n >> a >> b >> c.



      So far all is good. However, now for one of the DAGs I want to add a task after the finalize. I don't want to touch the finalize function and the code



      task_1 >> task_2 >> ... task_n >> common.finalize >> task_new



      doesn't help me, because the task_new starts immediately after a. But I want the task_new to be executed after c.
      Any suggestions?
      Thanks in advance.



      class Common(object):
      def __init__(self, dag):
      self.dag = dag

      @property
      def finalize(self):
      a = BashOperator(
      task_id='echo_dag',
      bash_command='echo "dag"',
      dag=self.dag)

      b = BashOperator(
      task_id='echo_has_completed',
      bash_command='echo "has completed"',
      dag=self.dag)

      c = BashOperator(
      task_id='echo_successfully',
      bash_command='echo "successfully"',
      dag=self.dag)

      a >> b >> c
      return a






      python airflow






      share|improve this question













      share|improve this question











      share|improve this question




      share|improve this question










      asked Mar 28 at 15:34









      sayasaya

      32 bronze badges




      32 bronze badges

























          1 Answer
          1






          active

          oldest

          votes


















          0
















          Perhaps I'm missing something, but it sounds like you could set task_new to have a trigger_rule value of all_success to ensure that all upstream tasks have completed successfully before it's triggered, and then orchestrate the tasks like this:



          [task_1, task_2, task_n, common.finalize] >> [task_new]






          share|improve this answer

























          • Sorry for not clear description. But it's not trigger_rule. The graph that's built is not correct. Because the function returns a which is the first task. By the way, even though it returns only a, the graph shows all 3 tasks (a >> b >> c) and airflow runs them as well. However, when I add the new task after the function it starts immediately after the task a, but I want it to be started after c which is the last task without touching the function.

            – saya
            Mar 29 at 8:31












          • Does removing return a from the finalize() function fix the issue?

            – fez
            Mar 29 at 10:00













          Your Answer






          StackExchange.ifUsing("editor", function ()
          StackExchange.using("externalEditor", function ()
          StackExchange.using("snippets", function ()
          StackExchange.snippets.init();
          );
          );
          , "code-snippets");

          StackExchange.ready(function()
          var channelOptions =
          tags: "".split(" "),
          id: "1"
          ;
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function()
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled)
          StackExchange.using("snippets", function()
          createEditor();
          );

          else
          createEditor();

          );

          function createEditor()
          StackExchange.prepareEditor(
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader:
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/4.0/"u003ecc by-sa 4.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          ,
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          );



          );














          draft saved

          draft discarded
















          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55401518%2fcant-add-task-to-the-function-that-returns-pipeline-airflow-dag%23new-answer', 'question_page');

          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          0
















          Perhaps I'm missing something, but it sounds like you could set task_new to have a trigger_rule value of all_success to ensure that all upstream tasks have completed successfully before it's triggered, and then orchestrate the tasks like this:



          [task_1, task_2, task_n, common.finalize] >> [task_new]






          share|improve this answer

























          • Sorry for not clear description. But it's not trigger_rule. The graph that's built is not correct. Because the function returns a which is the first task. By the way, even though it returns only a, the graph shows all 3 tasks (a >> b >> c) and airflow runs them as well. However, when I add the new task after the function it starts immediately after the task a, but I want it to be started after c which is the last task without touching the function.

            – saya
            Mar 29 at 8:31












          • Does removing return a from the finalize() function fix the issue?

            – fez
            Mar 29 at 10:00















          0
















          Perhaps I'm missing something, but it sounds like you could set task_new to have a trigger_rule value of all_success to ensure that all upstream tasks have completed successfully before it's triggered, and then orchestrate the tasks like this:



          [task_1, task_2, task_n, common.finalize] >> [task_new]






          share|improve this answer

























          • Sorry for not clear description. But it's not trigger_rule. The graph that's built is not correct. Because the function returns a which is the first task. By the way, even though it returns only a, the graph shows all 3 tasks (a >> b >> c) and airflow runs them as well. However, when I add the new task after the function it starts immediately after the task a, but I want it to be started after c which is the last task without touching the function.

            – saya
            Mar 29 at 8:31












          • Does removing return a from the finalize() function fix the issue?

            – fez
            Mar 29 at 10:00













          0














          0










          0









          Perhaps I'm missing something, but it sounds like you could set task_new to have a trigger_rule value of all_success to ensure that all upstream tasks have completed successfully before it's triggered, and then orchestrate the tasks like this:



          [task_1, task_2, task_n, common.finalize] >> [task_new]






          share|improve this answer













          Perhaps I'm missing something, but it sounds like you could set task_new to have a trigger_rule value of all_success to ensure that all upstream tasks have completed successfully before it's triggered, and then orchestrate the tasks like this:



          [task_1, task_2, task_n, common.finalize] >> [task_new]







          share|improve this answer












          share|improve this answer



          share|improve this answer










          answered Mar 28 at 16:30









          fezfez

          94313 silver badges24 bronze badges




          94313 silver badges24 bronze badges















          • Sorry for not clear description. But it's not trigger_rule. The graph that's built is not correct. Because the function returns a which is the first task. By the way, even though it returns only a, the graph shows all 3 tasks (a >> b >> c) and airflow runs them as well. However, when I add the new task after the function it starts immediately after the task a, but I want it to be started after c which is the last task without touching the function.

            – saya
            Mar 29 at 8:31












          • Does removing return a from the finalize() function fix the issue?

            – fez
            Mar 29 at 10:00

















          • Sorry for not clear description. But it's not trigger_rule. The graph that's built is not correct. Because the function returns a which is the first task. By the way, even though it returns only a, the graph shows all 3 tasks (a >> b >> c) and airflow runs them as well. However, when I add the new task after the function it starts immediately after the task a, but I want it to be started after c which is the last task without touching the function.

            – saya
            Mar 29 at 8:31












          • Does removing return a from the finalize() function fix the issue?

            – fez
            Mar 29 at 10:00
















          Sorry for not clear description. But it's not trigger_rule. The graph that's built is not correct. Because the function returns a which is the first task. By the way, even though it returns only a, the graph shows all 3 tasks (a >> b >> c) and airflow runs them as well. However, when I add the new task after the function it starts immediately after the task a, but I want it to be started after c which is the last task without touching the function.

          – saya
          Mar 29 at 8:31






          Sorry for not clear description. But it's not trigger_rule. The graph that's built is not correct. Because the function returns a which is the first task. By the way, even though it returns only a, the graph shows all 3 tasks (a >> b >> c) and airflow runs them as well. However, when I add the new task after the function it starts immediately after the task a, but I want it to be started after c which is the last task without touching the function.

          – saya
          Mar 29 at 8:31














          Does removing return a from the finalize() function fix the issue?

          – fez
          Mar 29 at 10:00





          Does removing return a from the finalize() function fix the issue?

          – fez
          Mar 29 at 10:00




















          draft saved

          draft discarded















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid


          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.

          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55401518%2fcant-add-task-to-the-function-that-returns-pipeline-airflow-dag%23new-answer', 'question_page');

          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          Kamusi Yaliyomo Aina za kamusi | Muundo wa kamusi | Faida za kamusi | Dhima ya picha katika kamusi | Marejeo | Tazama pia | Viungo vya nje | UrambazajiKuhusu kamusiGo-SwahiliWiki-KamusiKamusi ya Kiswahili na Kiingerezakuihariri na kuongeza habari

          Swift 4 - func physicsWorld not invoked on collision? The Next CEO of Stack OverflowHow to call Objective-C code from Swift#ifdef replacement in the Swift language@selector() in Swift?#pragma mark in Swift?Swift for loop: for index, element in array?dispatch_after - GCD in Swift?Swift Beta performance: sorting arraysSplit a String into an array in Swift?The use of Swift 3 @objc inference in Swift 4 mode is deprecated?How to optimize UITableViewCell, because my UITableView lags

          Access current req object everywhere in Node.js ExpressWhy are global variables considered bad practice? (node.js)Using req & res across functionsHow do I get the path to the current script with Node.js?What is Node.js' Connect, Express and “middleware”?Node.js w/ express error handling in callbackHow to access the GET parameters after “?” in Express?Modify Node.js req object parametersAccess “app” variable inside of ExpressJS/ConnectJS middleware?Node.js Express app - request objectAngular Http Module considered middleware?Session variables in ExpressJSAdd properties to the req object in expressjs with Typescript