How to trigger tasks conditionally in Airflow
I have 3 tasks, and I want to trigger [t1, t2] >> t3 in condition A, and t2 >> t3 for condition B, i.e. skip t1 conditionally.
Solution
-
Add params in DAG:
with DAG( params={ 'skipTask1': 'false', }, ) as dag: -
Add a branching task as we cannot check params dynamically in DAGs declaration. And set
trigger_ruletonone_failedornone_failed_min_one_successfor taskt3, otherwiset3will also be skipped as it only be triggered when all upstream tasks insuccessstate by default. ForNONE_FAILED, the task will be triggered if all upstream tasks not infailedstate (i.e.successorskipped).from airflow.operators.python import BranchPythonOperator from airflow.operators.dummy_operator import DummyOperator from airflow.utils.trigger_rule import TriggerRule with DAG( params={ 'skipTask1': 'false', }, ) as dag: t1 = DummyOperator(task_id='t1') t2 = DummyOperator(task_id='t2') t3 = DummyOperator(task_id='t3', trigger_rule=TriggerRule.NONE_FAILED) def branch_func(**kwargs): if kwargs['params']['skipTask1'] == 'false': return ['t1', 't2', 't3'] else: return ['t2', 't3'] branching = BranchPythonOperator( task_id='branching', python_callable=branch_func, dag=dag) branching >> [t1, t2] >> t3 branching >> t2 >> t3
Refer to https://airflow.apache.org/docs/apache-airflow/2.9.1/core-concepts/dags.html

Comments