How to trigger tasks conditionally in Airflow
I have 3 tasks. In condition A I want to trigger [t1, t2] >> t3, and in condition B I want to trigger t2 >> t3, i.e. skip t1 conditionally.
Solution
- Add params to the DAG:

    with DAG(
        params={
            'skipTask1': 'false',
        },
    ) as dag:
- Add a branching task, because params cannot be checked dynamically in the DAG declaration. Also set trigger_rule to none_failed or none_failed_min_one_success for task t3; otherwise t3 will also be skipped, because by default a task is only triggered when all of its upstream tasks are in the success state. With NONE_FAILED, the task is triggered as long as no upstream task is in the failed (or upstream_failed) state, i.e. every upstream task is either success or skipped.

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator  # DummyOperator before Airflow 2.3
    from airflow.operators.python import BranchPythonOperator
    from airflow.utils.trigger_rule import TriggerRule

    with DAG(
        dag_id='conditional_skip_example',  # placeholder; add schedule/start_date as needed
        params={
            'skipTask1': 'false',
        },
    ) as dag:
        t1 = EmptyOperator(task_id='t1')
        t2 = EmptyOperator(task_id='t2')
        # NONE_FAILED lets t3 run even when t1 has been skipped
        t3 = EmptyOperator(task_id='t3', trigger_rule=TriggerRule.NONE_FAILED)

        def branch_func(**kwargs):
            # Return only the tasks directly downstream of the branching task;
            # whether t3 runs is decided by its trigger rule.
            if kwargs['params']['skipTask1'] == 'false':
                return ['t1', 't2']
            return ['t2']

        branching = BranchPythonOperator(
            task_id='branching',
            python_callable=branch_func,
        )

        # This single line already declares branching >> t2 >> t3
        branching >> [t1, t2] >> t3
Refer to https://airflow.apache.org/docs/apache-airflow/2.9.1/core-concepts/dags.html