CuriousY A world with wonder

How to make a self-retriggering DAG in Airflow


Background

I have a DAG that may take several hours to finish. It runs a Spark job and generates some data. Now I want to do some data backfill, which means running the DAG about 500 times.
Of course, I could write a script to do so. But I would need to deploy that script somewhere it could run continuously without failure; otherwise I would have to check on it from time to time and restart it whenever an unexpected error happens. Since putting a script into the production environment is not easy, I would have to use a proxy to trigger the production Airflow, and the proxy token expires every 12 hours, which means refreshing it manually (with my Yubikey) twice a day!

Solution

I’ve used TriggerDagRunOperator to trigger other DAGs in Airflow before. So why not use this operator to trigger the DAG itself once the last run has finished?

This is the final DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule

# EMAIL_LIST is defined elsewhere in the project
with DAG(
        'self_reterigger_backfill',
        default_args={
            'depends_on_past': False,
            'email': EMAIL_LIST,
            'email_on_failure': True,
            'email_on_retry': False,
            'retries': 0,
            'execution_timeout': timedelta(hours=12),
        },
        params={
            'date': (datetime.now() - timedelta(days=2)).strftime('%Y-%m-%d'),
            'end_date': (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d'),
        },
        dagrun_timeout=timedelta(hours=12),
        description='Backfill job',
        schedule_interval=None,
        start_date=datetime(2023, 12, 10),
        catchup=False,
        # Make sure only one backfill task is running
        max_active_runs=1,
        tags=['recstrk'],
) as dag:
    trigger_gmb_attribution = TriggerDagRunOperator(
        task_id='trigger_gmb_attribution',
        trigger_dag_id='gmb_attribution',
        conf={
            'date': "",
            'isProduction': 'true',
        },
        wait_for_completion=True,
        trigger_rule=TriggerRule.NONE_FAILED,
    )

    trigger_backfill = TriggerDagRunOperator(
        task_id='trigger_backfill',
        trigger_dag_id='self_reterigger_backfill',
        conf={
            'date': "",
            'end_date': "",
        },
        wait_for_completion=False,
        trigger_rule=TriggerRule.ALL_DONE,
        trigger_run_id="self_reterigger_backfill_",
        # Use a different execution date to bypass the existing DAG run check
        execution_date="",
    )

    stop_task = DummyOperator(
        task_id='stop_task',
        dag=dag,
    )


    def should_continue(**kwargs):
        # Define the target end date
        end_date = datetime.strptime(kwargs['params']['end_date'], '%Y-%m-%d')

        # Get the current params.date
        current_date_str = kwargs['params']['date']
        current_date = datetime.strptime(current_date_str, '%Y-%m-%d')

        # Compare dates
        if current_date >= end_date:
            return 'stop_task'
        else:
            return 'trigger_backfill'


    check_date_task = BranchPythonOperator(
        task_id='check_date_task',
        provide_context=True,
        python_callable=should_continue,
        dag=dag,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    trigger_gmb_attribution >> check_date_task >> [trigger_backfill, stop_task]

The DAG advances the date parameter by one day after each run, and stops once it reaches end_date.

The second TriggerDagRunOperator is used to trigger the DAG itself:

  • In conf, the date parameter is advanced for the next run (see the sketch after this list for one way to template it)
  • wait_for_completion is set to False so that the current DAG run can finish before the next run starts
  • trigger_rule is set to all_done, meaning the next run is always triggered no matter whether the upstream tasks succeeded, failed, or were skipped
  • trigger_run_id is set to a new value, otherwise Airflow will raise a DagRunAlreadyExists exception
  • execution_date is also set to a new value to avoid the DagRunAlreadyExists exception (from the Airflow source code, the existing-run check queries by run id or execution date)
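
Here is a minimal sketch of how trigger_backfill could fill in those templated values. The Jinja expressions (macros.ds_add, ts_nodash, macros.datetime.utcnow) are assumptions for illustration, not the exact values from the original DAG:

    trigger_backfill = TriggerDagRunOperator(
        task_id='trigger_backfill',
        trigger_dag_id='self_reterigger_backfill',
        conf={
            # Advance the date by one day for the next run (assumed expression)
            'date': "{{ macros.ds_add(params.date, 1) }}",
            'end_date': "{{ params.end_date }}",
        },
        wait_for_completion=False,
        trigger_rule=TriggerRule.ALL_DONE,
        # A unique run id bypasses the existing-DAG-run check (assumed expression)
        trigger_run_id="self_reterigger_backfill_{{ macros.ds_add(params.date, 1) }}_{{ ts_nodash }}",
        # A unique execution date for the same reason (assumed expression)
        execution_date="{{ macros.datetime.utcnow().isoformat() }}",
    )

The trigger_gmb_attribution task would pass the current date downstream the same way, e.g. 'date': "{{ params.date }}".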

It’s also very easy to cancel or stop the backfill. Just mark the currently running task as success/failed while the last sub-task is running. The sub-task will still run to completion, since only the outer task is marked as success/failed.


Handle the Option type in Scala


match

The basic way to handle an Option is to use match:

val nameMaybe: Option[String] = Some("Yu")
val newName: Option[String] = nameMaybe match {
  case Some(name) =>
    Some(name.trim.toUpperCase)
  case None =>
    None
}

map, flatMap, filter

If we use map, flatMap, or filter on an Option, the function is applied to the content if it is Some(value), and the Option is left unchanged if it is None (for filter, a Some that fails the predicate becomes None). The result of these functions is still an Option.

The above example can then be written simply as:

val nameMaybe: Option[String] = Some("Yu")
val newName: Option[String] = nameMaybe.map(_.trim.toUpperCase)

A more complex example:

val maybeListOfStrings: Option[List[String]] = Some(List("Hello", "World", "!"))

val joinedStringOption: Option[String] = maybeListOfStrings.map(_.mkString(" "))

Here, _.mkString(" ") is applied to the List inside the Option. The map is not iterating over the List; it processes the value held by the Option.

To iterate over the elements, we need another map nested inside the first one:

val processedListOfStrings: Option[List[String]] = maybeListOfStrings.map(_.map(_.toUpperCase))

How to trigger tasks conditionally in Airflow


I have 3 tasks, and I want to run [t1, t2] >> t3 under condition A and t2 >> t3 under condition B, i.e. skip t1 conditionally.

Solution

  1. Add params to the DAG:

    with DAG(
        params={
            'skipTask1': 'false',
        },
    ) as dag:
    
  2. Add a branching task, since we cannot check params dynamically in the DAG declaration. Also set trigger_rule to none_failed or none_failed_min_one_success on task t3; otherwise t3 will be skipped as well, because by default a task only runs when all of its upstream tasks are in the success state. With NONE_FAILED, the task runs as long as no upstream task has failed (i.e. all upstream tasks succeeded or were skipped).

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import BranchPythonOperator
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.utils.trigger_rule import TriggerRule

    with DAG(
        'conditional_trigger_example',  # example dag_id
        start_date=datetime(2024, 1, 1),
        schedule_interval=None,
        params={
            'skipTask1': 'false',
        },
    ) as dag:
       
        t1 = DummyOperator(task_id='t1')
        t2 = DummyOperator(task_id='t2')
        t3 = DummyOperator(task_id='t3', trigger_rule=TriggerRule.NONE_FAILED)
       
        def branch_func(**kwargs):
            if kwargs['params']['skipTask1'] == 'false':
                return ['t1', 't2', 't3']
            else:
                return ['t2', 't3']
       
        branching = BranchPythonOperator(
            task_id='branching',
            python_callable=branch_func,
            dag=dag)
        branching >> [t1, t2] >> t3
        branching >> t2 >> t3
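
To exercise the skip path, skipTask1 can be overridden per run through the trigger conf; conf keys replace params of the same name when dag_run_conf_overrides_params is enabled (the default in Airflow 2), which the self-retrigger DAG above relies on as well. A minimal sketch, kicking the DAG off from another DAG with TriggerDagRunOperator (the task id is made up, and 'conditional_trigger_example' refers to the example dag_id above):

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_with_skip = TriggerDagRunOperator(
    task_id='trigger_with_skip',
    trigger_dag_id='conditional_trigger_example',  # the example dag_id used above
    conf={'skipTask1': 'true'},
    wait_for_completion=True,
)

The same conf can also be passed when triggering manually from the UI ("Trigger DAG w/ config") or with airflow dags trigger --conf.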
    

Refer to https://airflow.apache.org/docs/apache-airflow/2.9.1/core-concepts/dags.html

Spark/Hive in practice


How to debug locally

Prerequisites

Download Spark 3.1.1 with Hadoop 3.2 (use the same version as your production environment) from https://archive.apache.org/dist/spark/spark-3.1.1/ to your local machine.

Debug

  1. Go into the sbt shell. For any code change, just type assembly; it will compile the code and assemble the jar package.
  2. Since it is hard to read a production Hive table from a local machine, we can use some intermediate result to test the Spark job logic. To do so, generate some data in the production environment, save it as a parquet file, and download it to local. In the Spark job, comment out the data-reading logic and read from a local file instead, like:

    val result = spark.read.parquet("""/your/local/path/test.parquet""")
    

    You can also save the results as CSV files and read them like:

    val nameMappingTable: Dataset[NameMapping] = spark.read
      .option("header", "true")
      .schema(Encoders.product[NameMapping].schema)
      .csv("/you/local/path/test.csv").as[NameMapping]
    
  3. Submit the job using the local Spark binary:

    ./bin/spark-submit \
    --class your.job.EntryClass \
    /your/local/path/your-spark-job.jar \
    --params1 xxx \
    --params2 xxx
    
  4. Check the result:

    Note: to inspect a parquet file, you can install parquet-cli:

    brew install parquet-cli
    

    Then:

    parquet meta /path/to/your/target.gz.parquet
    

How to load data from a parquet file into a Hive table

Solution 1

One way is to use the LOAD command:

LOAD DATA INPATH '/my/path/parquet' INTO TABLE my_table;

In practice, LOAD failed with an error in our environment, so we ended up using INSERT instead:

INSERT OVERWRITE TABLE my_table PARTITION(DT='20240403') 
SELECT * FROM parquet.`viewfs://my/path/parquet`

Here, INSERT OVERWRITE overwrites the data under the same partition, while INSERT INTO does not. The SELECT statement must keep the column order consistent with the schema the table was created with; missing fields can be filled in manually as NULL, since the INSERT writes columns by position.

Note:
In Spark SQL, saveAsTable in overwrite mode first drops the table and then recreates it from the DataFrame's schema, so it cannot be used for the data loading here.

Solution 2

Insert with Scala. If you want to overwrite data partition by partition, you need to set this in the Spark conf:

val sparkConf: SparkConf = new SparkConf().setAppName(AppName)
 .setAll(Seq(
   "spark.sql.sources.partitionOverwriteMode" -> "dynamic",
   "hive.exec.dynamic.partition.mode" -> "nonstrict",
 ))

Then insert into the target Hive table:

resultDf.write.mode(SaveMode.Overwrite).format("parquet")
  .insertInto(targetTable)

Escape % in pymysql


I wanted to use pymysql to execute a SQL query like the one below, but ran into this error:

select * from rules where `sql` LIKE '%dad%'
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 3, in exec_sql
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1003, in execute
    return self._execute_text(object_, multiparams, params)
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1172, in _execute_text
    ret = self._execute_context(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1514, in _handle_dbapi_exception
    util.raise_(exc_info[1], with_traceback=exc_info[2])
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 609, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 146, in execute
    query = self.mogrify(query, args)
  File "/usr/local/lib/python3.8/site-packages/pymysql/cursors.py", line 125, in mogrify
    query = query % self._escape_args(args, conn)
TypeError: %d format: a number is required, not dict

The reason is that % has a special meaning in pymysql (it is treated as a format placeholder), and it can be escaped by doubling it:

select * from rules where `sql` LIKE '%%dad%%'
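
A minimal sketch of the fix with plain pymysql; the connection settings and the extra status column are made up for illustration, and the bound parameter is there only to force the %-formatting step that caused the error above:

import pymysql

conn = pymysql.connect(host="localhost", user="root", password="secret", database="mydb")

with conn.cursor() as cur:
    # Because args are passed, pymysql %-formats the query string first,
    # so literal % signs in the LIKE pattern must be doubled.
    cur.execute(
        "select * from rules where `sql` LIKE '%%dad%%' and status = %s",
        ("active",),
    )
    print(cur.fetchall())

Alternatively, the whole pattern can be passed as a bound parameter (LIKE %s with "%dad%" as the argument), which avoids the escaping entirely.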