返回

Flink固定窗口超时关闭:解决无数据延迟问题

java

Flink 中固定窗口的超时关闭

数据流处理中,常常需要使用窗口进行聚合计算。基于时间划分的固定窗口,例如 Tumbling Window,非常常用。 遇到这样的问题:当特定时间段内没有数据到达,先前窗口并不会立即关闭。 例如,如果使用 1 分钟滚动窗口计算每分钟的事件数量,当某分钟没有数据时,上一个有数据的窗口不会关闭,这会造成资源等待。这种现象可能会由于多种因素造成,例如:数据源迟滞、系统故障,或者短暂的数据静默期。我们需要一种方法能够在等待一段时间后,强制关闭旧窗口,以便处理后续数据。

问题分析

在 Flink 的时间窗口机制中,窗口的关闭是由 watermark 触发的。 当 watermark 超过窗口的结束时间时,窗口会被视为完成并关闭。 如果一段时间没有数据到达,自然就不会有新的 watermark 产生,那么旧窗口便不会关闭,一直处于 “打开” 状态,并持续消耗资源。

解决方案:引入空闲数据检测

一个有效的方法是引入空闲数据检测机制,它可以定期检测是否有数据到达。如果长时间检测到没有数据到达,便强制生成一个带有较晚时间戳的水印,从而使先前未关闭的窗口完成计算并关闭。 这个方案能够让应用在长时间空闲后继续运行,避免无限期等待带来的资源消耗。

代码示例和操作步骤:

  1. 创建一个 ProcessWindowFunction 用于生成 watermark

以下是一个简化的示例 ProcessWindowFunction, 使用 ProcessingTime 为没有数据的窗口生成 watermark. 请注意, ProcessingTime 用于实现 watermark 定时触发, 而不会受 Event Time 的影响. AllowedLateness 也应该适当配置, 例如设置 allowedLateness(Time.seconds(10)), 用于处理迟到的数据.
```java
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;

 public class InactivityWindowFunction<IN, OUT> extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {

  private final long inactivityTimeoutMillis;


  public InactivityWindowFunction(long inactivityTimeoutMillis) {
      this.inactivityTimeoutMillis = inactivityTimeoutMillis;
  }

    @Override
    public void process(String key,
                        Context context,
                        Iterable<IN> elements,
                        Collector<OUT> out) throws Exception {

      boolean hasElements = elements.iterator().hasNext();
        long currentTime = System.currentTimeMillis();
        long windowStart = context.window().getStart();
        long windowEnd = context.window().getEnd();

        long lastActivityTime = context.windowState().getState(
            new org.apache.flink.api.common.state.ValueStateDescriptor<Long>("last-activity", Long.class))
                    .value();

      if (hasElements) {

          context.windowState()
                  .getState(new org.apache.flink.api.common.state.ValueStateDescriptor<Long>("last-activity", Long.class))
                  .update(currentTime);


        //  do some real aggregation logic before out
         if (out !=null) {
           // real aggregate result
           // out.collect((OUT));
          }

         }else if(!hasElements && lastActivityTime != 0  && (currentTime - lastActivityTime ) > inactivityTimeoutMillis ) {

          context.windowState().clear(); //清除 state 避免 state 持续增大。
         context.output(null, new org.apache.flink.api.common.eventtime.Watermark(windowEnd + 1));
      }
    else if (!hasElements && lastActivityTime == 0) {

      context.windowState()
          .getState(new org.apache.flink.api.common.state.ValueStateDescriptor<Long>("last-activity", Long.class))
        .update(currentTime);
    }
  }

}

```
  1. 修改 Flink 代码

修改原有 Flink 代码,在 window 操作之后应用上述自定义 ProcessWindowFunction

```java
   StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3,
            org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)
    ));
    executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    executionEnvironment.setParallelism(1);
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "AllEventCountConsumerGroup");
    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("event_input_topic", new SimpleStringSchema(), properties);
    DataStreamSource<String> kafkaDataStream = environment.addSource(kafkaConsumer);
    OutputTag<Watermark> lateWatermark = new OutputTag<>("late-watermark",  TypeInformation.of(Watermark.class));


kafkaDataStream
        .flatMap(new EventFlatter())
        .filter(Objects::nonNull)
        .assignTimestampsAndWatermarks(WatermarkStrategy
            .<Entity>forBoundedOutOfOrderness(Duration.ofSeconds(2))
            .withTimestampAssigner((SerializableTimestampAssigner<Entity>) (element, recordTimestamp) -> element.getTimestamp()))
            .keyBy((KeySelector<Entity, String>) Entity::getTenant)
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .allowedLateness(Time.seconds(10))
        .process(new InactivityWindowFunction<Entity, Tuple2<String,Integer>>(TimeUnit.MINUTES.toMillis(2))).process(new  ProcessWindowFunction<Tuple2<String, Integer>,Tuple2<String,Integer> , String, TimeWindow>() {
            @Override
            public void process(String s, ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
            if(iterable.iterator().hasNext()){
                Tuple2<String,Integer> first=  iterable.iterator().next();
                    collector.collect(new Tuple2<>(s,1));
                }

            }
        }).addSink(eventRateProducer);

// process late event via the side output for debugging or late data cleaning purpose
kafkaDataStream
.flatMap(new EventFlatter())
.filter(Objects::nonNull)
.assignTimestampsAndWatermarks(WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((SerializableTimestampAssigner) (element, recordTimestamp) -> element.getTimestamp()))
.keyBy((KeySelector<Entity, String>) Entity::getTenant)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(10))
.process(new InactivityWindowFunction<Entity, Void>(TimeUnit.MINUTES.toMillis(2)))
.getSideOutput(lateWatermark)
.print() // do any things you want here for those data arrive late

```

代码片段修改为使用ProcessWindowFunction, 用于管理 inactive 的窗口。 其中 inactivityTimeoutMillis 设置为 2 分钟,可根据具体情况调整。需要设置 OutputTag 用于输出迟到数据。 在实际聚合逻辑之前增加了一个 filter ,只保留了Tuple2 数据。

  1. 说明
    • 该解决方案的核心在于当一段时间没有数据到达时,使用系统时间(Processing Time)更新 Window state中的 lastActivityTime 和强制生成一个较晚时间的 watermark , 从而强制 Flink 关闭那些未关闭的窗口。
    • 在实践中,需要合理配置超时时间, 避免在不该关闭窗口的情况也将其关闭。

总结

此方案利用自定义的 ProcessWindowFunction 增加了对空闲窗口的监测和处理能力。 通过设定合适的超时时间,可以解决 Flink 固定窗口由于没有数据导致的延迟关闭问题,同时提高数据流处理的健壮性,更好地应对各种异常情况。

注意,在处理延迟数据方面,可以考虑设置合理的AllowedLateness值,允许窗口延迟关闭,收集可能的迟到数据。另外需要增加侧输出对延迟关闭窗口的水印和数据做进一步处理,或加入报警系统做数据排查和数据修正。