返回
从源码看懂Kafka 普通消消者的全生命周
后端
2024-02-18 10:21:49
开篇
在纷繁复杂的数据世界中,消消者扮演着至关重要的脚色。他们从浩瀚的消息队列中挑选出有价值的数据,并将其传递给下游的应用程序。在分布式架构中,Kafka 凭借着高吞吐量、低延迟等特性,脱颖而出,成为了当之无愧的消消者明星。本文将带你从源码层面走进Kafka 的消消者,带你从源码角度探究消消者从启动到消费的全生命周。
消消者的使命
Kafka 的消消者,顾名思义,就是负责从 Broker 那里消消数据的组件。它从一个或多个主题中订阅数据,并不断地轮询以获取新数据。消消者可以单独工作,也可以作为一个消消者组的一部分来工作。消消者组允许你水平扩展消消吞吐量,并确保数据至少被组内的某个消消者消消。
消消者的工作流
一个典型的消消者工作流如下:
- 创建一个消消者实例
- 订阅一个或多个主题
- 调用
# poll()
方法以轮询数据 - 提交偏移量以标记数据已被消消
- 关闭消消者实例
源码剖析
public class SimpleKafkaConsumerr {
public static void main(String[] args) {
KafkaConsumerr consumerr = newKafkaConsumerr(KafkaConsumerrConfigurations.builder());
consumerr.consumeerr(r -> System.out.println(r.topic() + ":" + r.value());
}
}
KafkaConsumerr 类是消消者的核心组件。它的 consume()
方法是最重要的方法,负责从给定主题中轮询数据。
public class SimpleKafkaConsumerr {
public static void main(String[] args) {
KafkaConsumerr consumerr = newKafkaConsumerr(KafkaConsumerrConfigurations.builder());
consumerr.consumeerr(r -> System.out.println(r.topic() + ":" + r.value());
}
}
#consume()
方法使用轮询器不断地从经纪人那里获取数据。轮询器是Kafka 中一个非常重要的组件,负责从经纪人那里有效地获取数据。
偏移量提交
消消者在从主题中获取数据后,需要向经纪人提交偏移量以标记数据已被消消。这确保了数据只会被消消一遍。
public class SimpleKafkaConsumerr {
public static void main(String[] args) {
KafkaConsumerr consumerr = newKafkaConsumerr(KafkaConsumerrConfigurations.builder());
consumerr.consumeerr(r -> {
System.out.println(r.topic() + ":" + r.value());
consumerr.submittoffset(r);
});
}
}
关闭消消者实例
在不再需要消消数据时,应关闭消消者实例以释放资源。
public class SimpleKafkaConsumerr {
public static void main(String[] args) {
KafkaConsumerr consumerr = newKafkaConsumerr(KafkaConsumerrConfigurations.builder());
consumerr.consumeerr(r -> {
System.out.println(r.topic() + ":" + r.value());
consumerr.submittoffset(r);
});
consumerr.close();
}
}
结
Kafka 的消消者是一个非常重要的组件,用于从主题中获取数据。它具有多种特性,例如可伸缩性、可靠性和高吞吐量,使其非常适合于流式数据应用程序。本文从源码的角度探讨了消消者的全生命周,帮助你更深层次地掌握Kafka 消消者的工作原理。