How to make a self-retrigger DAG in Airflow
Background
I have a DAG that can take several hours to finish. It runs a Spark job to generate some data. Now I want to backfill data, which means running the DAG about 500 times.
Of course, I could write a script to do this. But I would need to deploy that script somewhere it can run continuously without failure; otherwise I would have to check on it from time to time and restart it whenever an unexpected error happens. Since putting the script in a production environment is not easy, I would need to use a proxy to trigger the production Airflow. The proxy token expires every 12 hours, which means I would need to refresh it manually (with my YubiKey) twice a day!
Solution
I've used `TriggerDagRunOperator` to trigger other DAGs in Airflow. So why not use this operator to have the DAG trigger itself when the last run finishes?
This is the final DAG:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule

# EMAIL_LIST (the alert recipients) is defined elsewhere and not shown here.

with DAG(
    'self_reterigger_backfill',
    default_args={
        'depends_on_past': False,
        'email': EMAIL_LIST,
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 0,
        'execution_timeout': timedelta(hours=12),
    },
    params={
        'date': (datetime.now() - timedelta(days=2)).strftime('%Y-%m-%d'),
        'end_date': (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d'),
    },
    dagrun_timeout=timedelta(hours=12),
    description='Backfill job',
    schedule_interval=None,
    start_date=datetime(2023, 12, 10),
    catchup=False,
    # Make sure only one backfill run is active at a time
    max_active_runs=1,
    tags=['recstrk'],
) as dag:
    # Run the actual Spark job DAG and block until it finishes
    trigger_gmb_attribution = TriggerDagRunOperator(
        task_id='trigger_gmb_attribution',
        trigger_dag_id='gmb_attribution',
        conf={
            'date': "",
            'isProduction': 'true',
        },
        wait_for_completion=True,
        trigger_rule=TriggerRule.NONE_FAILED,
    )

    # Trigger this same DAG again for the next date
    trigger_backfill = TriggerDagRunOperator(
        task_id='trigger_backfill',
        trigger_dag_id='self_reterigger_backfill',
        conf={
            'date': "",
            'end_date': "",
        },
        wait_for_completion=False,
        trigger_rule=TriggerRule.ALL_DONE,
        trigger_run_id="self_reterigger_backfill_",
        # Use a different execution date to bypass the existing DAG run check
        execution_date="",
    )

    stop_task = DummyOperator(
        task_id='stop_task',
        dag=dag,
    )

    def should_continue(**kwargs):
        # Define the target end date
        end_date = datetime.strptime(kwargs['params']['end_date'], '%Y-%m-%d')
        # Get the current params.date
        current_date_str = kwargs['params']['date']
        current_date = datetime.strptime(current_date_str, '%Y-%m-%d')
        # Compare dates
        if current_date >= end_date:
            return 'stop_task'
        else:
            return 'trigger_backfill'

    check_date_task = BranchPythonOperator(
        task_id='check_date_task',
        # provide_context is no longer needed in Airflow 2: the context is always passed
        python_callable=should_continue,
        dag=dag,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    trigger_gmb_attribution >> check_date_task >> [trigger_backfill, stop_task]
```
The DAG advances the `date` parameter by one day after each run and stops once it reaches `end_date`.
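To make the stopping condition concrete, here is a small stdlib-only simulation of the chain of self-triggered runs. It is a sketch that assumes each run first processes its `date` (the `trigger_gmb_attribution` step) and then applies the same comparison as `should_continue`; the function name `simulate_backfill` is hypothetical. It shows that `end_date` itself is still processed, because the check happens after the Spark job for the current date has run.

```python
from datetime import datetime, timedelta
from typing import List

FMT = '%Y-%m-%d'


def simulate_backfill(date: str, end_date: str) -> List[str]:
    """Return the dates processed by a chain of self-triggered runs.

    Each loop iteration stands for one DAG run: it processes `date`
    (trigger_gmb_attribution), then uses the same comparison as
    should_continue to decide whether to trigger the next run.
    """
    processed = []
    current = datetime.strptime(date, FMT)
    end = datetime.strptime(end_date, FMT)
    while True:
        processed.append(current.strftime(FMT))  # this run's Spark job
        if current >= end:  # should_continue returns 'stop_task'
            break
        current += timedelta(days=1)  # the next run gets date + 1 day
    return processed


print(simulate_backfill('2023-12-01', '2023-12-05'))
# ['2023-12-01', '2023-12-02', '2023-12-03', '2023-12-04', '2023-12-05']
```

So the number of chained runs equals the number of days in the inclusive range from `date` to `end_date`.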
The second `TriggerDagRunOperator` is used to trigger the DAG itself:
- In `conf`, the `date` parameter is advanced for the next run.
- `wait_for_completion` is set to `False` so that the current DAG run can finish before the next run starts. With `max_active_runs=1`, the next run stays queued until the current one ends, so waiting for it would block on a run that cannot start.
- `trigger_rule` is set to `all_done`, meaning the next run is always triggered regardless of whether the upstream tasks succeeded, failed, or were skipped.
- `trigger_run_id` is set to a new value; otherwise Airflow raises a `DagRunAlreadyExists` exception.
- `execution_date` is also set to a new value to avoid the `DagRunAlreadyExists` exception (according to the Airflow source code, the existence check queries by run ID or execution date).
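The duplicate-run check described in the list above can be illustrated with a toy model. This is not Airflow's code: `ToyDagRunRegistry` is a hypothetical in-memory registry that, like the check described above, rejects a new run when either its run ID or its execution date already exists for the DAG. Deriving both values from the next run's date is also an assumption for illustration, since the original template values are not shown.

```python
from datetime import datetime
from typing import Set


class DagRunAlreadyExists(Exception):
    """Toy stand-in for Airflow's exception of the same name."""


class ToyDagRunRegistry:
    """Hypothetical in-memory model of the duplicate-run check for one DAG."""

    def __init__(self) -> None:
        self.run_ids: Set[str] = set()
        self.execution_dates: Set[datetime] = set()

    def trigger(self, run_id: str, execution_date: datetime) -> None:
        # Reject the run if either key is already taken
        if run_id in self.run_ids or execution_date in self.execution_dates:
            raise DagRunAlreadyExists(f'{run_id} / {execution_date.isoformat()}')
        self.run_ids.add(run_id)
        self.execution_dates.add(execution_date)


registry = ToyDagRunRegistry()
prefix = 'self_reterigger_backfill_'

# Hypothetical scheme: suffix the run ID with the next date and use that
# date as the execution date, so every run in the chain gets a unique pair
for day in ('2023-12-01', '2023-12-02'):
    registry.trigger(prefix + day, datetime.fromisoformat(day))

# Reusing a run ID is rejected even with a fresh execution date
try:
    registry.trigger(prefix + '2023-12-01', datetime(2023, 12, 3))
except DagRunAlreadyExists as exc:
    print('rejected:', exc)
```

In the real DAG, the same idea means templating both `trigger_run_id` and `execution_date` so that they change on every hop of the chain.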
It's also very easy to cancel or stop the backfill. Just mark the currently running task as success or failed while the last sub-task is running. The sub-task will still run to completion, because only the outer task is marked as success/failed.
Enhancement - Run on given dates
Sometimes we need to backfill specific dates (e.g. when the job is unstable and failed on some dates) rather than a date range. To do so, let's add an extra parameter `select_dates` and make the backfill DAG run only on the given dates when this parameter is provided (ignoring the `end_date` parameter).
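With this parameter, a selected-dates backfill starts by triggering the first run manually with a `conf`. Since `conf` values override `params` here (the self-retrigger chain relies on this too), a sketch of that first trigger is shown below. The dates are hypothetical examples, and the Airflow 2 CLI form `airflow dags trigger <dag_id> --conf '<json>'` is only one way to do it; the same JSON can be sent through the proxy instead. Note that `date` must be the first entry of `select_dates`, because the `should_continue` check looks the current date up in the list to find the next one.

```python
import json
import shlex

# Hypothetical dates that failed and need to be re-run
select_dates = ['2023-11-03', '2023-11-17', '2023-11-29']

conf = {
    'date': select_dates[0],       # the first run processes the first selected date
    'end_date': select_dates[-1],  # ignored when select_dates is set, but kept valid
    'select_dates': ','.join(select_dates),
}

# Build the CLI command, quoting the JSON so the shell passes it as one argument
command = ' '.join([
    'airflow', 'dags', 'trigger', 'self_reterigger_backfill',
    '--conf', shlex.quote(json.dumps(conf)),
])
print(command)
```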
Here are the changed parts:
```python
trigger_backfill = TriggerDagRunOperator(
    task_id='trigger_backfill',
    trigger_dag_id='self_reterigger_backfill',
    conf={
        'date': "",
        'end_date': "",
        'select_dates': "",
        'pageIds': "",
        'skipIdlParquet': "",
        'skipIfExist': "",
    },
    wait_for_completion=False,
    trigger_rule=TriggerRule.ALL_DONE,
    trigger_run_id="self_reterigger_backfill__",
    # Use a different execution date to bypass the existing DAG run check
    execution_date="",
)
```
```python
def should_continue(**kwargs):
    params = kwargs['params']
    # Get the current params.date
    current_date_str = params['date']
    # Drop empty entries: ''.split(',') returns [''], which is truthy and
    # would otherwise make an empty select_dates look like a real selection
    raw_dates = params.get('select_dates') or ''
    select_dates = [d.strip() for d in raw_dates.split(',') if d.strip()]

    # If select_dates is not empty, we only run the selected dates (end_date is ignored)
    if select_dates:
        # The next date is the entry after the current one; stop if the
        # current date is the last one (or is not in the list at all)
        if current_date_str in select_dates:
            index = select_dates.index(current_date_str) + 1
        else:
            index = len(select_dates)
        next_date = select_dates[index] if index < len(select_dates) else ''
        kwargs['ti'].xcom_push(key='next_date', value=next_date)
        return 'trigger_backfill' if next_date else 'stop_task'

    # Otherwise walk the date range one day at a time until end_date
    current_date = datetime.strptime(current_date_str, '%Y-%m-%d')
    end_date = datetime.strptime(params['end_date'], '%Y-%m-%d')
    next_date = (current_date + timedelta(days=1)).strftime('%Y-%m-%d')
    kwargs['ti'].xcom_push(key='next_date', value=next_date)
    # Compare dates
    if current_date >= end_date:
        return 'stop_task'
    return 'trigger_backfill'
```
In `should_continue`, I check `select_dates`: if it is provided, I find the position of the current date in it and take the next entry as the next run date. If the current date is the last one in `select_dates`, `stop_task` is triggered instead. Since there's no easy way to handle this logic with macro functions, I put it in the Python code and pass the calculated next date through Airflow XCom.
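To check this branching without an Airflow deployment, the same decision can be pulled out into a pure function and exercised directly. This is a sketch: `next_backfill_date` is a hypothetical name, and returning `None` stands in for choosing `stop_task` (a date string stands in for pushing `next_date` to XCom and choosing `trigger_backfill`). It also guards against a subtle pitfall: in Python, `''.split(',')` returns `['']`, which is truthy, so an empty `select_dates` string must be filtered before testing whether any dates were selected.

```python
from datetime import datetime, timedelta
from typing import Optional

FMT = '%Y-%m-%d'


def next_backfill_date(date: str, end_date: str, select_dates: str) -> Optional[str]:
    """Return the date for the next run, or None when the backfill should stop."""
    # Ignore empty entries so that select_dates="" means "not provided"
    selected = [d.strip() for d in select_dates.split(',') if d.strip()]
    if selected:
        # Selected-dates mode: end_date is ignored
        if date not in selected:
            return None
        index = selected.index(date) + 1
        return selected[index] if index < len(selected) else None
    # Date-range mode: advance one day until end_date is reached
    current = datetime.strptime(date, FMT)
    if current >= datetime.strptime(end_date, FMT):
        return None
    return (current + timedelta(days=1)).strftime(FMT)


print(next_backfill_date('2023-11-17', '', '2023-11-03,2023-11-17,2023-11-29'))  # 2023-11-29
print(next_backfill_date('2023-12-01', '2023-12-03', ''))  # 2023-12-02
print(next_backfill_date('2023-12-03', '2023-12-03', ''))  # None
```

Keeping the decision in a plain function like this makes it easy to unit-test the date logic locally before letting the DAG re-trigger itself hundreds of times.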