返回

用Java客户端发送消息到Kafka

后端

使用Java客户端连接Kafka

  1. 创建Java项目
    首先,您需要创建一个新的Java项目。您可以使用您喜欢的IDE或文本编辑器,但为了方便起见,我们建议您使用Maven。

  2. 添加Kafka依赖项
    接下来,您需要将Kafka客户端依赖项添加到您的项目中。您可以通过在pom.xml文件中添加以下依赖项来实现:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.3.1</version>
</dependency>
  1. 创建Kafka Producer
    现在,您可以创建Kafka生产者对象。要做到这一点,您需要创建一个Properties对象并将其传递给ProducerConfig类。Properties对象将包含所有必要的生产者配置属性。
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(properties);
  1. 发送消息
    创建生产者对象后,您可以开始发送消息。要做到这一点,您需要使用send()方法。该方法接受一个Record对象作为参数,其中包含消息的键、值和主题。
Record<String, String> record = new Record<>("my-topic", "hello, world");

producer.send(record);
  1. 关闭生产者
    最后,在您不再需要生产者时,您需要关闭它。要做到这一点,您需要调用close()方法。
producer.close();

生产者常用的参数和属性

参数 默认值
bootstrap.servers Kafka代理的列表,格式为"host1:port1,host2:port2" null
key.serializer 用于序列化键的类 null
value.serializer 用于序列化值的类 null
batch.size 要发送到代理的批次的最大字节大小 16384
linger.ms 在发送批次之前等待的最大毫秒数 0
buffer.memory 生产者用于缓冲等待发送的消息的总内存字节数 33554432
max.block.ms 生产者在缓冲区已满时等待可用空间的最大毫秒数 60000
retries 发送失败时重试的次数 0
request.timeout.ms 发送请求到代理的超时时间,以毫秒为单位 30000
max.in.flight.requests.per.connection 每个连接的最大未完成请求数 5

生产者API和扩展点

除了前面提到的方法外,生产者API还提供了许多其他有用的方法和扩展点。其中包括:

  • send(ProducerRecord) - 此方法允许您发送ProducerRecord对象,该对象包含消息的键、值、主题和分区。
  • send(String, String) - 此方法允许您发送字符串键和值的消息。
  • flush() - 此方法强制生产者立即将所有缓冲的消息发送到代理。
  • close() - 此方法关闭生产者并释放所有资源。

生产者API还提供了许多扩展点,允许您自定义生产者的行为。其中包括:

  • Interceptor - 拦截器可以用来检查和修改生产者发送的消息。
  • Partitioner - 分区器可以用来确定消息应该发送到哪个分区。
  • Serializer - 序列化器可以用来将消息键和值序列化为字节数组。

示例代码

以下是一个使用Java客户端发送消息到Kafka的示例代码:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerExample {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "hello, world");

        producer.send(record);

        producer.close();
    }
}

总结

希望本文能够帮助您入门Java客户端连接Kafka并发送消息。如果您有任何问题或建议,请随时与我们联系。