Flink固定窗口超时关闭:解决无数据延迟问题
2025-01-31 02:21:17
Flink 中固定窗口的超时关闭
数据流处理中,常常需要使用窗口进行聚合计算。基于时间划分的固定窗口,例如 Tumbling Window,非常常用。 遇到这样的问题:当特定时间段内没有数据到达,先前窗口并不会立即关闭。 例如,如果使用 1 分钟滚动窗口计算每分钟的事件数量,当某分钟没有数据时,上一个有数据的窗口不会关闭,这会造成资源等待。这种现象可能会由于多种因素造成,例如:数据源迟滞、系统故障,或者短暂的数据静默期。我们需要一种方法能够在等待一段时间后,强制关闭旧窗口,以便处理后续数据。
问题分析
在 Flink 的时间窗口机制中,窗口的关闭是由 watermark 触发的。 当 watermark 超过窗口的结束时间时,窗口会被视为完成并关闭。 如果一段时间没有数据到达,自然就不会有新的 watermark 产生,那么旧窗口便不会关闭,一直处于 “打开” 状态,并持续消耗资源。
解决方案:引入空闲数据检测
一个有效的方法是引入空闲数据检测机制,它可以定期检测是否有数据到达。如果长时间检测到没有数据到达,便强制生成一个带有较晚时间戳的水印,从而使先前未关闭的窗口完成计算并关闭。 这个方案能够让应用在长时间空闲后继续运行,避免无限期等待带来的资源消耗。
代码示例和操作步骤:
- 创建一个
ProcessWindowFunction
用于生成 watermark
以下是一个简化的示例 ProcessWindowFunction
, 使用 ProcessingTime
为没有数据的窗口生成 watermark. 请注意, ProcessingTime
用于实现 watermark 定时触发, 而不会受 Event Time 的影响. AllowedLateness
也应该适当配置, 例如设置 allowedLateness(Time.seconds(10))
, 用于处理迟到的数据.
```java
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class InactivityWindowFunction<IN, OUT> extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {
private final long inactivityTimeoutMillis;
public InactivityWindowFunction(long inactivityTimeoutMillis) {
this.inactivityTimeoutMillis = inactivityTimeoutMillis;
}
@Override
public void process(String key,
Context context,
Iterable<IN> elements,
Collector<OUT> out) throws Exception {
boolean hasElements = elements.iterator().hasNext();
long currentTime = System.currentTimeMillis();
long windowStart = context.window().getStart();
long windowEnd = context.window().getEnd();
long lastActivityTime = context.windowState().getState(
new org.apache.flink.api.common.state.ValueStateDescriptor<Long>("last-activity", Long.class))
.value();
if (hasElements) {
context.windowState()
.getState(new org.apache.flink.api.common.state.ValueStateDescriptor<Long>("last-activity", Long.class))
.update(currentTime);
// do some real aggregation logic before out
if (out !=null) {
// real aggregate result
// out.collect((OUT));
}
}else if(!hasElements && lastActivityTime != 0 && (currentTime - lastActivityTime ) > inactivityTimeoutMillis ) {
context.windowState().clear(); //清除 state 避免 state 持续增大。
context.output(null, new org.apache.flink.api.common.eventtime.Watermark(windowEnd + 1));
}
else if (!hasElements && lastActivityTime == 0) {
context.windowState()
.getState(new org.apache.flink.api.common.state.ValueStateDescriptor<Long>("last-activity", Long.class))
.update(currentTime);
}
}
}
```
- 修改 Flink 代码
修改原有 Flink 代码,在 window
操作之后应用上述自定义 ProcessWindowFunction
:
```java
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3,
org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)
));
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
executionEnvironment.setParallelism(1);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "AllEventCountConsumerGroup");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("event_input_topic", new SimpleStringSchema(), properties);
DataStreamSource<String> kafkaDataStream = environment.addSource(kafkaConsumer);
OutputTag<Watermark> lateWatermark = new OutputTag<>("late-watermark", TypeInformation.of(Watermark.class));
kafkaDataStream
.flatMap(new EventFlatter())
.filter(Objects::nonNull)
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Entity>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((SerializableTimestampAssigner<Entity>) (element, recordTimestamp) -> element.getTimestamp()))
.keyBy((KeySelector<Entity, String>) Entity::getTenant)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(10))
.process(new InactivityWindowFunction<Entity, Tuple2<String,Integer>>(TimeUnit.MINUTES.toMillis(2))).process(new ProcessWindowFunction<Tuple2<String, Integer>,Tuple2<String,Integer> , String, TimeWindow>() {
@Override
public void process(String s, ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
if(iterable.iterator().hasNext()){
Tuple2<String,Integer> first= iterable.iterator().next();
collector.collect(new Tuple2<>(s,1));
}
}
}).addSink(eventRateProducer);
// process late event via the side output for debugging or late data cleaning purpose
kafkaDataStream
.flatMap(new EventFlatter())
.filter(Objects::nonNull)
.assignTimestampsAndWatermarks(WatermarkStrategy
.
.withTimestampAssigner((SerializableTimestampAssigner
.keyBy((KeySelector<Entity, String>) Entity::getTenant)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(10))
.process(new InactivityWindowFunction<Entity, Void>(TimeUnit.MINUTES.toMillis(2)))
.getSideOutput(lateWatermark)
.print() // do any things you want here for those data arrive late
```
代码片段修改为使用ProcessWindowFunction
, 用于管理 inactive 的窗口。 其中 inactivityTimeoutMillis
设置为 2 分钟,可根据具体情况调整。需要设置 OutputTag
用于输出迟到数据。 在实际聚合逻辑之前增加了一个 filter ,只保留了Tuple2
数据。
- 说明
- 该解决方案的核心在于当一段时间没有数据到达时,使用系统时间(Processing Time)更新 Window state中的 lastActivityTime 和强制生成一个较晚时间的 watermark , 从而强制 Flink 关闭那些未关闭的窗口。
- 在实践中,需要合理配置超时时间, 避免在不该关闭窗口的情况也将其关闭。
总结
此方案利用自定义的 ProcessWindowFunction
增加了对空闲窗口的监测和处理能力。 通过设定合适的超时时间,可以解决 Flink 固定窗口由于没有数据导致的延迟关闭问题,同时提高数据流处理的健壮性,更好地应对各种异常情况。
注意,在处理延迟数据方面,可以考虑设置合理的AllowedLateness
值,允许窗口延迟关闭,收集可能的迟到数据。另外需要增加侧输出对延迟关闭窗口的水印和数据做进一步处理,或加入报警系统做数据排查和数据修正。