Airflow联动: 上游不定时触发DAG, 下游如何响应?
2025-03-16 02:46:46
上游 DAG 不定时触发,如何联动下游 DAG?
有时候, 我们需要让一个 DAG (假设是 B1) 在另一个 DAG (假设是 A1) 运行完成后开始跑, 并且 A1 是不定时触发的(schedule=None
)。 ExternalTaskSensor
虽好,但它更适合处理上游 DAG 有固定调度计划的情况。 咱咋办? 这篇博客就来聊聊这个问题。
问题根源:无固定调度,感知难
由于上游 DAG A1 没有固定的调度计划, 它是通过 TriggerDagRunOperator
被其他 DAG 触发的, 这意味着我们无法依赖常规的时间调度方式来同步 B1 的执行。 常规的传感器通常假设目标任务或 DAG 有可预测的执行时间, 但对于不定期触发的任务来说, 这套方法就不灵了。
解决之道
有好几种方法来解决这个问题。 我们可以根据具体情况,选择合适的方法:
1. 利用 DagStateTrigger
(不太推荐,理解"逻辑日期"较复杂)
DagStateTrigger
可以异步等待另一个 DAG 在特定"逻辑日期"完成。 原问题里问到的“逻辑日期”(logical date),其实就是 Airflow 里 DAG 实例执行的日期, 不一定是实际的物理时间。 如果能确定A1被Trigger时候, 传递进去的日期是稳定的,那么B1可以使用这个方法进行触发。但通常A1通过 TriggerDAGrunOperator 触发时候, conf 参数设置较复杂时, 推断和控制这个logical_date
比较麻烦. 所以除非能保证准确控制逻辑日期,否则不推荐使用此方法。
如果非要使用, 示例如下:
from airflow.sensors.external_task import DagStateTrigger
wait_for_a1 = DagStateTrigger(
dag_id='B1',
trigger_dag_id='A1', # 上游 DAG 的 ID
trigger_run_id="{{ dag_run.conf.get('upstream_run_id')}}", #确保获得上游run ID. 如果无法确认, 这一部分需要改动。
execution_date="{{ dag_run.conf.get('upstream_execution_date')}}",#确保获得上游execution_date. 如果无法确认, 这一部分需要改动。
poke_interval=60
)
原理 : 这个 DagStateTrigger
会持续检查 A1 的状态。它根据trigger_dag_id
确认DAG的id, 根据trigger_run_id
or execution_date
确认DAG run的实例。 直到发现A1实例成功后, 才会释放下游任务。
限制 :
需要能够正确获得 A1 DAG run 的run_id
或者 execution_date
, 才能正确等待, 需要提前进行参数的确认和设计. 否则很容易失败.
2. 自定义传感器 (推荐)
可以写一个自定义的传感器, 来轮询检查上游 DAG 的状态。 这种方法最灵活。
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.db import provide_session
from airflow.models import DagRun
from sqlalchemy import func
class UnscheduledDagRunSensor(BaseSensorOperator):
"""
自定义传感器, 检查不定期 DAG 的运行状态.
"""
template_fields = ('upstream_dag_id',) # 使用模版
def __init__(self, upstream_dag_id: str, poke_interval: int = 60, **kwargs):
super().__init__(**kwargs)
self.upstream_dag_id = upstream_dag_id
self.poke_interval = poke_interval
@provide_session
def poke(self, context, session=None):
count = (
session.query(func.count(DagRun.run_id))
.filter(DagRun.dag_id == self.upstream_dag_id, DagRun.state == 'success')
.scalar()
)
# 检查是否已经存在更新的DAG 实例。
last_check_count = context['ti'].xcom_pull(task_ids=context['task_instance_key_str'], key='last_dag_run_count')
if last_check_count is None:
context['ti'].xcom_push(key='last_dag_run_count', value=count)
self.log.info(f"首次检查上游 DAG: {self.upstream_dag_id} 的完成数量为:{count}")
return False # 首次运行,必然返回False
self.log.info(f"上游 DAG: {self.upstream_dag_id} 当前完成次数是: {count},上次检查是:{last_check_count}。")
if count > last_check_count :
context['ti'].xcom_push(key='last_dag_run_count', value=count) #更新
self.log.info("检测到上游 DAG 有新的成功实例,触发!")
return True # 有新实例后返回True。
return False
# 使用示例
wait_for_a1 = UnscheduledDagRunSensor(
task_id='wait_for_a1',
upstream_dag_id='A1',
poke_interval=60, # 每 60 秒检查一次
)
原理 : 这个自定义传感器通过查询数据库里 dag_run
表,获取 dag_id
为 A1 且状态为 success
的记录数量。然后, 将数量进行xcom 推送和获取, 对比两次检查, 如果发现次数增加了,则认为 A1 有了新的、成功的实例。 通过 XCom 来存储上一次的检查结果,避免了重复触发。
安全建议 : 轮询间隔时间(poke_interval
) 别设置得太短, 避免给数据库造成过大压力。根据实际情况做调整,一分钟或者几分钟查一次一般就够了。
3. 使用数据库或消息队列 (进阶,适用于复杂场景)
如果 Airflow 环境允许, 可以考虑使用外部数据库或消息队列。
实现思路 :
-
在 A1 的最后一个任务里,往数据库(比如 PostgreSQL、MySQL)的某个表里插入一条记录, 或者往消息队列(如 RabbitMQ、Kafka)里发送一条消息。记录信息或者消息内容可以是 A1 的
dag_id
、run_id
和状态等信息。 -
在 B1 里,使用一个传感器去读取数据库里的这条记录, 或者订阅消息队列里的消息。只要读取到新记录/收到新消息, 就表明 A1 运行完成。
-
数据库 : 用
SqlSensor
定时查询数据库表。from airflow.providers.postgres.sensors.sql import SqlSensor #例如使用PostgreSQL wait_for_a1 = SqlSensor( task_id='wait_for_a1_completion', conn_id='my_postgres_connection', # Airflow 里配置好的数据库连接 sql=""" SELECT 1 FROM dag_completion_table WHERE dag_id = 'A1' AND run_id > '{{ ti.xcom_pull(key="last_processed_run_id") or "" }}' -- 防止重复处理, 初始值可设为空字符串 ORDER BY run_id DESC LIMIT 1; """, # success=check_new_run_id_function 这里可以自定义function来决定sql 运行的返回值, 什么样是正确的. 本示例假设run_id递增。 failure=None, # 可选,定义失败的处理逻辑. timeout=600, # 超时 poke_interval=60, )
需要注意的是, 在A1 DAG中增加一个任务用于在数据表dag_completion_table中增加记录, run_id字段尽量保证递增,这样可以避免重复处理, 这个表根据实际情况可以增减字段。
- 消息队列 : 自定义传感器来订阅和处理消息 (以 RabbitMQ 为例, 但更推荐用现成的 MQ 插件,如
airflow-provider-rabbitmq
)。
由于不同类型MQ 的sensor 设计不一样, 代码比较复杂,这里就不列出完整实现, 只给出基本思路。
-
建立连接: 使用 pika (RabbitMQ 的 Python 客户端) 建立到 RabbitMQ 的连接。
-
声明队列: 声明一个用于接收 A1 完成消息的队列。
-
消费消息: 从队列里获取消息,并解析消息内容(如 DAG ID 和 run ID)。
-
确认消息: 确认已经处理了消息,防止消息被重复消费。
-
原理 : 使用数据库/消息队列作为中介, 将 A1 完成的信息传递给 B1。 这种方式更加解耦, 也更加可靠。
安全建议 :
- 如果是使用数据库,确保数据库连接信息的安全, 不要硬编码在代码里, 通过Airflow 的 connection 进行配置。
- 设置好数据库/消息队列的访问权限,防止未经授权的访问。
- 设置合理的消息确认机制(ack), 确保消息被正确处理, 不会丢失。
优点 :
- 更适合多个下游DAG需要依赖同一个上游DAG,只需要发布一次完成消息,多个下游都可以消费.
- 可以持久化A1 完成状态,方便排查问题
4. Airflow API 调用 (进阶,适合非常规控制)
可以通过 Airflow 提供的 API 来轮询查询上游 DAG 的状态。 这需要对 Airflow 的 API 有一定了解。
from airflow.api.client import get_current_api_client
import time
class AirflowAPISensor(BaseSensorOperator):
def __init__(self, upstream_dag_id: str, poke_interval:int=60, *args, **kwargs):
super().__init__(*args, **kwargs)
self.upstream_dag_id = upstream_dag_id
self.poke_interval = poke_interval
def poke(self,context):
client = get_current_api_client()
dag_runs = client.get_dag_runs(self.upstream_dag_id,execution_date_gte= context['data_interval_start'])
#也可以考虑更复杂的时间设置,比如增加执行时间等。
# 通过dag_run ID进行排序和判断也可以加入。
if dag_runs : # 理论上只会返回一个. 也可以更精确进行判断和控制
for dr in dag_runs:
if dr.state == 'success':
self.log.info(f"检测到DAG run 实例: {dr.dag_run_id} 成功完成!")
return True
time.sleep(self.poke_interval)
return False
原理 :
通过Airflow 的客户端 get_current_api_client
来进行api 调用和控制,检查DAG 实例情况,判断是否需要进行触发. 需要特别注意, 需要提前开启实验性的API或者稳定的Rest API(根据不同Airflow版本). 而且在api_auth 中做好权限的配置.
对于不同的 Airflow 版本, 可能有不同的 API endpoint, 注意根据实际版本查阅相关文档。
安全性建议 :
务必对访问 API 的客户端进行身份验证和授权。 使用 API key 或 service account 等机制控制访问权限.
总结
针对不定时触发的上游 DAG, 我们可以采用多种策略实现下游 DAG 的联动。 自定义传感器灵活性较好,也较为容易实现。 而使用数据库、消息队列, 或者 Airflow API, 则适合更复杂的场景, 或需要更高可靠性的情况。 请务必根据实际情况, 选择适合自己需求的方案。