返回

FlinkSQL读写JDBC(MySQL):轻松搞定数据库交互

后端

FlinkSQL 读写 JDBC(MySQL):数据库交互的新高度

简介

在当今数据爆炸的时代,掌握多种数据处理工具是必不可少的技能。Apache Flink 的 SQL 接口 FlinkSQL 以其强大的流处理和批处理能力而备受青睐。而 Java Database Connectivity (JDBC) 是 Java 世界中与数据库交互的标准接口,支持 MySQL、Oracle 和 PostgreSQL 等多种数据库。本文将深入探讨 FlinkSQL 与 JDBC(MySQL) 的结合,为您展示如何解锁数据库交互的新姿势。

一、FlinkSQL 读取 JDBC(MySQL)

1. 添加依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc</artifactId>
  <version>1.15.3</version>
</dependency>

2. 配置 JDBC 连接信息

Properties properties = new Properties();
properties.setProperty("driver", "com.mysql.jdbc.Driver");
properties.setProperty("url", "jdbc:mysql://localhost:3306/test");
properties.setProperty("username", "root");
properties.setProperty("password", "password");

3. 创建 JDBC 数据源

JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
  .setDrivername("com.mysql.jdbc.Driver")
  .setDBUrl("jdbc:mysql://localhost:3306/test")
  .setUsername("root")
  .setPassword("password")
  .setQuery("select * from user")
  .finish();

4. 读取数据

DataSet<Row> dataSet = executionEnvironment.createInput(inputFormat);

二、FlinkSQL 写入 JDBC(MySQL)

1. 添加依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc</artifactId>
  <version>1.15.3</version>
</dependency>

2. 配置 JDBC 连接信息

Properties properties = new Properties();
properties.setProperty("driver", "com.mysql.jdbc.Driver");
properties.setProperty("url", "jdbc:mysql://localhost:3306/test");
properties.setProperty("username", "root");
properties.setProperty("password", "password");

3. 创建 JDBC 数据源

JDBCOuputFormat outputFormat = JDBCOuputFormat.buildJDBCOutputFormat()
  .setDrivername("com.mysql.jdbc.Driver")
  .setDBUrl("jdbc:mysql://localhost:3306/test")
  .setUsername("root")
  .setPassword("password")
  .setQuery("insert into user (id, name, age) values (?, ?, ?)")
  .finish();

4. 写入数据

DataSet<Tuple3<Integer, String, Integer>> dataSet = executionEnvironment.fromElements(
  new Tuple3<>(1, "张三", 20),
  new Tuple3<>(2, "李四", 30),
  new Tuple3<>(3, "王五", 40)
);

dataSet.output(outputFormat);

三、实战案例

为了加深理解,让我们以一个实战案例为例。假设我们有一个存储在 MySQL 数据库中的用户信息表,现在我们要将这些数据读取出来并写入到另一个 MySQL 数据库中。

步骤:

  1. 创建 FlinkSQL 环境:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  1. 读取数据:
CREATE TABLE user (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/test',
  'table-name' = 'user',
  'username' = 'root',
  'password' = 'password'
);

SELECT * FROM user;
  1. 写入数据:
CREATE TABLE user_copy (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/test2',
  'table-name' = 'user_copy',
  'username' = 'root',
  'password' = 'password'
);

INSERT INTO user_copy SELECT * FROM user;

四、优势

  • 强大的 SQL 支持: FlinkSQL 支持标准 SQL 语法,易于开发和维护。
  • 高性能: FlinkSQL 基于 Flink 引擎,具有高吞吐量和低延迟的处理能力。
  • 灵活的数据源: FlinkSQL 支持多种数据源,包括 JDBC、Kafka、HDFS 和 Elasticsearch 等。
  • 丰富的扩展性: FlinkSQL 可以通过 UDF、UDAGG 和 UDTF 等扩展,以满足更复杂的数据处理需求。

五、常见问题解答

  1. FlinkSQL 与 JDBC(MySQL) 的结合有什么优势?

    • 强大的 SQL 支持,易于开发和维护。
    • 高性能,满足高吞吐量和低延迟需求。
    • 灵活的数据源,支持多种数据库。
    • 丰富的扩展性,可满足复杂的数据处理需求。
  2. 如何配置 JDBC 连接信息?

    • 使用 Properties 对象设置驱动程序、URL、用户名和密码。
  3. 如何读取 JDBC(MySQL) 数据?

    • 创建 JDBCInputFormat,设置连接信息和查询语句。
  4. 如何写入 JDBC(MySQL) 数据?

    • 创建 JDBCOuputFormat,设置连接信息和插入语句。
  5. 如何在 FlinkSQL 中创建表并读取和写入数据?

    • 使用 CREATE TABLE 语句创建表,并使用 SELECT 和 INSERT INTO 语句读取和写入数据。