Apache Airflow: run all parallel tasks in single DAG run
I have a DAG with 30 (or more) dynamically created parallel tasks.
I have the concurrency option set on that DAG so that only a single DAG Run is active while catching up the history.
When I run it on my server, only 16 tasks actually run in parallel, while the remaining 14 just sit queued.
Which setting should I alter so that only 1 DAG Run is active, but with all 30+ tasks running in parallel?
According to this FAQ, it seems to be either dag_concurrency or max_active_runs_per_dag, but the former seems to be overridden by the concurrency setting already, while the latter seemed to have no effect (or I effectively messed up my setup).
Here's the sample code:
import datetime as dt
import logging

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

import config

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'concurrency': 1,
    'retries': 0,
}

def print_operators(ds, **kwargs):
    logging.info(f"Task {kwargs.get('task_instance_key_str', 'unknown_task_instance')}")

dag = DAG(
    dag_id='test_parallelism_dag',
    start_date=dt.datetime(2019, 1, 1),
    default_args=default_args,
    schedule_interval='@daily',
    catchup=True,
    template_searchpath=[config.DAGS_PATH],
    params={'schema': config.SCHEMA_DB},
    max_active_runs=1,
)

print_operators = [PythonOperator(
    task_id=f'test_parallelism_dag.print_operator_{i}',
    python_callable=print_operators,  # refers to the function above; the name is rebound to this list afterwards
    provide_context=True,
    dag=dag,
) for i in range(60)]

dummy_operator_start = DummyOperator(
    task_id='test_parallelism_dag.dummy_operator_start',
    dag=dag,
)

dummy_operator_end = DummyOperator(
    task_id='test_parallelism_dag.dummy_operator_end',
    dag=dag,
)

dummy_operator_start >> print_operators >> dummy_operator_end
EDIT 1:
My current airflow.cfg contains:
executor = SequentialExecutor
parallelism = 32
dag_concurrency = 24
max_active_runs_per_dag = 26
My env variables are as follows (each set to a different value so I can easily spot which one takes effect):
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__DAG_CONCURRENCY=18
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=20
AIRFLOW__CORE__WORKER_CONCURRENCY=22
And with that I get the following Gantt diagram (screenshot not reproduced here), which hints that setting the DAG_CONCURRENCY env variable works.
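For context, Airflow maps environment variables of the form AIRFLOW__&lt;SECTION&gt;__&lt;KEY&gt; (double underscores around the uppercased section name) onto airflow.cfg entries, and the env variable wins when both are set. A small sketch of that naming convention (the helper function is illustrative, not part of Airflow):

```python
# Sketch: how an airflow.cfg option maps to the env variable that overrides it.
# The airflow_env_var helper is my own illustration, not an Airflow API.
def airflow_env_var(section: str, key: str) -> str:
    # AIRFLOW__<SECTION>__<KEY>, all upper case, double underscores.
    return f"AIRFLOW__{section.upper()}__{key.upper()}"

print(airflow_env_var("core", "dag_concurrency"))         # AIRFLOW__CORE__DAG_CONCURRENCY
print(airflow_env_var("core", "max_active_runs_per_dag")) # AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG
```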
Tags: parallel-processing, task, scheduled-tasks, airflow
Are those concurrent tasks of type SubDagOperator?
– RyanTheCoder
Mar 21 at 11:06
@RyanTheCoder no, they are just simple tasks, PythonOperator ones.
– Eduard Sukharev
Mar 21 at 12:00
asked Mar 21 at 11:01 by Eduard Sukharev (edited Mar 21 at 15:08)
2 Answers
Update the concurrency config as well in your airflow.cfg file. If it is 16, increase it to 32.
If you are using the Celery executor, change worker_concurrency to 32 as well.
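For reference, a sketch of the airflow.cfg entries this answer refers to (values illustrative; the worker concurrency option lives in the [celery] section and its exact name varies between Airflow versions):

```
[core]
executor = LocalExecutor
parallelism = 32
dag_concurrency = 32

[celery]
worker_concurrency = 32
```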
I updated my question. I use SequentialExecutor and parallelism's default was 32, so that parameter is out of the equation.
– Eduard Sukharev
Mar 21 at 14:54
It is weird that you were able to run tasks in parallel with SequentialExecutor. We designed SequentialExecutor to run tasks serially. If you want to run tasks in parallel, use LocalExecutor.
– kaxil
Mar 21 at 14:56
Right you are. We have a default config value which is overridden by an env setting, which I overlooked. Thank you for pointing this out. It is indeed LocalExecutor.
– Eduard Sukharev
Mar 21 at 15:06
answered Mar 21 at 14:33 by kaxil (edited Mar 21 at 16:52)
The actual parameter to change was dag_concurrency in airflow.cfg, or override it with the AIRFLOW__CORE__DAG_CONCURRENCY env variable.
As per the docs I referred to in my question:
concurrency: The Airflow scheduler will run no more than $concurrency task instances for your DAG at any given time. Concurrency is defined in your Airflow DAG. If you do not set the concurrency on your DAG, the scheduler will use the default value from the dag_concurrency entry in your airflow.cfg.
Which means the following simplified code:

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'concurrency': 1,
}

dag = DAG(
    dag_id='test_parallelism_dag',
    default_args=default_args,
    max_active_runs=1,
)

should be rewritten to:

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
}

dag = DAG(
    dag_id='test_parallelism_dag',
    default_args=default_args,
    max_active_runs=1,
    concurrency=30,
)
My code actually had the wrong assumption that default_args would at some point substitute for actual kwargs to the DAG constructor. I don't know what led me to that conclusion back then, but I guess setting concurrency to 1 there was some draft leftover which never actually affected anything; the actual DAG concurrency was set from the config default, which is 16.
answered Mar 21 at 15:18 by Eduard Sukharev