Apache Airflow: run all parallel tasks in single DAG run


I have a DAG that has 30 (or more) dynamically created parallel tasks.



I have the concurrency option set on that DAG so that I only have a single DAG Run running when catching up the history.
When I run it on my server, only 16 tasks actually run in parallel, while the remaining 14 just wait in the queued state.



Which setting should I alter so that I have only 1 DAG Run running, but with all 30+ tasks running in parallel?



According to this FAQ, it seems like it's one of dag_concurrency or max_active_runs_per_dag, but the former seems to be overridden by the concurrency setting already, while the latter seemed to have no effect (or I effectively messed up my setup).
Here's the sample code:



import datetime as dt
import logging

import config

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'concurrency': 1,
    'retries': 0,
}


def print_operators(ds, **kwargs):
    logging.info(f"Task {kwargs.get('task_instance_key_str', 'unknown_task_instance')}")


dag = DAG(
    dag_id='test_parallelism_dag',
    start_date=dt.datetime(2019, 1, 1),
    default_args=default_args,
    schedule_interval='@daily',
    catchup=True,
    template_searchpath=[config.DAGS_PATH],
    params={'schema': config.SCHEMA_DB},
    max_active_runs=1,
)

print_operators = [PythonOperator(
    task_id=f'test_parallelism_dag.print_operator_{i}',
    python_callable=print_operators,
    provide_context=True,
    dag=dag,
) for i in range(60)]

dummy_operator_start = DummyOperator(
    task_id='test_parallelism_dag.dummy_operator_start',
    dag=dag,
)

dummy_operator_end = DummyOperator(
    task_id='test_parallelism_dag.dummy_operator_end',
    dag=dag,
)

dummy_operator_start >> print_operators >> dummy_operator_end



EDIT 1:
My current airflow.cfg contains:



executor = SequentialExecutor
parallelism = 32
dag_concurrency = 24
max_active_runs_per_dag = 26


My environment variables are as follows (I set them all to different values to easily spot which one helps):



AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__DAG_CONCURRENCY=18
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=20
AIRFLOW__CORE__WORKER_CONCURRENCY=22


And with that I get the following Gantt diagram:
[Gantt diagram screenshot: tasks running in parallel]



Which kind of gives me a hint that setting the DAG_CONCURRENCY env variable works.
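For reference, those env variables follow Airflow's documented AIRFLOW__{SECTION}__{KEY} naming convention for overriding airflow.cfg entries. A minimal sketch of that mapping (the helper function here is illustrative, not part of Airflow's API):

```python
import os

# Illustrative helper: build the environment-variable name Airflow expects
# for a config option, per its AIRFLOW__{SECTION}__{KEY} convention.
# This helper is not an Airflow API, just a demonstration of the naming rule.
def airflow_env_var(section: str, key: str) -> str:
    return f"AIRFLOW__{section.upper()}__{key.upper()}"

# e.g. the [core] dag_concurrency entry is overridden by:
os.environ[airflow_env_var("core", "dag_concurrency")] = "18"
print(os.environ["AIRFLOW__CORE__DAG_CONCURRENCY"])  # → 18
```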










  • Are those concurrent tasks of type SubDagOperator?

    – RyanTheCoder
    Mar 21 at 11:06











  • @RyanTheCoder no, they are just simple tasks, PythonOperator ones.

    – Eduard Sukharev
    Mar 21 at 12:00















Tags: parallel-processing, task, scheduled-tasks, airflow






asked Mar 21 at 11:01 by Eduard Sukharev
edited Mar 21 at 15:08
2 Answers
Update the concurrency config as well in your airflow.cfg file. If it is 16, increase it to 32.

If you are using CeleryExecutor, change worker_concurrency to 32.

– kaxil
answered Mar 21 at 14:33, edited Mar 21 at 16:52
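As a rough illustration of that advice (the config fragment below is made up for the example; in practice you would edit the real airflow.cfg and Airflow reads it through its own configuration module), bumping the relevant [core] entry with the standard-library configparser could look like:

```python
import configparser
import io

# Hypothetical airflow.cfg fragment; values are illustrative.
cfg_text = """
[core]
executor = LocalExecutor
parallelism = 32
dag_concurrency = 16
"""

cfg = configparser.ConfigParser()
cfg.read_string(cfg_text)
cfg["core"]["dag_concurrency"] = "32"   # raise the per-DAG running-task cap

# Serialize the updated config back out (to a buffer here, a file in practice).
buf = io.StringIO()
cfg.write(buf)
print("dag_concurrency = 32" in buf.getvalue())  # → True
```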

























  • I updated my question. I use SequentialExecutor and parallelism's default was 32, so that parameter is out of the equation.

    – Eduard Sukharev
    Mar 21 at 14:54

  • It is weird that you were able to run tasks in parallel with SequentialExecutor. We designed SequentialExecutor to run tasks serially. If you want to run tasks in parallel, use LocalExecutor.

    – kaxil
    Mar 21 at 14:56

  • Right you are. We have a default config value which is overridden by an env setting, which I overlooked. Thank you for pointing this out. It is indeed LocalExecutor.

    – Eduard Sukharev
    Mar 21 at 15:06


















The actual parameter to change was dag_concurrency in airflow.cfg, or override it with the AIRFLOW__CORE__DAG_CONCURRENCY env variable.

As per the docs I referred to in my question:

    concurrency: The Airflow scheduler will run no more than $concurrency
    task instances for your DAG at any given time. Concurrency is defined
    in your Airflow DAG. If you do not set the concurrency on your DAG,
    the scheduler will use the default value from the dag_concurrency
    entry in your airflow.cfg.

Which means the following simplified code:

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'concurrency': 1,
}

dag = DAG(
    dag_id='test_parallelism_dag',
    default_args=default_args,
    max_active_runs=1,
)

should be rewritten to:

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
}

dag = DAG(
    dag_id='test_parallelism_dag',
    default_args=default_args,
    max_active_runs=1,
    concurrency=30,
)

My code actually had the wrong assumption that default_args entries at some point substitute actual kwargs to the DAG constructor. I don't know what led me to that conclusion back then, but I guess setting concurrency to 1 there is some draft leftover, which never actually affected anything; the actual DAG concurrency was set from the config default, which is 16.

– Eduard Sukharev
answered Mar 21 at 15:18
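The mechanism can be sketched with a pure-Python stand-in (FakeDAG is not a real Airflow class; defaults are illustrative): default_args is only stored for forwarding to operators, so a 'concurrency' entry in it never reaches the DAG constructor.

```python
# FakeDAG mimics the relevant behavior: default_args is kept for handing
# down to tasks, while DAG-level concurrency is set only by an explicit
# constructor kwarg. Class name and default values are illustrative.
class FakeDAG:
    def __init__(self, dag_id, default_args=None, concurrency=16, max_active_runs=16):
        self.dag_id = dag_id
        self.default_args = default_args or {}   # forwarded to operators, not the DAG
        self.concurrency = concurrency           # only this kwarg matters here
        self.max_active_runs = max_active_runs

dag_a = FakeDAG('test_parallelism_dag', default_args={'concurrency': 1}, max_active_runs=1)
dag_b = FakeDAG('test_parallelism_dag', max_active_runs=1, concurrency=30)

print(dag_a.concurrency)  # → 16; the default_args entry never reached the DAG
print(dag_b.concurrency)  # → 30; the explicit kwarg takes effect
```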





















