返回

Airflow联动: 上游不定时触发DAG, 下游如何响应?

python

上游 DAG 不定时触发,如何联动下游 DAG?

有时候, 我们需要让一个 DAG (假设是 B1) 在另一个 DAG (假设是 A1) 运行完成后开始跑, 并且 A1 是不定时触发的(schedule=None)。 ExternalTaskSensor 虽好,但它更适合处理上游 DAG 有固定调度计划的情况。 咱咋办? 这篇博客就来聊聊这个问题。

问题根源:无固定调度,感知难

由于上游 DAG A1 没有固定的调度计划, 它是通过 TriggerDagRunOperator 被其他 DAG 触发的, 这意味着我们无法依赖常规的时间调度方式来同步 B1 的执行。 常规的传感器通常假设目标任务或 DAG 有可预测的执行时间, 但对于不定期触发的任务来说, 这套方法就不灵了。

解决之道

有好几种方法来解决这个问题。 我们可以根据具体情况,选择合适的方法:

1. 利用 DagStateTrigger (不太推荐,理解"逻辑日期"较复杂)

DagStateTrigger 可以异步等待另一个 DAG 在特定"逻辑日期"完成。 原问题里问到的“逻辑日期”(logical date),其实就是 Airflow 里 DAG 实例执行的日期, 不一定是实际的物理时间。 如果能确定A1被Trigger时候, 传递进去的日期是稳定的,那么B1可以使用这个方法进行触发。但通常A1通过 TriggerDAGrunOperator 触发时候, conf 参数设置较复杂时, 推断和控制这个logical_date比较麻烦. 所以除非能保证准确控制逻辑日期,否则不推荐使用此方法。

如果非要使用, 示例如下:

from airflow.sensors.external_task import DagStateTrigger

wait_for_a1 = DagStateTrigger(
    dag_id='B1',
    trigger_dag_id='A1',  # 上游 DAG 的 ID
    trigger_run_id="{{ dag_run.conf.get('upstream_run_id')}}", #确保获得上游run ID. 如果无法确认, 这一部分需要改动。
    execution_date="{{ dag_run.conf.get('upstream_execution_date')}}",#确保获得上游execution_date. 如果无法确认, 这一部分需要改动。
    poke_interval=60
    )

原理 : 这个 DagStateTrigger 会持续检查 A1 的状态。它根据trigger_dag_id 确认DAG的id, 根据trigger_run_id or execution_date确认DAG run的实例。 直到发现A1实例成功后, 才会释放下游任务。

限制 :
需要能够正确获得 A1 DAG run 的run_id 或者 execution_date, 才能正确等待, 需要提前进行参数的确认和设计. 否则很容易失败.

2. 自定义传感器 (推荐)

可以写一个自定义的传感器, 来轮询检查上游 DAG 的状态。 这种方法最灵活。

from airflow.sensors.base import BaseSensorOperator
from airflow.utils.db import provide_session
from airflow.models import DagRun
from sqlalchemy import func

class UnscheduledDagRunSensor(BaseSensorOperator):
    """
    自定义传感器, 检查不定期 DAG 的运行状态.
    """
    template_fields = ('upstream_dag_id',) # 使用模版

    def __init__(self, upstream_dag_id: str, poke_interval: int = 60, **kwargs):
        super().__init__(**kwargs)
        self.upstream_dag_id = upstream_dag_id
        self.poke_interval = poke_interval

    @provide_session
    def poke(self, context, session=None):
        count = (
            session.query(func.count(DagRun.run_id))
            .filter(DagRun.dag_id == self.upstream_dag_id, DagRun.state == 'success')
           .scalar()
        )

        # 检查是否已经存在更新的DAG 实例。
        last_check_count = context['ti'].xcom_pull(task_ids=context['task_instance_key_str'], key='last_dag_run_count')

        if last_check_count is None:
          context['ti'].xcom_push(key='last_dag_run_count', value=count)
          self.log.info(f"首次检查上游 DAG: {self.upstream_dag_id} 的完成数量为:{count}")
          return False  # 首次运行,必然返回False

        self.log.info(f"上游 DAG: {self.upstream_dag_id} 当前完成次数是: {count},上次检查是:{last_check_count}。")

        if count > last_check_count :
          context['ti'].xcom_push(key='last_dag_run_count', value=count) #更新
          self.log.info("检测到上游 DAG 有新的成功实例,触发!")
          return True # 有新实例后返回True。

        return False

# 使用示例
wait_for_a1 = UnscheduledDagRunSensor(
    task_id='wait_for_a1',
    upstream_dag_id='A1',
    poke_interval=60,  # 每 60 秒检查一次
)

原理 : 这个自定义传感器通过查询数据库里 dag_run 表,获取 dag_id 为 A1 且状态为 success 的记录数量。然后, 将数量进行xcom 推送和获取, 对比两次检查, 如果发现次数增加了,则认为 A1 有了新的、成功的实例。 通过 XCom 来存储上一次的检查结果,避免了重复触发。

安全建议 : 轮询间隔时间(poke_interval) 别设置得太短, 避免给数据库造成过大压力。根据实际情况做调整,一分钟或者几分钟查一次一般就够了。

