返回

如何在 Visual Studio Codes Maven 中使用 Java 编写 Flink MongoDB Sink?

java

在 Visual Studio Codes Maven 中使用 Java 编写 Flink MongoDB Sink

简介

本教程将指导你如何使用 Java 在 Visual Studio Codes Maven 中编写 Flink MongoDB Sink。通过这个教程,你将了解如何在 Apache Flink 中构建数据管道,将数据从 Flink 流持续地写入 MongoDB 数据库中。

先决条件

  • Java 开发工具包 (JDK) 17.0.2 或更高版本
  • Maven 3.9.6 或更高版本
  • Apache Flink 1.18.1
  • Visual Studio Code x64 1.87.2
  • MongoDB 已安装并运行

创建 Flink 项目

  1. 打开 Visual Studio Codes 并创建一个新文件夹作为 Flink 项目。
  2. 在该文件夹中,创建一个名为 pom.xml 的 Maven POM 文件。

添加 Maven 依赖

pom.xml 文件中,添加以下依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.18.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-mongodb-shaded</artifactId>
        <version>1.18.1</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongo-java-driver</artifactId>
        <version>4.6.4</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
    </dependency>
</dependencies>

编写 Java 类

Car.java (POJO 类)

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class Car {
    private String brand;
    private int price;
}

编写 Flink 代码

创建一个名为 FlinkMongoSinkExample.java 的 Java 类,包含以下代码:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineResult;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.mongo.MongoSink;
import org.bson.Document;

import java.util.ArrayList;
import java.util.List;

public class FlinkMongoSinkExample {

    public static void main(String[] args) throws Exception {

        // 创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置并行度
        env.setParallelism(1);

        // 创建数据源
        List<Car> cars = new ArrayList<>();
        cars.add(new Car("BMW", 500));
        cars.add(new Car("Kia", 300));
        cars.add(new Car("Ford", 600));
        DataStream<Car> carStream = env.fromCollection(cars);

        // 创建 MongoDB Sink
        MongoSink<Car> sink = MongoSink.<Car>builder()
                .setUri("mongodb://127.0.0.1:27017")
                .setDatabase("class_db")
                .setCollection("class_coll")
                .setBatchSize(1000)
                .setBatchIntervalMs(1000)
                .setMaxRetries(3)
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setSerializationSchema(
                        (input, context) -> {
                            Document doc = new Document(input.getBrand(), input.getPrice());
                            System.out.println(doc);
                            return new InsertOneModel<>(BsonDocument.parse(doc.toJson()));
                        })
                .build();

        // 将流写入 MongoDB
        carStream.sinkTo(sink);

        // 执行作业
        PipelineResult pipelineResult = env.execute("Flink MongoDB POJO Example");

        // 关闭流执行环境
        env.close();
    }
}

运行代码

  1. 在终端中,导航到项目文件夹。
  2. 运行以下命令:
mvn clean package
  1. 成功打包后,运行以下命令:
java -jar target/flink-mongo-sink-example-1.0-SNAPSHOT.jar

验证

  1. 打开 MongoDB 并连接到指定的数据库和集合(class_dbclass_coll)。
  2. 在集合中,应该可以看到插入的数据。

结论

通过遵循本教程中的步骤,你已经学会了如何使用 Java 在 Visual Studio Codes Maven 中编写 Flink MongoDB Sink。这个功能使你可以轻松地将数据从 Flink 流写入 MongoDB,从而实现数据存储和检索。

常见问题解答

1. 如何设置 MongoDB 连接参数?
你可以通过调用 setUri()setDatabase()setCollection() 方法来设置 MongoDB 连接参数。

2. 如何控制数据写入速率?
可以使用 setBatchSize()setBatchIntervalMs() 方法来控制数据写入速率。

3. 如何处理写入错误?
可以使用 setMaxRetries()setDeliveryGuarantee() 方法来处理写入错误。

4. 如何自定义数据序列化方式?
可以通过实现 SerializationSchema 接口来自定义数据序列化方式。

5. Flink MongoDB Sink 中支持哪些数据类型?
Flink MongoDB Sink 支持所有 Java POJO 和元组类型,以及二进制和 JSON 数据。