How to pass dynamic arguments to an Airflow operator?
I am using Airflow to run Spark jobs on Google Cloud Composer. I need to:
- Create a cluster (YAML parameters supplied by the user)
- Run a list of Spark jobs (job parameters also supplied by a per-job YAML file)
With the Airflow API, I can read YAML files and push variables across tasks using XCom.
But consider DataprocClusterCreateOperator(). Only
- cluster_name
- project_id
- zone
and a few other arguments are marked as templated.
What if I want to pass in other arguments as templated that currently are not, such as image_version, num_workers, worker_machine_type, etc.?
Is there any workaround for this?
google-cloud-platform airflow google-cloud-composer apache-airflow-xcom
edited Mar 23 at 17:55 by Manoj Choudhari
asked Mar 22 at 13:15 by sudeepgupta90
1 Answer
I'm not sure what you mean by 'dynamic', but if the YAML file is read in the body of the DAG file, the DAG picks up the new arguments the next time Airflow parses the file after the YAML is updated. So you don't actually need XCom to get the arguments.
Simply create a params dictionary and pass it to default_args:
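    import os

    import yaml
    from airflow import DAG
    # Import path as of Airflow 1.10; later releases may have moved this class.
    from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator

    # The YAML file is expected to sit next to this DAG file.
    CONFIGFILE = os.path.join(
        os.path.dirname(os.path.realpath(__file__)), 'your_yaml_file')
    with open(CONFIGFILE, 'r') as ymlfile:
        CFG = yaml.safe_load(ymlfile)  # safe_load does not construct arbitrary objects

    default_args = {
        'cluster_name': CFG['section_A']['cluster_name'],  # edit here according to the structure of your yaml file.
        'project_id': CFG['section_A']['project_id'],
        'zone': CFG['section_A']['zone'],
        'image_version': CFG['section_A']['image_version'],
        'num_workers': CFG['section_A']['num_workers'],
        'worker_machine_type': CFG['section_A']['worker_machine_type'],
        # you can add all the params you need here.
    }

    # DAG_NAME and SCHEDULE_INTERVAL are placeholders; define them for your DAG.
    dag = DAG(
        dag_id=DAG_NAME,
        schedule_interval=SCHEDULE_INTERVAL,
        default_args=default_args,  # pass the params to DAG environment
    )

    # The operator takes its remaining arguments from default_args.
    Task1 = DataprocClusterCreateOperator(
        task_id='your_task_id',
        dag=dag
    )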
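Indexing the config key by key fails with a bare KeyError on the first key the YAML file lacks. As a minimal sketch, assuming the YAML has already been parsed into a plain dict, a small helper can build default_args and report every missing key at once. The names build_default_args and CLUSTER_KEYS are hypothetical, chosen only for this illustration, and the config values are made up; the snippet runs without PyYAML or Airflow.

```python
# Hypothetical helper: these names are illustrative and not part of Airflow.
CLUSTER_KEYS = (
    "cluster_name",
    "project_id",
    "zone",
    "image_version",
    "num_workers",
    "worker_machine_type",
)


def build_default_args(cfg, section="section_A", keys=CLUSTER_KEYS):
    """Copy the cluster settings out of one section of a parsed YAML config."""
    if section not in cfg:
        raise KeyError("config has no section %r" % section)
    values = cfg[section]
    # Collect all missing keys so the error names every problem in one go.
    missing = [key for key in keys if key not in values]
    if missing:
        raise KeyError("section %r is missing: %s" % (section, ", ".join(missing)))
    return {key: values[key] for key in keys}


# Example input shaped like the result of yaml.safe_load() (made-up values).
example_cfg = {
    "section_A": {
        "cluster_name": "demo-cluster",
        "project_id": "demo-project",
        "zone": "europe-west1-b",
        "image_version": "1.3",
        "num_workers": 2,
        "worker_machine_type": "n1-standard-4",
    }
}
default_args = build_default_args(example_cfg)
```

The returned dictionary can then be passed as default_args to the DAG exactly as in the answer's snippet.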
But if you want dynamic DAGs rather than dynamic arguments, you may need a different strategy.
So you probably need to settle the basic question first: at which level is the dynamic behavior needed? Task level? DAG level?
Alternatively, you can create your own operator that takes the parameters and does the job.
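One way to read "create your own operator" for the templating part of the question: Airflow only renders the attributes named in an operator's template_fields, so a subclass can extend that attribute to opt more arguments in. The sketch below shows the idea in plain Python so it runs without an Airflow install. FakeClusterCreateOperator and its render method are hypothetical stand-ins (the real operator renders with Jinja, not this regex); template_fields is the actual attribute name. It also assumes rendered values come back as strings, so numeric arguments are converted by hand.

```python
import re


class FakeClusterCreateOperator:
    """Stand-in for DataprocClusterCreateOperator (NOT the real class)."""

    # Mirrors the fields the question lists as templated by default.
    template_fields = ("cluster_name", "project_id", "zone")

    def __init__(self, cluster_name, project_id, zone,
                 image_version=None, num_workers=2):
        self.cluster_name = cluster_name
        self.project_id = project_id
        self.zone = zone
        self.image_version = image_version
        self.num_workers = num_workers

    def render(self, context):
        """Toy renderer: fill {{ name }} in every templated string attribute."""
        def substitute(match):
            return str(context[match.group(1)])

        for field in self.template_fields:
            value = getattr(self, field)
            if isinstance(value, str):
                rendered = re.sub(r"\{\{\s*(\w+)\s*\}\}", substitute, value)
                setattr(self, field, rendered)


class TemplatedClusterCreateOperator(FakeClusterCreateOperator):
    """Same operator, with two more attributes opted in to templating."""

    template_fields = FakeClusterCreateOperator.template_fields + (
        "image_version",
        "num_workers",
    )


task = TemplatedClusterCreateOperator(
    cluster_name="cluster-{{ run }}",
    project_id="demo-project",
    zone="europe-west1-b",
    image_version="{{ image }}",
    num_workers="{{ workers }}",
)
task.render({"run": "20190322", "image": "1.3", "workers": 4})
# Rendered values are strings, so convert numeric arguments explicitly.
task.num_workers = int(task.num_workers)
```

With the real class the same subclass pattern should apply, but check whether its template_fields is a list or a tuple before concatenating, and where in the operator a rendered string would need converting to a number.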
Thank you for your answer. Reading the YAML file as a params dictionary worked for me.
– sudeepgupta90
Mar 25 at 10:12
edited Mar 22 at 22:11
answered Mar 22 at 21:15 by AC at CA