

Apache Airflow: run all parallel tasks in single DAG run


I have a DAG that has 30 (or more) dynamically created parallel tasks.



I have the concurrency option set on that DAG so that I only have a single DAG Run running when catching up the history.
When I run it on my server, only 16 tasks actually run in parallel, while the remaining 14 just wait in the queued state.



Which setting should I alter so that I have only 1 DAG Run running, but with all 30+ tasks running in parallel?



According to this FAQ, it seems it is one of dag_concurrency or max_active_runs_per_dag, but the former seems to be overridden by the concurrency setting already, while the latter seemed to have no effect (or I effectively messed up my setup).
Here's the sample code:



import datetime as dt
import logging

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

import config

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'concurrency': 1,
    'retries': 0,
}


def print_operators(ds, **kwargs):
    logging.info(f"Task {kwargs.get('task_instance_key_str', 'unknown_task_instance')}")


dag = DAG(
    dag_id='test_parallelism_dag',
    start_date=dt.datetime(2019, 1, 1),
    default_args=default_args,
    schedule_interval='@daily',
    catchup=True,
    template_searchpath=[config.DAGS_PATH],
    params={'schema': config.SCHEMA_DB},
    max_active_runs=1,
)

# 30+ dynamically created parallel tasks
print_tasks = [PythonOperator(
    task_id=f'test_parallelism_dag.print_operator_{i}',
    python_callable=print_operators,
    provide_context=True,
    dag=dag,
) for i in range(60)]

dummy_operator_start = DummyOperator(
    task_id='test_parallelism_dag.dummy_operator_start',
    dag=dag,
)

dummy_operator_end = DummyOperator(
    task_id='test_parallelism_dag.dummy_operator_end',
    dag=dag,
)

dummy_operator_start >> print_tasks >> dummy_operator_end



EDIT 1:
My current airflow.cfg contains:



executor = SequentialExecutor
parallelism = 32
dag_concurrency = 24
max_active_runs_per_dag = 26


My env variables are as follows (each set to a different value, to easily spot which one helps):



AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__DAG_CONCURRENCY=18
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=20
AIRFLOW__CORE__WORKER_CONCURRENCY=22


And with that I have following Gantt diagram:
(screenshot of the Gantt chart)



Which kind of gives me a hint that setting the DAG_CONCURRENCY env variable works.
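For reference, any airflow.cfg option can be overridden by an environment variable named AIRFLOW__&lt;SECTION&gt;__&lt;KEY&gt;; a minimal sketch of the override being tested here:

```shell
# Override [core] dag_concurrency without touching airflow.cfg;
# when set, the env variable takes precedence over the file value.
export AIRFLOW__CORE__DAG_CONCURRENCY=30
```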










  • Are those concurrent tasks of type SubDagOperator? – RyanTheCoder, Mar 21 at 11:06

  • @RyanTheCoder no, they are just simple tasks, PythonOperator ones. – Eduard Sukharev, Mar 21 at 12:00
















parallel-processing task scheduled-tasks airflow






asked Mar 21 at 11:01, edited Mar 21 at 15:08
Eduard Sukharev












2 Answers


















Update the concurrency config as well in your airflow.cfg file. If it is 16, increase it to 32.



If you are using Celery Executor, change worker_concurrency to 32.
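These are plain entries in airflow.cfg; a minimal sketch of the relevant sections (values here are illustrative, not recommendations):

```ini
# airflow.cfg (sketch; values are examples)
[core]
# total task instances that may run at once across the whole installation
parallelism = 64
# task instances allowed to run per DAG, unless the DAG overrides it
dag_concurrency = 32

[celery]
# only relevant for CeleryExecutor: tasks per worker process
worker_concurrency = 32
```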






  • I updated my question. I use SequentialExecutor and parallelism's default was 32, so that parameter is out of the equation. – Eduard Sukharev, Mar 21 at 14:54

  • It is weird that you were able to run tasks in parallel with SequentialExecutor. We designed SequentialExecutor to run tasks serially. If you want to run tasks in parallel, use LocalExecutor. – kaxil, Mar 21 at 14:56

  • Right you are. We have a default config value which is overridden by an env setting, which I overlooked. Thank you for pointing this out. It is indeed LocalExecutor. – Eduard Sukharev, Mar 21 at 15:06


















answered Mar 21 at 14:33, edited Mar 21 at 16:52
kaxil














The actual parameter to change was dag_concurrency in airflow.cfg, or overriding it with the AIRFLOW__CORE__DAG_CONCURRENCY env variable.

As per the docs I referred to in my question:

    concurrency: The Airflow scheduler will run no more than $concurrency
    task instances for your DAG at any given time. Concurrency is defined
    in your Airflow DAG. If you do not set the concurrency on your DAG,
    the scheduler will use the default value from the dag_concurrency
    entry in your airflow.cfg.

Which means the following simplified code:

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'concurrency': 1,
}

dag = DAG(
    dag_id='test_parallelism_dag',
    default_args=default_args,
    max_active_runs=1,
)

should be rewritten to:

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
}

dag = DAG(
    dag_id='test_parallelism_dag',
    default_args=default_args,
    max_active_runs=1,
    concurrency=30,
)

My code actually had the wrong assumption that default_args at some point substitute the actual kwargs of the DAG constructor. I don't know what led me to that conclusion back then, but I guess setting concurrency to 1 there was some draft leftover, which never actually affected anything; the actual DAG concurrency was set from the config default, which is 16.
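The distinction can be illustrated with a tiny mock (a hypothetical mini-model, not Airflow's actual classes): default_args are merged into each operator's kwargs, never into the DAG constructor's kwargs, so a 'concurrency' key placed there is silently unused.

```python
# Hypothetical mini-model of how default_args behave (not real Airflow code).
class MiniDAG:
    def __init__(self, dag_id, default_args=None, concurrency=16):
        self.dag_id = dag_id
        self.default_args = default_args or {}
        self.concurrency = concurrency  # only an explicit DAG kwarg counts

class MiniOperator:
    def __init__(self, task_id, dag, **kwargs):
        # default_args are applied at the *operator* level
        merged = {**dag.default_args, **kwargs}
        self.owner = merged.get('owner')

dag = MiniDAG('demo', default_args={'owner': 'airflow', 'concurrency': 1})
task = MiniOperator('t1', dag)

print(dag.concurrency)  # 16 -- the 'concurrency' key in default_args is ignored
print(task.owner)       # airflow -- operator-level defaults do apply
```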






answered Mar 21 at 15:18
Eduard Sukharev


























