返回

Reactor Kafka SSL Bundle 找不到 SslBundleSslEngineFactory 类?解决方案

java

Reactor Kafka 使用 SSL Bundle 无法找到 SslBundleSslEngineFactory 类的问题解决

碰到了个麻烦事儿,在使用 Reactor Kafka 和 SSL Bundle 集成的时候,总是报 SslBundleSslEngineFactory 类找不到的错。错误信息长这样:Invalid value .kafka.SslBundleSslEngineFactory for configuration ssl.engine.factory.class: Class .SslBundleSslEngineFactory could not be found.。 挺头疼的,下面来捋一捋怎么解决。

一、 问题原因分析

错误提示很明显,Kafka 客户端找不到 org.springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory 这个类。 为什么呢? 核心原因在于 Spring Boot 的版本和依赖问题:

  1. Spring Boot 版本过低: SslBundleSslEngineFactory 是在 Spring Boot 2.7 及以后的版本中才引入的,用于简化 SSL Bundle 的配置。如果你的项目还在用老版本(比如 Spring Boot 2.6 或更早),那自然是找不到这个类的。
  2. 缺少相关依赖 Spring Boot 的 Kafka 自动配置依赖spring-boot-starter-kafka,检查确保已正确导入到工程.

二、解决方案

针对上面分析的原因,咱分几种情况来解决。

1. 升级 Spring Boot 版本 (推荐)

最直接的方法是升级 Spring Boot 到 2.7 或更高版本。 这是最推荐的做法,因为新版本不仅带来了 SslBundleSslEngineFactory,还修复了许多 bug,提升了性能。

  • 操作步骤:

    1. 修改 pom.xml (如果使用 Maven) 或 build.gradle (如果使用 Gradle) 文件中的 Spring Boot 版本号。 比如,在 pom.xml 中:

      <parent>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-parent</artifactId>
          <version>2.7.0</version>  
          <!-- 或更新的版本,比如 3.x -->
          <relativePath/> <!-- lookup parent from repository -->
      </parent>
      
    2. 如果项目中有其他依赖于旧版本 Spring Boot 的库,可能需要一并升级,避免版本冲突。

    3. 清理并重新构建项目

2. 检查并导入必要依赖

如果你暂时因为某些原因还不能升级到2.7或者以上,但是需要检查和手动导入依赖

  • 操作步骤:

    1. 再次确认 spring-boot-starter-kafka依赖已存在. 如果是使用Maven,确保你的pom.xml存在:
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-kafka</artifactId>
    </dependency>
    

    如果是使用 Gradle,在你的build.gradle文件要有如下:

implementation 'org.springframework.boot:spring-boot-starter-kafka'
  ```
 2. 运行你的项目,确定相关类已经正常引入,通常问题会自动消失.

3. 显式配置 SslEngineFactory (不推荐,作为备选方案)

如果确实因为某些特别原因无法升级Spring Boot,并且方法二仍无法解决. 作为一个不太优雅但是可能能绕过问题的法子,你可以尝试手动指定底层的 Kafka SSL Engine Factory。但这要求你对 Kafka 的 SSL 配置有比较深入的理解,并且操作比较繁琐。不推荐用这个。

  • 原理: 直接用 Kafka 原生的 SslEngineFactory 接口,而不是 Spring Boot 封装的 SslBundleSslEngineFactory

  • 操作步骤 (以 org.apache.kafka.common.security.ssl.DefaultSslEngineFactory 为例):

    1. 在你的 Kafka Receiver 配置中,修改 ssl.engine.factory.class 的值:

       @Bean
      public KafkaReceiver<String, String> kafkaReceiver(MeterRegistry meterRegistry, ObservationRegistry observationRegistry) { // 移除 SslBundles 参数
          final Map<String, Object> properties = new HashMap<>();
          properties.put("security.protocol", "SSL");
          //改用 DefaultSslEngineFactory
          properties.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, "org.apache.kafka.common.security.ssl.DefaultSslEngineFactory");
      
           //原有的 SSL 配置仍然有效,可以保留
           properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/keystore.jks");
           properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "keyStorePassphrase");
           properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/truststore.jks");
           properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "trustStorePassphrase");
      
          properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host.com:9092"); // 你的 Kafka 地址
          properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "testtestfour"); // 客户端 ID
          properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testtesttfour"); // 消费组
          properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
          properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
          properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
          final ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(properties);
          return KafkaReceiver.create(receiverOptions
                  .addAssignListener(p -> LOGGER.info("partitions assigned {}", p))
                  .addRevokeListener(p -> LOGGER.info("partitions revoked {}", p))
                  .consumerListener(new MicrometerConsumerListener(meterRegistry))
                  .withObservation(observationRegistry)
                  .subscription(Collections.singleton("topic")));
      }
      
    2. 由于不再依赖SslBundle了, properties.put(SslBundle.class.getName(), sslBundles.getBundle("kafka_bundle")); 可以删去. 同时也不需要传入SslBundles类型参数了.

    3. 确保 keystoretruststore 的路径、密码配置正确。

附加技巧与安全建议(适用于以上任何方法)

  • 密钥保护: 不要把密钥(keystoretruststore 的密码)直接写在代码里或配置文件里! 应该用环境变量、配置服务器(如 Spring Cloud Config Server)或者密钥管理服务(如 HashiCorp Vault)来安全地存储和注入。

  • 最小权限原则: 为kafka的生产者消费者账号赋予尽可能最小的权限.

  • 定期轮换证书: 证书快过期要及时更换,别等到服务挂了才想起来。

  • 仔细配置SSL参数: 不要一股脑的全上,按需使用

  • 日志审查: 定期看看 Kafka 客户端和 Broker 的日志,有没有什么异常。

  • 在application.properties文件中设置 在Spring环境中, application.properties或者application.yml才是常规设置的地方, 如果要通过sslbundle方式,可以在配置文件配置相关
    例如:

spring.kafka.consumer.bootstrap-servers=host.com:9092
spring.kafka.consumer.group-id=testtesttfour
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.security.protocol=SSL
spring.kafka.consumer.ssl.bundle=kafka_bundle

spring.ssl.bundle.pem.kafka_bundle.keystore.certificate=-----BEGIN CERTIFICATE-----....-----END CERTIFICATE-----
spring.ssl.bundle.pem.kafka_bundle.keystore.private-key=-----BEGIN PRIVATE KEY-----....-----END PRIVATE KEY-----
spring.ssl.bundle.pem.kafka_bundle.truststore.certificate=-----BEGIN CERTIFICATE-----....-----END CERTIFICATE-----

如此便不需要在代码中声明@Bean,只需要用@Autowire的方式将KafkaReceiver注入需要用到的地方.

解决这种依赖或者配置问题的核心还是仔细看文档,多做实验,根据报错信息逐步排查。祝你早日解决这个问题!