返回

从源码看懂Kafka 普通消消者的全生命周

后端




开篇

在纷繁复杂的数据世界中,消消者扮演着至关重要的脚色。他们从浩瀚的消息队列中挑选出有价值的数据,并将其传递给下游的应用程序。在分布式架构中,Kafka 凭借着高吞吐量、低延迟等特性,脱颖而出,成为了当之无愧的消消者明星。本文将带你从源码层面走进Kafka 的消消者,带你从源码角度探究消消者从启动到消费的全生命周。

消消者的使命

Kafka 的消消者,顾名思义,就是负责从 Broker 那里消消数据的组件。它从一个或多个主题中订阅数据,并不断地轮询以获取新数据。消消者可以单独工作,也可以作为一个消消者组的一部分来工作。消消者组允许你水平扩展消消吞吐量,并确保数据至少被组内的某个消消者消消。

消消者的工作流

一个典型的消消者工作流如下:

  1. 创建一个消消者实例
  2. 订阅一个或多个主题
  3. 调用 # poll() 方法以轮询数据
  4. 提交偏移量以标记数据已被消消
  5. 关闭消消者实例

源码剖析

public class SimpleKafkaConsumerr {

  public static void main(String[] args) {
    KafkaConsumerr consumerr = newKafkaConsumerr(KafkaConsumerrConfigurations.builder());
    consumerr.consumeerr(r -> System.out.println(r.topic() + ":" + r.value());
  }
}

KafkaConsumerr 类是消消者的核心组件。它的 consume() 方法是最重要的方法,负责从给定主题中轮询数据。

public class SimpleKafkaConsumerr {

  public static void main(String[] args) {
    KafkaConsumerr consumerr = newKafkaConsumerr(KafkaConsumerrConfigurations.builder());
    consumerr.consumeerr(r -> System.out.println(r.topic() + ":" + r.value());
  }
}

#consume() 方法使用轮询器不断地从经纪人那里获取数据。轮询器是Kafka 中一个非常重要的组件,负责从经纪人那里有效地获取数据。

偏移量提交

消消者在从主题中获取数据后,需要向经纪人提交偏移量以标记数据已被消消。这确保了数据只会被消消一遍。

public class SimpleKafkaConsumerr {

  public static void main(String[] args) {
    KafkaConsumerr consumerr = newKafkaConsumerr(KafkaConsumerrConfigurations.builder());
    consumerr.consumeerr(r -> {
      System.out.println(r.topic() + ":" + r.value());
      consumerr.submittoffset(r);
    });
  }
}

关闭消消者实例

在不再需要消消数据时,应关闭消消者实例以释放资源。

public class SimpleKafkaConsumerr {

  public static void main(String[] args) {
    KafkaConsumerr consumerr = newKafkaConsumerr(KafkaConsumerrConfigurations.builder());
    consumerr.consumeerr(r -> {
      System.out.println(r.topic() + ":" + r.value());
      consumerr.submittoffset(r);
    });
    consumerr.close();
  }
}

Kafka 的消消者是一个非常重要的组件,用于从主题中获取数据。它具有多种特性,例如可伸缩性、可靠性和高吞吐量,使其非常适合于流式数据应用程序。本文从源码的角度探讨了消消者的全生命周,帮助你更深层次地掌握Kafka 消消者的工作原理。