返回

数据库变更数据捕获,PostgreSQL与SpringBoot Flink CDC集成实战

后端

实时捕获PostgreSQL变更数据:使用SpringBoot Flink CDC

问题:数据变更监控的复杂性

在当今数据驱动的世界中,及时准确地获取和处理数据变更至关重要。随着数据库系统规模和复杂性的不断增长,实时监控和处理这些变更变得越来越具有挑战性。

解决方案:变更数据捕获 (CDC)

变更数据捕获 (CDC) 技术为我们提供了实时获取数据库中数据变更的解决方案。通过订阅数据库变更日志,CDC 工具可以持续捕获和传输这些变更,使我们能够实时响应数据更新。

SpringBoot Flink CDC:PostgreSQL 的理想 CDC 工具

SpringBoot Flink CDC 是一个用于捕获 PostgreSQL 数据库变更数据的强大工具。它利用 Apache Flink 的 CDC 连接器,将变更数据发布到 Kafka 等消息队列系统,从而实现下游系统的实时处理。

优势:

  • 实时捕获: 即时捕获数据库变更,无延迟。
  • 可扩展性: 利用 Apache Flink 的分布式处理能力,可扩展到大规模数据。
  • 集成灵活: 支持将变更数据发布到各种消息系统,如 Kafka 和 Pulsar。
  • 易于使用: 使用 Spring Boot 简化了配置和部署,并提供了开箱即用的 CDC 任务模板。

如何使用 SpringBoot Flink CDC

1. 安装先决条件:

  • Java 11+
  • Maven 3.8+
  • PostgreSQL 10+
  • Apache Flink 1.15+
  • Spring Boot 2.7+

2. 创建 PostgreSQL 表:

CREATE TABLE users (
  id SERIAL PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  email VARCHAR(255) UNIQUE NOT NULL,
  created_at TIMESTAMP NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);

3. 安装 Flink CDC:

mvn install:install-file -Dfile=/path/to/flink-cdc-connectors-<version>.jar -DgroupId=org.apache.flink -DartifactId=flink-cdc-connectors -Dversion=<version> -Dpackaging=jar

4. 创建 SpringBoot 项目:

  • 添加以下依赖项:
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>1.15.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-client</artifactId>
  <version>1.15.0</version>
</dependency>
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>postgresql</artifactId>
  <version>42.5.0</version>
</dependency>
  • 配置数据库连接信息:
spring.datasource.url=jdbc:postgresql://localhost:5432/cdc_demo
spring.datasource.username=postgres
spring.datasource.password=my-password
  • 配置 Kafka 连接信息:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=cdc-consumer
spring.kafka.consumer.auto-offset-reset=earliest

5. 创建 Flink CDC 任务:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CdcTask {

  public static void main(String[] args) throws Exception {
    // 创建流执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 设置表环境
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

    // 创建 Flink CDC 表
    tableEnv.executeSql(
        "CREATE TABLE users_cdc (\n" +
        "  id INT PRIMARY KEY,\n" +
        "  name STRING,\n" +
        "  email STRING,\n" +
        "  created_at TIMESTAMP,\n" +
        "  updated_at TIMESTAMP\n" +
        ") WITH (\n" +
        "  'connector' = 'flink-cdc',\n" +
        "  'hostname' = 'localhost',\n" +
        "  'port' = '5432',\n" +
        "  'username' = 'postgres',\n" +
        "  'password' = 'my-password',\n" +
        "  'database-name' = 'cdc_demo',\n" +
        "  'table-name' = 'users'\n" +
        ")");

    // 将 Flink CDC 表转换为 DataStream
    Table usersCdcTable = tableEnv.from("users_cdc");
    DataStream<String> usersCdcStream = tableEnv.toChangelogStream(usersCdcTable).map(row -> row.toString());

    // 将 DataStream 发送到 Kafka
    usersCdcStream.addSink(new FlinkKafkaProducer011<>(
        "users_cdc",
        new SimpleStringSchema(),
        PropertiesUtil.buildKafkaProps()));

    // 执行任务
    env.execute("Flink CDC Task");
  }
}

6. 运行 SpringBoot 项目:

mvn spring-boot:run

7. 验证数据捕获:

向 "users" 表中插入、更新和删除数据,然后在 Kafka 中查看 "users_cdc" 主题。您应该可以看到相应的变更数据被捕获并发送到 Kafka 中。

常见问题解答

1. 无法连接到 PostgreSQL 数据库?

  • 检查数据库连接信息是否正确。
  • 确保 PostgreSQL 数据库正在运行。

2. Flink CDC 任务无法启动?

  • 检查 Flink CDC 连接器是否已正确配置。
  • 确保 PostgreSQL 数据库的端口已打开。

3. Kafka 中没有变更数据?

  • 检查 Kafka 连接器是否已正确配置。
  • 确保 PostgreSQL 数据库的 CDC 功能已开启。

4. 如何配置 Kafka 主题分区?

  • 在 Flink CDC 配置中指定 "topic.partition" 属性。

5. 如何处理重复的数据?

  • 使用 Flink 的去重转换或使用自定义处理逻辑。

结论

SpringBoot Flink CDC 为我们提供了一种强大且高效的方法来实时捕获 PostgreSQL 数据库中的变更数据。通过利用 Apache Flink 和 Kafka 的强大功能,我们可以构建可靠且可扩展的数据处理管道,为实时应用程序和分析提供支持。