返回

Flink 1.9 实战:使用 SQL 实时读取 Kafka 数据并写入 MySQL

见解分享




**Flink 1.9 实战:使用 SQL 实时读取 Kafka 数据并写入 MySQL** 

Apache Flink 是一个功能强大的开源流处理框架,以其低延时、高吞吐量和可扩展性而闻名。Flink 1.9 版本引入了对 SQL 的原生支持,使开发人员能够使用熟悉的 SQL 语言进行流处理操作。

在本篇文章中,我们将逐步指导您如何使用 Flink 1.9 的 SQL API 从 Kafka 中实时读取数据并写入 MySQL。该示例将涵盖设置说明、示例代码和最佳实践,以帮助您快速构建一个实时数据管道。

**先决条件** 

* Java 8 或更高版本
* Apache Flink 1.9 或更高版本
* Apache Kafka
* MySQL 数据库

**设置** 

1. **安装 Flink** 

   按照 Flink 网站上的说明安装 Flink。

2. **安装 Kafka** 

   按照 Kafka 网站上的说明安装 Kafka。

3. **安装 MySQL** 

   按照 MySQL 网站上的说明安装 MySQL。

4. **创建 Kafka 主题** 

   使用以下命令创建名为 "sensor-data" 的 Kafka 主题:

kafka-topics --create --topic sensor-data --replication-factor 1 --zookeeper my-zookeeper:2181


5. **启动 Kafka** 

启动 Kafka 集群:

kafka-server-start.sh /path/to/server.properties &


6. **启动 MySQL** 

启动 MySQL 数据库:

service mysql start


**示例代码** 

1. **创建 Flink 项目** 

使用以下命令创建新的 Flink 项目:

mvn io.takari:maven:wrapper
./mvnw package


2. **添加依赖项**`pom.xml` 中添加以下依赖项:

org.apache.flink flink-java 1.9.0 org.apache.flink flink-sql-client_2.11 1.9.0 ```
  1. 读取 Kafka 数据

    使用以下代码从 Kafka 中读取数据:

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "my-kafka:9092");
    
    DataStream<String> dataStream = env.addSource(
        new FlinkKakaoaConsumer010<>("sensor-data", new SimpleStringSchema(), props)
    );
    
  2. 解析 JSON 数据

    使用以下代码将 JSON 数据解析为 Flink 数据流:

    DataStream<SensorData> sensorDataStream = dataStream
        .map(new MapFunction<String, SensorData>() {
            @Override
            public SensorData map(String value) throws Exception {
                return new SensorData(value);
            }
        });
    
  3. 写入 MySQL

    使用以下代码将数据写入 MySQL:

    JdbcOutputFormat<SensorData> outputFormat = new
    JdbcOutputFormat<SensorData>("jdbc:mysql://my-mysql:3306/sensor_db?user=root&password=password",
    "insert into sensor_data (sensor_id, temperature, humidity) values (?, ?, ?)",
    new PreparedStatementSetter<SensorData>() {
        @Override
        public void setParameters(SensorData data, PreparedStatement preparedStatement)
            throws Exception {
                preparedStatement.setInt(1, data.getId());
                preparedStatement.setFloat(2, data.getTemperature());
                preparedStatement.setFloat(3, data.getHumidity());
            }
        });
    
    sensorDataStream.addSink(outputFormat);
    
  4. 运行 Flink 作业

    使用以下命令运行 Flink 作业:

    flink run -c com.example.sensor.data.SensorDataJob /path/to/sensor-data-job.jar
    

最佳实践

  • 使用事件时间语义 :事件时间语义允许您根据数据中包含的时间戳对数据进行排序和处理。
  • 并行化您的任务 :通过并行化您的任务,可以提高吞吐量并缩短处理时间。
  • 使用快照和故障恢复 :Flink 提供快照和故障恢复机制,以确保在发生故障时不会丢失数据。
  • 监控您的管道 :使用 Flink 的监控工具来监控管道并确保其正常运行。

总结

在本文中,我们逐步指导了您如何使用 Apache Flink 1.9 的 SQL API 从 Kafka 中实时读取数据并写入 MySQL。通过遵循这些步骤并实施最佳实践,您现在可以轻松构建实时数据管道,以满足您的数据处理需求。