How to pass dynamic arguments to an Airflow operator?



I am using Airflow to run Spark jobs on Google Cloud Composer. I need to:

  • create a cluster (YAML parameters supplied by the user)

  • run a list of Spark jobs (job parameters also supplied, one YAML file per job)

Using the Airflow API, I can read the YAML files and push variables across tasks using XCom.

But consider DataprocClusterCreateOperator(). Its arguments

  • cluster_name

  • project_id

  • zone

and a few others are marked as templated.

What if I want to pass other arguments that are currently not templated, such as image_version, num_workers or worker_machine_type, as templated values?

Is there any workaround for this?










      google-cloud-platform airflow google-cloud-composer apache-airflow-xcom

      edited Mar 23 at 17:55

      Manoj Choudhari

      asked Mar 22 at 13:15

      sudeepgupta90






















          1 Answer
          I'm not sure what you mean by 'dynamic', but if the YAML file is read in the body of the DAG file, the DAG picks up the new arguments whenever the file changes, because the scheduler re-parses DAG files periodically. So you don't actually need XCom to get the arguments.
          Simply build a params dictionary and pass it as default_args. Airflow hands each key in default_args to every operator in the DAG whose constructor accepts a parameter of that name:



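          import os
          from datetime import datetime

          import yaml
          from airflow import DAG
          # Airflow 1.x import path for the Dataproc operators
          from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator

          DAG_NAME = 'your_dag_name'     # placeholder
          SCHEDULE_INTERVAL = '@daily'   # placeholder

          # The YAML file is expected to sit next to this DAG file.
          CONFIGFILE = os.path.join(
              os.path.dirname(os.path.realpath(__file__)), 'your_yaml_file')

          with open(CONFIGFILE, 'r') as ymlfile:
              CFG = yaml.safe_load(ymlfile)

          default_args = {
              'start_date': datetime(2019, 1, 1),  # Airflow needs a start_date; placeholder value
              'cluster_name': CFG['section_A']['cluster_name'],  # edit here according to the structure of your yaml file.
              'project_id': CFG['section_A']['project_id'],
              'zone': CFG['section_A']['zone'],
              'image_version': CFG['section_A']['image_version'],
              'num_workers': CFG['section_A']['num_workers'],
              'worker_machine_type': CFG['section_A']['worker_machine_type'],
              # add any other operator parameters you need here.
          }

          dag = DAG(
              dag_id=DAG_NAME,
              schedule_interval=SCHEDULE_INTERVAL,
              default_args=default_args,  # pass the params to every task in the DAG
          )

          task1 = DataprocClusterCreateOperator(
              task_id='your_task_id',
              dag=dag,
          )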
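          If several operators need different slices of the YAML, a small helper can make the key mapping explicit and fail at DAG-parse time with a readable error when a key is missing. This is a sketch that assumes the YAML has already been parsed into a nested dict (as yaml.safe_load returns); the function name cluster_args_from_config, the section name section_A and the key list are hypothetical, not part of Airflow.

```python
from typing import Any, Dict, Iterable, Mapping

# Hypothetical list of operator parameters to copy from the YAML section.
CLUSTER_KEYS = (
    "cluster_name",
    "project_id",
    "zone",
    "image_version",
    "num_workers",
    "worker_machine_type",
)


def cluster_args_from_config(
    cfg: Mapping[str, Any],
    section: str = "section_A",
    keys: Iterable[str] = CLUSTER_KEYS,
) -> Dict[str, Any]:
    """Copy the given keys from one section of a parsed YAML config.

    Raises KeyError naming every missing key, so a bad config file
    shows up as a DAG import error instead of a half-configured task.
    """
    if section not in cfg:
        raise KeyError(f"config has no section {section!r}")
    params = cfg[section]
    keys = tuple(keys)
    missing = [k for k in keys if k not in params]
    if missing:
        raise KeyError(f"section {section!r} is missing keys: {missing}")
    return {k: params[k] for k in keys}


if __name__ == "__main__":
    # Stand-in for yaml.safe_load(...) output; the values are made up.
    cfg = {
        "section_A": {
            "cluster_name": "my-cluster",
            "project_id": "my-project",
            "zone": "us-central1-a",
            "image_version": "1.4",
            "num_workers": 2,
            "worker_machine_type": "n1-standard-4",
        }
    }
    print(cluster_args_from_config(cfg)["num_workers"])  # 2
```

          In the DAG file, default_args = {'start_date': ..., **cluster_args_from_config(CFG)} could then replace the hand-written dictionary.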


          But if you need dynamic DAGs rather than dynamic arguments, you will need a different strategy for generating the DAGs themselves.



          So first work out at which level you need the dynamism: the task level or the DAG level?



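          Alternatively, you can write your own operator that takes the parameters you need. For example, a subclass of DataprocClusterCreateOperator can extend its template_fields class attribute with image_version, num_workers and worker_machine_type. Keep in mind that Jinja renders templated values as strings, and anything the operator computes in its constructor still sees the unrendered value, because templates are rendered only when the task runs.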






          edited Mar 22 at 22:11

          answered Mar 22 at 21:15

          AC at CA

          • thank you for your answer. reading the yaml file as params dictionary worked for me.

            – sudeepgupta90
            Mar 25 at 10:12




          33529











