返回

Kafka Streams聚合Long数据异常分析及解决

java

Kafka Streams 中的聚合和求和问题分析

使用 Kafka Streams 处理数据流时,经常需要对数据进行聚合和求和操作。如果配置不当,可能导致意料之外的异常。这篇文章探讨一个在 Kafka Streams 中聚合 Long 类型数据时常见的错误,并分析错误原因,同时提供相应的解决方案。

现象

在尝试使用 aggregate 函数对 Long 类型的值进行求和时,应用出现 SerializationException: Size of data received by IntegerDeserializer is not 4 异常。异常栈跟踪显示问题出在尝试从状态存储(state store)读取数据时,IntegerDeserializer 尝试读取数据,但实际存储的并非 Integer 类型。

问题分析

这个问题的核心在于,即使在代码中使用了 Serdes.Long() 指定了值序列化器,Kafka Streams 在状态存储中依然可能尝试使用其他类型的数据序列化器来反序列化数据。主要原因是 状态存储(State Store) 需要被配置正确的 key 和 value 的序列化器 。 在 Kafka Streams 中使用 aggregate 方法时,需要定义好状态存储所使用的key和value的序列化器。如果在创建Materialized时没有显式地指定序列化器,或者指定的序列化器和数据的实际类型不匹配,就会发生上述的序列化异常。在例子中,Materialized 定义中的Key值被显示配置成了 StringLong, 然而 Kafka Streams 默认情况下尝试使用 IntegerDeserializer 进行反序列化,导致异常。

默认情况下,Materialized 类型会尝试寻找 Key/Value 的具体实现来序列化/反序列化,而并没有使用开发者认为应该使用的值类型为 Long 的序列化器,从而造成错误。

解决方案

理解问题本质后,有以下几种方式解决 Kafka Streams 聚合 Long 类型数据时遇到的问题:

1. 显式指定 Value Serde

最直接的方法是在创建 Materialized 对象时,明确指定 value 的序列化器Serdes.Long(), 同时也可以使用 withKeySerde(Serdes.String())来显式指定Key值的序列化器。这样做可以保证 Kafka Streams 使用正确的序列化器读取存储状态中的值。

代码示例

    @Bean
    public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
        JsonSerde<BankTransaction> jsonSerde = new JsonSerde<>(BankTransaction.class);
        jsonSerde.configure(Map.of("spring.json.type.mapping", "BankTransaction:nl.sourcelabs.kafkasolo.streamprocessing.BankTransaction"), false);

        return kStreamBuilder.stream("transactions", Consumed.with(Serdes.String(), jsonSerde))
                .mapValues((readOnlyKey, value) -> value.amount)
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .aggregate(() -> 0L, (key, value, aggregate) -> aggregate + value,
                         Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("transactionAmountCount")
                         .withKeySerde(Serdes.String()) // 显示指定Key值的序列化器
                        .withValueSerde(Serdes.Long()))  // 显式指定 Value 值的序列化器
                .toStream();
    }

操作步骤

  1. aggregate 方法中, 使用Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("transactionAmountCount") 方法创建 Materialized 实例。
  2. 调用 withKeySerde(Serdes.String()) 指定Key值为String的序列化器。
  3. 调用 withValueSerde(Serdes.Long()) 明确指定 Value 值为Long的序列化器。
  4. 重新编译并运行 Kafka Streams 应用程序。

2. 使用 Materialized.with 方法

可以使用 Materialized.with 来简洁指定key和value的序列化器。这种方式与第一种效果一致,更简洁一些。

代码示例

@Bean
    public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
        JsonSerde<BankTransaction> jsonSerde = new JsonSerde<>(BankTransaction.class);
        jsonSerde.configure(Map.of("spring.json.type.mapping", "BankTransaction:nl.sourcelabs.kafkasolo.streamprocessing.BankTransaction"), false);

        return kStreamBuilder.stream("transactions", Consumed.with(Serdes.String(), jsonSerde))
            .mapValues((readOnlyKey, value) -> value.amount)
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
             .aggregate(() -> 0L, (key, value, aggregate) -> aggregate + value,
                    Materialized.as("transactionAmountCount", Serdes.String(), Serdes.Long())) // 使用Materialized.with指定序列化器
            .toStream();
    }

操作步骤

  1. 使用 Materialized.as("transactionAmountCount", Serdes.String(), Serdes.Long()), 一步指定存储的名称,以及 key 和 value 的序列化器。
  2. 重新编译并运行 Kafka Streams 应用程序。

安全建议

在处理序列化和反序列化时,需注意以下事项:

  • 一致性 : 确保 Kafka topic 中的消息的序列化器和反序列化器配置与 Kafka Streams 应用程序中配置的一致。
  • 显式声明 : 避免使用默认序列化器,务必显式指定所需的序列化器,保证数据的类型和 Kafka Streams 的预期匹配。
  • 数据验证 : 在流处理应用中,适当添加数据校验机制,能够及早发现并处理数据不匹配等问题,防止因数据异常导致整个应用崩溃。
  • 监控 : 使用监控工具观察应用程序的状态,尤其是涉及到状态存储的操作,这样可以在发生问题时及时采取措施。

总结

Kafka Streams 提供了强大流处理能力,在聚合 Long 型数据时,务必正确配置序列化器。显式使用 Materialized 来配置 key 和 value 的序列化器是避免此问题的关键。通过上述方法,可以顺利的对 Kafka 流数据进行聚合运算。