返回

仅凭键过滤和转发 Kafka 消息:告别不必要序列化

java

仅凭键过滤和转发 Kafka 消息:避免不必要的序列化

引言

在处理大量数据流时,Apache Kafka Streams 提供了一种便捷的方式来创建复杂的流处理应用程序。本文重点介绍一个特定的场景,其中我们希望仅根据键过滤 Kafka 消息并将其转发到新主题。我们还将讨论优化此过程的技巧,以避免不必要的开销。

过滤和转发

假设我们有一个包含大量消息的 Kafka 主题,每个消息的键都标记有信息。我们的目标是过滤某些组的消息并将其转发到新主题。例如,我们可以过滤键以特定前缀开头的消息。

使用 Kafka Streams,我们可以使用 filter() 转换器轻松实现此过滤。然而,需要注意的是,此过程涉及反序列化键和值、执行过滤以及序列化新消息以写入输出主题。

优化避免序列化

在我们的场景中,我们仅使用键进行过滤。反序列化消息值是多余的。我们可以通过以下优化来避免不必要的序列化开销:

使用 flatMapValues() 转换器

flatMapValues() 转换器允许我们对流中的每个值应用一个函数,该函数可以生成零个或多个新值。在本例中,我们可以仅生成原始消息的键,如下所示:

inputStream.flatMapValues(value -> Collections.singleton(null))
           .filter((key, value) -> key != null && key.startsWith(filteredGroup))
           .to(filteredGroup+"-topic", Produced.with(Serdes.String(), Serdes.String()));

使用自定义反序列化程序

另一种优化方法是创建自定义反序列化程序,它仅反序列化键,而忽略值。这需要更多的手动工作,但可以提供最佳性能。

结论

通过优化 Kafka Streams 以避免不必要的序列化,我们提高了流处理应用程序的性能和效率。根据键过滤消息是一项常见任务,了解优化此过程的技巧至关重要。

常见问题解答

  1. 为什么使用 flatMapValues() 而不是 map() flatMapValues() 生成零个或多个新值,而 map() 仅生成单个值。在我们的情况下,我们只对键感兴趣,因此 flatMapValues() 更适合。
  2. 如何优化具有复杂过滤器的场景? 对于更复杂的过滤器,可以使用 process() 转换器。它允许自定义处理逻辑,包括选择性序列化和反序列化。
  3. 如何避免创建和销毁太多流? 考虑使用 table() 转换器存储中间状态,避免创建和销毁不必要的流。
  4. 如何优化反序列化开销? 使用 withCaching() 设置可以缓存反序列化值。这对于频繁访问的值很有用。
  5. 如何在不重新处理消息的情况下添加新过滤器? 使用 repartition() 转换器将流重新分区到不同的主题。这允许动态添加新过滤器,而无需重新处理现有消息。