3. 使用数据库或消息队列 (进阶,适用于复杂场景)

如果 Airflow 环境允许, 可以考虑使用外部数据库或消息队列。

实现思路 :

  1. 在 A1 的最后一个任务里,往数据库(比如 PostgreSQL、MySQL)的某个表里插入一条记录, 或者往消息队列(如 RabbitMQ、Kafka)里发送一条消息。记录信息或者消息内容可以是 A1 的 dag_idrun_id 和状态等信息。

  2. 在 B1 里,使用一个传感器去读取数据库里的这条记录, 或者订阅消息队列里的消息。只要读取到新记录/收到新消息, 就表明 A1 运行完成。

    • 数据库 : 用 SqlSensor 定时查询数据库表。

      from airflow.providers.postgres.sensors.sql import SqlSensor #例如使用PostgreSQL
      
      wait_for_a1 = SqlSensor(
          task_id='wait_for_a1_completion',
          conn_id='my_postgres_connection',  # Airflow 里配置好的数据库连接
          sql="""
              SELECT 1
              FROM dag_completion_table
              WHERE dag_id = 'A1'
              AND run_id > '{{ ti.xcom_pull(key="last_processed_run_id") or "" }}'  -- 防止重复处理, 初始值可设为空字符串
              ORDER BY run_id DESC
              LIMIT 1;
          """,
         # success=check_new_run_id_function  这里可以自定义function来决定sql 运行的返回值, 什么样是正确的. 本示例假设run_id递增。
          failure=None, # 可选,定义失败的处理逻辑.
          timeout=600, # 超时
          poke_interval=60,
         )
      
      

    需要注意的是, 在A1 DAG中增加一个任务用于在数据表dag_completion_table中增加记录, run_id字段尽量保证递增,这样可以避免重复处理, 这个表根据实际情况可以增减字段。

    • 消息队列 : 自定义传感器来订阅和处理消息 (以 RabbitMQ 为例, 但更推荐用现成的 MQ 插件,如airflow-provider-rabbitmq)。

    由于不同类型MQ 的sensor 设计不一样, 代码比较复杂,这里就不列出完整实现, 只给出基本思路。

    1. 建立连接: 使用 pika (RabbitMQ 的 Python 客户端) 建立到 RabbitMQ 的连接。

    2. 声明队列: 声明一个用于接收 A1 完成消息的队列。

    3. 消费消息: 从队列里获取消息,并解析消息内容(如 DAG ID 和 run ID)。

    4. 确认消息: 确认已经处理了消息,防止消息被重复消费。

原理 : 使用数据库/消息队列作为中介, 将 A1 完成的信息传递给 B1。 这种方式更加解耦, 也更加可靠。

安全建议 :

  • 如果是使用数据库,确保数据库连接信息的安全, 不要硬编码在代码里, 通过Airflow 的 connection 进行配置。
  • 设置好数据库/消息队列的访问权限,防止未经授权的访问。
  • 设置合理的消息确认机制(ack), 确保消息被正确处理, 不会丢失。

优点 :

  • 更适合多个下游DAG需要依赖同一个上游DAG,只需要发布一次完成消息,多个下游都可以消费.
  • 可以持久化A1 完成状态,方便排查问题

4. Airflow API 调用 (进阶,适合非常规控制)

可以通过 Airflow 提供的 API 来轮询查询上游 DAG 的状态。 这需要对 Airflow 的 API 有一定了解。

from airflow.api.client import get_current_api_client
import time

class AirflowAPISensor(BaseSensorOperator):

  def __init__(self, upstream_dag_id: str, poke_interval:int=60, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.upstream_dag_id = upstream_dag_id
    self.poke_interval = poke_interval

  def poke(self,context):
      client = get_current_api_client()
      dag_runs = client.get_dag_runs(self.upstream_dag_id,execution_date_gte= context['data_interval_start'])
      #也可以考虑更复杂的时间设置,比如增加执行时间等。
       # 通过dag_run ID进行排序和判断也可以加入。
      if dag_runs : # 理论上只会返回一个. 也可以更精确进行判断和控制
        for dr in dag_runs:
            if dr.state == 'success':
              self.log.info(f"检测到DAG run 实例: {dr.dag_run_id} 成功完成!")
              return True

      time.sleep(self.poke_interval)
      return False

原理 :
通过Airflow 的客户端 get_current_api_client 来进行api 调用和控制,检查DAG 实例情况,判断是否需要进行触发. 需要特别注意, 需要提前开启实验性的API或者稳定的Rest API(根据不同Airflow版本). 而且在api_auth 中做好权限的配置.
对于不同的 Airflow 版本, 可能有不同的 API endpoint, 注意根据实际版本查阅相关文档。

安全性建议 :
务必对访问 API 的客户端进行身份验证和授权。 使用 API key 或 service account 等机制控制访问权限.

总结

针对不定时触发的上游 DAG, 我们可以采用多种策略实现下游 DAG 的联动。 自定义传感器灵活性较好,也较为容易实现。 而使用数据库、消息队列, 或者 Airflow API, 则适合更复杂的场景, 或需要更高可靠性的情况。 请务必根据实际情况, 选择适合自己需求的方案。