Apicurio Group ID 为 null?Kafka Avro 序列化错误解决方法
2025-05-05 17:38:39
解决 Apicurio Schema Registry 序列化时 Group ID 为 null 的问题
问题来了:序列化时找不到 Schema Group
搞 Kafka 生产者的朋友,如果用到了 Apicurio Schema Registry 做 Schema 管理,特别是用 Avro 序列化的时候,可能踩过这样一个坑:明明在代码里配好了 Schema 的 Group ID,比如指定了是 default
这个组,结果运行时愣是报错说找不到!
看下这位朋友遇到的情况,他在初始化 Serde 对象时,用 SerdeConfig
设置了一堆属性,想指定 Group ID 为 default
:
// 假设 properties 是一个 Properties 对象
properties.put(SerdeConfig.FALLBACK_ARTIFACT_PROVIDER, "io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider");
properties.put(SerdeConfig.FALLBACK_ARTIFACT_ID, "test-samuel-value");
properties.put(SerdeConfig.FALLBACK_ARTIFACT_GROUP_ID, "default"); // 想用这个指定回退的 group
properties.put(SerdeConfig.EXPLICIT_ARTIFACT_ID, "test-samuel-value");
properties.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "default"); // 想用这个指定明确的 group
理想很丰满,现实却甩过来一个异常:
No artifact with ID 'test-samuel-value' in group 'null' was found.
纳尼?Group ID 咋成 null
了?明明指定的是 default
啊!这序列化器咋就不听话呢?
开发环境是 Java 17,POM 文件里引入了相关的 Apicurio 依赖:
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
<version>2.6.8.Final</version>
</dependency>
<!-- 这个依赖 apicurio-registry-app 通常是跑 Registry 服务端用的,客户端其实不太需要 -->
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-app</artifactId>
<version>2.6.8.Final</version>
</dependency>
问题清楚了:Apicurio Avro 序列化器在查找 Schema 时,使用的 Group ID 是 null
,而不是我们期望配置的 default
。
为啥 Group ID 会是 'null'?
这个问题的根源,通常不在于 SerdeConfig
里那几个 FALLBACK_
或 EXPLICIT_
配置项本身有错,而在于 Kafka Producer 配置序列化器的方式 。
咱们回顾一下 Kafka Producer 是怎么用序列化器的。通常,你在创建 KafkaProducer
的时候,会提供一个 Properties
对象,里面指定了 key.serializer
和 value.serializer
的类名。比如,对于 Apicurio Avro 序列化器,你会这样设置:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-brokers:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 假设 Key 是 String
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.apicurio.registry.serde.avro.AvroKafkaSerializer"); // 指定 Apicurio Avro 序列化器
// ... 其他 Kafka Producer 配置
当 Kafka Producer 看到 value.serializer
是 AvroKafkaSerializer
时,它会实例化这个类,并调用其 configure(Map<String, ?> configs, boolean isKey)
方法。重点来了:Producer 会把 它自己 的配置(也就是你传给 KafkaProducer
构造函数的那个 Properties
对象)以及一些额外信息,传给 configure
方法。
AvroKafkaSerializer
需要从这些配置里知道 Schema Registry 的地址、要查找的 Artifact ID、以及 Group ID 。
而你代码里展示的 SerdeConfig.FALLBACK_ARTIFACT_GROUP_ID
和 SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID
是什么呢?它们是 apicurio-registry-serdes-common
包里定义的一些配置键常量,主要用于:
- 内部配置 :当 Apicurio Serde 库内部需要更精细控制查找逻辑时使用,比如配置回退查找策略(当主策略找不到时)或者明确指定某个 Artifact(绕过自动查找)。
- 非标准 Kafka 配置场景 :比如在 Kafka Streams 应用中,有时你会手动创建和配置 Serde 实例,这时候可能会直接用这些常量来构建配置 Map。
但在标准的 Kafka Producer/Consumer 场景下,序列化器主要是通过读取传递给 configure
方法的 Map
中的特定配置键来获取信息的。 如果你只在一个独立的 Properties
对象里设置了 SerdeConfig
的那些常量,但没有把这些配置(或者说,使用 Apicurio Serde 专属 的配置键)放进传给 KafkaProducer
的主配置 Properties
里,那 AvroKafkaSerializer
在 configure
时就读不到你想要的 Group ID。它读不到,默认行为可能就是用 null
去尝试查找,自然就出错了。
简单说:你配的地方,跟序列化器实际读配置的地方,可能不是一回事儿。 它期望在 Kafka Producer 的整体配置里找到 Group ID,而不是在一个单独的、可能并未直接关联到 Producer 序列化器配置上下文的 SerdeConfig
实例里找。
怎么解决?指定 Group ID 的正确姿势
既然知道了问题所在,解决起来就顺理成章了。核心思想是:把 Apicurio Serde 需要的配置,直接放进 Kafka Producer 的配置 Properties
里去。
方案一:在 Kafka Producer 配置中直接指定 Group ID (推荐)
这是最常用、也最符合 Kafka 配置习惯的方式。Apicurio Serde 定义了一套专门用于 Kafka Producer/Consumer 配置的键。
原理与作用:
利用 KafkaProducer
初始化时传递配置给序列化器的机制。通过在 Producer 的 Properties
中设置 Apicurio Serde 认识的特定配置键(比如 apicurio.registry.artifact.group-id
),AvroKafkaSerializer
在 configure
方法被调用时就能直接读取到这些值,从而知道去哪个 Group ID 下查找 Schema。
操作步骤与代码示例:
你需要将 Group ID 配置添加到你的 KafkaProducer
的 Properties
对象中。Apicurio Serde 推荐使用 io.apicurio.registry.serde.SerdeConfig
或其父类 io.apicurio.registry.serde.config.AbstractKafkaSerDe
中定义的常量作为配置键,这样更安全(避免拼写错误)。
import io.apicurio.registry.serde.SerdeConfig; // SerdeConfig 包含很多配置键常量
import io.apicurio.registry.serde.avro.AvroKafkaSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
// ...
public class MyKafkaProducer {
public void sendData() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-brokers:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); // 使用类名更安全
// ----- Apicurio Schema Registry 相关配置 -----
// 1. Registry URL (必需)
props.put(SerdeConfig.REGISTRY_URL, "http://your-apicurio-registry:8080/apis/registry/v2");
// 2. 指定 Artifact ID (如果你想让 Serde 主动查找并使用这个 ID 对应的最新 Schema)
// 这里需要 Artifact ID, 而不是 Schema Name
// 注意: Artifact ID 需要和你 Apicurio Registry 中创建的一致
props.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, "io.apicurio.registry.resolver.strategy.FindLatestArtifactStrategy");
props.put(SerdeConfig.ARTIFACT_ID, "test-samuel-value"); // 这里填你的 Artifact ID
// 3. !! 指定 Group ID !! (这才是解决问题的关键)
// 使用 SerdeConfig.GROUP_ID 常量对应的字符串值 "apicurio.registry.group-id"
// 或者直接用常量本身也可以: props.put(SerdeConfig.GROUP_ID, "default");
props.put("apicurio.registry.group-id", "default"); // 把 'default' 替换成你实际的 Group ID
// 4. (可选) 配置 Schema 解析策略等其他 Apicurio Serde 选项
// 比如,如果你的 Schema 变更不频繁,且想严格控制版本,
// 可以用 Global ID 查找,这时 Group ID 可能就不那么重要了,但配置上总没错。
// props.put(SerdeConfig.FIND_LATEST_ARTIFACT, Boolean.TRUE); // 配合 FindLatestArtifactStrategy
// props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE); // 是否自动注册 Schema (生产环境慎用)
// 移除之前在独立 map 中做的配置尝试
// 不再需要这些 fallback 和 explicit 配置, 因为上面的配置更直接
// props.remove(SerdeConfig.FALLBACK_ARTIFACT_GROUP_ID);
// props.remove(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID);
// 创建 Kafka Producer
KafkaProducer<String, YourAvroObject> producer = new KafkaProducer<>(props);
// 发送消息 (YourAvroObject 是你的 Avro 生成类)
YourAvroObject data = new YourAvroObject(/* ... 设置数据 ... */);
ProducerRecord<String, YourAvroObject> record = new ProducerRecord<>("your-topic", "some-key", data);
try {
producer.send(record);
System.out.println("消息发送成功!");
} catch (Exception e) {
System.err.println("发送消息失败:" + e.getMessage());
e.printStackTrace();
} finally {
producer.flush();
producer.close();
}
}
}
关键点 :
props.put("apicurio.registry.group-id", "default");
这行是核心。确保使用的 Key 是apicurio.registry.group-id
(或对应的SerdeConfig.GROUP_ID
常量值),Value 是你期望的 Group ID 字符串。- 同时,你需要配置
apicurio.registry.url
指向你的 Apicurio Registry 实例。 - 通常还需要配置
apicurio.registry.artifact.resolver.strategy
和相关的 Artifact ID (apicurio.registry.artifact-id
) 或 Global ID (apicurio.registry.global-id
),告诉 Serde 如何找到具体的 Schema。这里例子用了FindLatestArtifactStrategy
,它会根据 Artifact ID 和 Group ID 查找最新的版本。
安全建议:
- 避免在代码中硬编码 Registry URL、敏感的 Group ID 或 Artifact ID。最好通过外部配置文件(如
.properties
,.yaml
)或环境变量加载。 - 如果你的 Registry 需要认证,还需要配置相关的认证参数(如
apicurio.registry.auth.username
,apicurio.registry.auth.password
或 token 等)。查阅 Apicurio 文档获取具体配置项。
进阶使用技巧:
- 理解 Artifact 解析策略 (
ArtifactResolverStrategy
): Apicurio Serde 支持多种策略来定位 Schema。io.apicurio.registry.resolver.strategy.FindLatestArtifactStrategy
: 根据 Artifact ID 和 Group ID 找最新版本。这种策略下 Group ID 很重要。io.apicurio.registry.resolver.strategy.FindByGlobalIdStrategy
: 根据消息中嵌入的 Global ID 直接查找。如果生产者总是能拿到正确的 Global ID(比如从 Registry API 获取后硬编码或配置),这种方式可以精确匹配 Schema 版本,对 Group ID 的依赖性降低,但配置 Group ID 仍是个好习惯,因为 Registry 内部组织还是会用到 Group。io.apicurio.registry.resolver.strategy.FindBySchemaStrategy
: 尝试在 Registry 中根据 Schema 的内容(或其 hash)查找已存在的 Artifact。配置apicurio.registry.auto-register-artifact
为 true 时可能会自动注册。Group ID 在自动注册时会被用到。- 还有其他策略如
FindByContentIdStrategy
,FindByVersionStrategy
等。选择哪种策略取决于你的 Schema 管理和版本控制流程。配置项是apicurio.registry.artifact.resolver.strategy
。
- 使用
apicurio-registry-maven-plugin
: 在构建时自动注册 Schema 并获取 Global ID 或 Content ID,然后将这些 ID 配置到你的应用中,可以配合FindByGlobalIdStrategy
或FindByContentIdStrategy
使用,实现更精确的版本控制。
方案二:重新审视你的 SerdeConfig
使用场景
如前所述,SerdeConfig
的那些常量主要用于 Serde 内部或特定场景。如果你确实是在一个需要手动创建和配置 Serde 实例的场景(比如一个不直接依赖 Kafka Producer 配置框架的自定义处理器),那么你之前用 SerdeConfig
的方式可能是对的,但你需要确保:
- 创建了一个
Map<String, Object>
类型的配置Map
。 - 将所有需要的配置(包括 Registry URL, Group ID, Artifact ID, 解析策略等)都放进这个
Map
中。 - 使用这个
Map
来调用 Serde 实例的configure
方法。
示例 (用于手动配置场景,非解决 Kafka Producer 问题):
import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaSerializer;
import java.util.HashMap;
import java.util.Map;
// ...
// 假设你有一个 AvroKafkaSerializer 实例需要手动配置
AvroKafkaSerializer<YourAvroObject> serializer = new AvroKafkaSerializer<>();
// 创建配置 Map
Map<String, Object> config = new HashMap<>();
config.put(SerdeConfig.REGISTRY_URL, "http://your-apicurio-registry:8080/apis/registry/v2");
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, "io.apicurio.registry.resolver.strategy.FindLatestArtifactStrategy");
config.put(SerdeConfig.ARTIFACT_ID, "test-samuel-value");
// !! 在这里直接使用 GROUP_ID 常量指定 Group ID !!
config.put(SerdeConfig.GROUP_ID, "default"); // 注意: Key 是 SerdeConfig.GROUP_ID
// 或其字符串值 "apicurio.registry.group-id"
// 调用 configure 方法
// 注意: isKey 参数根据你的序列化目标是 key 还是 value 来定
serializer.configure(config, false); // false 表示用于序列化 value
// 现在这个 serializer 实例就配置好了,可以用它来序列化对象
// byte[] serializedData = serializer.serialize("your-topic", yourAvroObjectInstance);
// 不要忘记关闭 serializer
// serializer.close(); // Serde 实现了 Closeable
但请记住,对于标准的 Kafka Producer 集成,方案一(直接在 Producer 配置里加)是正道。
小结一下关键点
- 最常见的原因是 Group ID 没有配置在 Kafka Producer 传递给序列化器的配置
Properties
中。 - 解决方法是将
apicurio.registry.group-id
(或者用SerdeConfig.GROUP_ID
常量) 连同 Registry URL 和 Artifact 定位策略等,一起添加到 Kafka Producer 的配置里。 SerdeConfig
里的FALLBACK_*
和EXPLICIT_*
常量用于特定的回退或显式查找场景,不是配置主 Group ID 的标准方式。
按这种方式调整你的 Kafka Producer 配置,那个烦人的 group 'null'
错误应该就能顺利解决了。去试试吧!