返回

以Flink的source为支点,纵览实时计算之源

见解分享

Flink Sources:将数据流入实时处理引擎

什么是 Flink Source?

Flink 源是数据采集管道的一部分,负责从外部系统提取数据并将其转换为 Flink 可以处理的格式。它充当数据流入实时处理引擎的门户。

不同类型的 Source 实现

Flink 提供了多种 source 实现,以满足不同的数据源要求。这些实现包括:

  • 从集合读取: 适用于从内存中读取数据,用于测试或原型开发。
  • 从文件读取: 适用于从本地或分布式文件系统中读取数据。
  • 从 Kafka 读取: 适用于从流行的消息系统中读取数据。
  • 自定义 Source: 允许用户定义自己的数据提取逻辑,以连接到其他数据源。

Source 实现示例

从集合读取:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceFromCollection {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.fromCollection(Arrays.asList("Hello", "World", "Flink"));
        dataStream.print();
        env.execute();
    }
}

从文件读取:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceFromFile {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.readTextFile("/path/to/file");
        dataStream.print();
        env.execute();
    }
}

从 Kafka 读取:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class SourceFromKafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);
        DataStream<String> dataStream = env.addSource(consumer);
        dataStream.print();
        env.execute();
    }
}

自定义 Source:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class CustomSource extends RichSourceFunction<String> {

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            ctx.collect("Data " + i);
        }
    }

    @Override
    public void cancel() {

    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        CustomSource source = new CustomSource();
        DataStream<String> dataStream = env.addSource(source);
        dataStream.print();
        env.execute();
    }
}

结论

Flink Source 是构建实时处理管道的基石。理解和有效使用它们至关重要。通过利用 Flink 的各种 source 实现,开发人员可以轻松地将数据从外部系统集成到实时处理引擎中。

常见问题解答

  • 什么是 Flink Source?

    • Flink Source 负责从外部系统采集数据并将其转换为 Flink 可以处理的格式。
  • 有哪些类型的 Flink Source?

    • Flink Source 有多种类型,包括从集合、文件、Kafka 和自定义数据源读取的 Source。
  • 如何从集合中读取数据?

    • 使用 fromCollection 方法从内存中的集合中读取数据。
  • 如何从文件中读取数据?

    • 使用 readTextFile 方法从本地或分布式文件系统中读取数据。
  • 如何从 Kafka 中读取数据?

    • 使用 addSource 方法添加 FlinkKafkaConsumer 来从 Kafka 中读取数据。