返回
以Flink的source为支点,纵览实时计算之源
见解分享
2023-09-04 22:54:58
Flink Sources:将数据流入实时处理引擎
什么是 Flink Source?
Flink 源是数据采集管道的一部分,负责从外部系统提取数据并将其转换为 Flink 可以处理的格式。它充当数据流入实时处理引擎的门户。
不同类型的 Source 实现
Flink 提供了多种 source 实现,以满足不同的数据源要求。这些实现包括:
- 从集合读取: 适用于从内存中读取数据,用于测试或原型开发。
- 从文件读取: 适用于从本地或分布式文件系统中读取数据。
- 从 Kafka 读取: 适用于从流行的消息系统中读取数据。
- 自定义 Source: 允许用户定义自己的数据提取逻辑,以连接到其他数据源。
Source 实现示例
从集合读取:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SourceFromCollection {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.fromCollection(Arrays.asList("Hello", "World", "Flink"));
dataStream.print();
env.execute();
}
}
从文件读取:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SourceFromFile {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.readTextFile("/path/to/file");
dataStream.print();
env.execute();
}
}
从 Kafka 读取:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
public class SourceFromKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);
DataStream<String> dataStream = env.addSource(consumer);
dataStream.print();
env.execute();
}
}
自定义 Source:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
public class CustomSource extends RichSourceFunction<String> {
@Override
public void run(SourceContext<String> ctx) throws Exception {
for (int i = 0; i < 10; i++) {
ctx.collect("Data " + i);
}
}
@Override
public void cancel() {
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
CustomSource source = new CustomSource();
DataStream<String> dataStream = env.addSource(source);
dataStream.print();
env.execute();
}
}
结论
Flink Source 是构建实时处理管道的基石。理解和有效使用它们至关重要。通过利用 Flink 的各种 source 实现,开发人员可以轻松地将数据从外部系统集成到实时处理引擎中。
常见问题解答
-
什么是 Flink Source?
- Flink Source 负责从外部系统采集数据并将其转换为 Flink 可以处理的格式。
-
有哪些类型的 Flink Source?
- Flink Source 有多种类型,包括从集合、文件、Kafka 和自定义数据源读取的 Source。
-
如何从集合中读取数据?
- 使用
fromCollection
方法从内存中的集合中读取数据。
- 使用
-
如何从文件中读取数据?
- 使用
readTextFile
方法从本地或分布式文件系统中读取数据。
- 使用
-
如何从 Kafka 中读取数据?
- 使用
addSource
方法添加FlinkKafkaConsumer
来从 Kafka 中读取数据。
- 使用