返回

FlinkCDC采mysql数据时间偏8小时修复方法分享

后端

FlinkCDC与MySQL时间戳时差调整

简介

在使用FlinkCDC从MySQL读取数据时,经常遇到日期类型与数据库日期时间不一致的情况,这是由于MySQL默认使用UTC(协调世界时),而我们实际使用的时区是GMT+8(北京时间)所致。本文将介绍两种方法将MySQL时间戳转换为GMT+8,以解决这一问题。

原因分析

MySQL的时间戳是使用UTC,而我们实际使用的时区是GMT+8,两者之间存在8小时时差。当FlinkCDC读取MySQL数据时,如果不进行时区转换,则会造成时间戳与实际时间不符。

解决方案

1. 使用FlinkCDC内置转换函数

FlinkCDC提供了DateTimeUtils.shiftTimeZone函数,可以将时间戳转换为指定的时区。

DateTimeUtils.shiftTimeZone(TIMESTAMP_TIMESTAMP, TIMESTAMP_LTZ)

其中,TIMESTAMP_TIMESTAMP表示需要转换的时间戳,TIMESTAMP_LTZ表示要转换到的时区。

例如,将时间戳1694597238000转换为GMT+8,可以使用以下代码:

DateTimeUtils.shiftTimeZone(1694597238000, "+08:00")

2. 使用Java代码转换

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class TimestampConverter {

  public static void main(String[] args) {
    // 需要转换的时间戳
    long timestamp = 1694597238000L;

    // 要转换到的时区
    String timeZone = "+08:00";

    // 创建SimpleDateFormat对象,并设置时区
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    sdf.setTimeZone(TimeZone.getTimeZone(timeZone));

    // 将时间戳转换为Date对象
    Date date = new Date(timestamp);

    // 将Date对象转换为字符串
    String formattedDate = sdf.format(date);

    // 打印转换后的时间
    System.out.println(formattedDate);
  }
}

代码示例

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkCDCWithTimestampConversion {

  public static void main(String[] args) {
    // 创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

    // 定义MySQL表
    String createTableDDL = "CREATE TABLE mysql_table (\n" +
        "  id INT PRIMARY KEY,\n" +
        "  name STRING,\n" +
        "  create_time TIMESTAMP,\n" +
        "  update_time TIMESTAMP\n" +
        ") WITH (\n" +
        "  'connector' = 'mysql-cdc',\n" +
        "  'hostname' = 'localhost',\n" +
        "  'port' = '3306',\n" +
        "  'database-name' = 'flink_cdc',\n" +
        "  'table-name' = 'mysql_table',\n" +
        "  'debezium.snapshot.mode' = 'initial'\n" +
        ")";
    tEnv.executeSql(createTableDDL);

    // 注册转换函数
    tEnv.createTemporarySystemFunction("shiftTimeZone", DateTimeUtils.shiftTimeZone);

    // 使用转换函数查询数据
    String querySQL = "SELECT id, name, shiftTimeZone(create_time, '+08:00'), shiftTimeZone(update_time, '+08:00') FROM mysql_table";
    Table resultTable = tEnv.sqlQuery(querySQL);

    // 输出结果
    resultTable.execute().print();
  }
}

常见问题解答

1. 为什么MySQL默认使用UTC时区?

UTC是一个国际标准时区,不受地理位置影响,可以保证时间戳在全球范围内的一致性。

2. 如何在MySQL中设置时区?

可以通过SET TIMEZONE命令设置MySQL的时区,例如:

SET TIMEZONE = '+08:00';

3. FlinkCDC支持哪些时区转换格式?

FlinkCDC支持所有java.util.TimeZone支持的时区格式,例如"GMT+8"、"Asia/Shanghai"。

4. 为什么使用Java代码转换时间戳?

使用Java代码转换时间戳可以更加灵活地控制时区转换过程,例如可以自定义格式化字符串。

5. 如何在生产环境中使用FlinkCDC进行时区转换?

在生产环境中,可以通过使用自定义UDF(用户定义函数)或Table API的Proctime/Rowtime处理功能进行时区转换。