返回

在 Airflow PostgresOperator 中获取执行日期和逻辑日期:XCom vs 上下文变量

python

在 Airflow 中获取 PostgresOperator 中的执行日期和逻辑日期

简介

Airflow 是一个强大的工作流管理系统,它允许用户创建、计划和监控复杂的自动化任务。PostgresOperator 是一个 Airflow 运算符,它允许用户与 PostgreSQL 数据库交互。然而,它没有内置的方法来获取执行日期或逻辑日期。本文将探讨在 PostgresOperator 中获取这些日期的两种方法:使用 XCom 和使用上下文变量。

使用 XComs

XCom(跨任务通信)是一种机制,允许 Airflow 任务在不同任务之间传递数据。我们可以利用 XCom 将执行日期或逻辑日期从 PythonOperator 传递到 PostgresOperator。

步骤:

  1. 在 PythonOperator 中获取执行日期和逻辑日期。
  2. 将这些值存储在 XCom 中。
  3. 在 PostgresOperator 中,从 XCom 中检索这些值。

示例代码:

import pendulum

from airflow import models
from airflow.operators import python_operator
from airflow.operators import postgres_operator
from airflow.utils.dates import days_ago


# 获取执行日期和逻辑日期
def get_dates(**kwargs):
    execution_date = kwargs['execution_date']
    logical_date = kwargs['logical_date']
    return execution_date, logical_date


# 存储执行日期和逻辑日期
def store_dates(execution_date, logical_date, **kwargs):
    kwargs['ti'].xcom_push(key='execution_date', value=execution_date)
    kwargs['ti'].xcom_push(key='logical_date', value=logical_date)


# 使用 XCom 获取执行日期和逻辑日期
def use_dates(**kwargs):
    execution_date = kwargs['ti'].xcom_pull(task_ids='python_operator', key='execution_date')
    logical_date = kwargs['ti'].xcom_pull(task_ids='python_operator', key='logical_date')
    print(execution_date)
    print(logical_date)


# PythonOperator
with models.DAG(
        'example_dag',
        start_date=days_ago(1),
        schedule_interval=None) as dag:

    python_operator = python_operator.PythonOperator(
        task_id='python_operator',
        python_callable=get_dates)

    store_dates_operator = python_operator.PythonOperator(
        task_id='store_dates_operator',
        python_callable=store_dates)

    postgres_operator = postgres_operator.PostgresOperator(
        task_id='postgres_operator',
        postgres_conn_id='my_postgres',
        sql="""
            SELECT *
            FROM table
            WHERE date = {{ti.xcom_pull(task_ids='python_operator', key='logical_date')}}
        """)

    python_operator >> store_dates_operator >> postgres_operator

使用上下文变量

上下文变量是 Airflow 传递给每个任务的一组内置变量。其中包括 {{ ds }}{{ ts }} 变量,它们分别包含执行日期和逻辑日期。

步骤:

  1. 在 PostgresOperator 的 SQL 语句中使用 {{ ds }}{{ ts }} 变量。

示例代码:

task = PostgresOperator(
    task_id="task_id",
    postgres_conn_id="my_postgres",
    sql="""
        SELECT *
        FROM table
        WHERE date = '{{ ds }}'
    """)

最佳实践

选择哪种方法取决于你的具体用例。

  • 使用 XComs: 适用于需要在多个任务之间传递大量或复杂数据的情况。
  • 使用上下文变量: 适用于单个任务中访问执行日期或逻辑日期的情况。
  • 创建自定义运算符: 适用于需要更复杂自定义行为的情况。

结论

通过使用 XCom 或上下文变量,你可以获取 PostgresOperator 中的执行日期和逻辑日期,从而增强 Airflow 工作流的灵活性。选择合适的方法以满足你的具体需求,并优化工作流的性能和可维护性。

常见问题解答

  1. XCom 和上下文变量有什么区别?
    XCom 允许在任务之间传递数据,而上下文变量是由 Airflow 自动提供的内置变量。
  2. 什么时候应该使用 XCom,什么时候应该使用上下文变量?
    使用 XCom 传递大量或复杂数据,使用上下文变量访问单个任务中的执行日期或逻辑日期。
  3. 如何创建自定义运算符?
    你可以编写一个 Python 类,继承自 Airflow 的 BaseOperator,并定义自己的方法和属性。
  4. 如何获取执行日期和逻辑日期的格式化版本?
    你可以使用 airflow.utils.timezone.utcnow().isoformat() 获取 UTC 格式化执行日期,或使用 airflow.utils.dates.today().isoformat() 获取逻辑日期。
  5. 有哪些其他方法可以获取执行日期和逻辑日期?
    你可以使用 airflow.macros.datetime.now() 获取当前时间戳,或使用 airflow.macros.date.today() 获取今天的日期。