返回

Oozie Java Action 获取 Kafka 令牌: 告别 JAAS 认证错误

java

Oozie Java Action 中获取 Kafka 委托令牌(Delegation Token)实战

在 Oozie 工作流里跑 Java Action 去操作启用了 Kerberos 的 Kafka 集群,这场景挺常见的。但怎么让 Java Action 拿到认证信息,尤其是 Oozie 通过 <credentials> 获取到的 Kafka Delegation Token,就有点绕了。直接用 System.getenv("KAFKA_DELEGATION_TOKEN") 去拿?通常是拿不到的,而且还可能遇到 Kafka 客户端报 JAAS 找不到配置的错。

问题摆在这儿

咱们有个 Oozie Workflow,里面定义了 Kafka 的凭据 (<credential type="kafka">),指定了 keytab、principal、Kafka 服务器地址等信息。

<workflow-app name="HelloWorldWorkflow" xmlns="uri:oozie:workflow:0.4">
    <credentials>
        <credential name="kafka-credentials" type="kafka">
            <property>
                <name>oozie.kafka.bootstrap.servers</name>
                <value>your-kafka-brokers:669</value> <!-- 替换成你的 Kafka 地址 -->
            </property>
            <property>
                <name>oozie.kafka.sasl.kerberos.service.name</name>
                <value>kafka</value>
            </property>
            <property>
                <name>oozie.kafka.security.protocol</name>
                <value>SASL_PLAINTEXT</value> <!-- 或 SASL_SSL,取决于你的 Kafka 配置 -->
            </property>
            <property>
                <name>oozie.kafka.sasl.mechanism</name>
                <value>GSSAPI</value>
            </property>
            <!-- Oozie 使用这些信息去获取 Delegation Token -->
            <property>
                <name>oozie.authentication.kerberos.keytab</name>
                <value>/path/on/hdfs/to/your/svc.keytab</value> <!-- HDFS 上的 Keytab 路径 -->
            </property>
            <property>
                <name>oozie.authentication.kerberos.principal</name>
                <value>[email protected]</value> <!-- 你的 Principal -->
            </property>
        </credential>
    </credentials>
    <start to="java-action"/>

    <action name="java-action" cred="kafka-credentials"> <!-- 引用上面定义的凭据 -->
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <main-class>com.yourcompany.HelloWorld</main-class> <!-- 你的 Java 主类 -->
            <file>hdfs:///user/sample/lib/HelloWorld.jar#HelloWorld.jar</file> <!-- 主 Jar 包 -->
            <!-- 可能还需要包含 Kafka 客户端及其依赖的 Jar 包 -->
            <!-- <archive>hdfs:///path/to/your/kafka-client-libs.zip#lib</archive> -->
        </java>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Workflow failed: ${wf:errorMessage(wf:lastErrorNode())}</message>
    </kill>
    <end name="end"/>
</workflow-app>

然后在 Java Action (HelloWorld.java) 里,想当然地尝试用环境变量去取 Token:

String kafkaDelegationToken = System.getenv("KAFKA_DELEGATION_TOKEN");
// 结果发现 kafkaDelegationToken 是 null 或者空字符串

更糟糕的是,运行 Kafka 客户端代码(比如 KafkaProducer)时,程序直接挂了,抛出类似下面的异常:

Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
    at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:131)
    at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:96)
    at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:82)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:167)
    // ... (堆栈省略)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:428)

错误信息很明确:找不到名叫 KafkaClient 的 JAAS 配置项,而且系统属性 java.security.auth.login.config 也没设置。

这到底是咋回事?Oozie 不是应该帮我们搞定认证了吗?Token 去哪儿了?

刨根问底:为什么会出问题?

这事儿的关键在于理解 Oozie 的凭据机制和 Hadoop/YARN 的安全上下文是如何工作的。

  1. Oozie 的角色: 当你在 Workflow 中定义了 <credentials> 并指定 type="kafka",Oozie Action Chainer(或者叫 Launcher)在启动你的 Java Action 之前,会扮演一个中间人的角色。它会使用你提供的 Principal 和 Keytab,通过 Kerberos 认证,去向 Kafka 集群请求一个 Delegation Token。
  2. Token 的去向: Oozie 拿到这个 Delegation Token 后,并不会简单粗暴地把它塞进一个叫 KAFKA_DELEGATION_TOKEN 的环境变量里。它会将这个 Token (以及可能的其他 Token,比如 HDFS 的) 添加到 Hadoop 的 UserGroupInformation (UGI) 对象的凭据(Credentials)集合里。这个 UGI 代表了运行这个 Oozie Action 的用户身份。
  3. YARN 的环境: Oozie Java Action 通常是作为一个 YARN Application (MapReduce 作业的一种特殊形式,或者直接是 YARN Container) 来运行的。YARN 在启动这个应用的 Container 时,会把包含 Delegation Token 的 UGI 信息序列化到一个文件里,并通过环境变量 HADOOP_TOKEN_FILE_LOCATION 把这个文件的路径告诉 Container 里运行的进程(也就是你的 Java Action)。
  4. Kafka Client 的行为: Kafka 客户端库(kafka-clients.jar)在配置了 SASL 安全协议(比如 SASL_PLAINTEXTSASL_SSL)和 GSSAPI 机制时,如果它能感知到自己跑在 Hadoop/YARN 环境下,它会自动尝试HADOOP_TOKEN_FILE_LOCATION 指向的文件加载凭据。如果找到了对应的 Kafka Delegation Token,它就会用这个 Token 去跟 Kafka Broker 认证,整个过程对你的 Java 代码来说几乎是透明的。
  5. JAAS 报错的原因: 当 Kafka Client 因为某些原因(比如配置不对、环境问题、库版本不兼容等)没能成功从 UGI / Token 文件里找到并使用 Delegation Token 时,它可能会回退到尝试使用标准的 JAAS (Java Authentication and Authorization Service) 配置进行 Kerberos 认证。这时,它会查找名为 KafkaClient 的 JAAS 配置项。因为你(很可能)没有在 Java Action 的启动参数里通过 -Djava.security.auth.login.config=/path/to/jaas.conf 指定一个 JAAS 配置文件,也没有对应的 KafkaClient 配置段,所以它就抱怨找不到配置,抛出那个 IllegalArgumentException

总结一下:问题不在于 Token 不存在,而在于你的 Java 代码(或者说 Kafka Client)没能自动、正确地利用 YARN 环境提供的包含 Token 的 UGI/凭据文件。直接读环境变量肯定不行,而 JAAS 报错是因为自动发现 Token 的机制失败了,程序走了另一条需要显式 JAAS 配置的路。

解决之道:让 Kafka Client 自动捡起 Token

既然 Token 在 UGI / 凭据文件里,我们要做的就是确保 Kafka Client 能找到并使用它。核心思想是:配置好 Kafka Client,让它利用 Hadoop 的安全机制,而不是去操心手动管理 Token 或 JAAS 文件。

方案一:正确配置 Kafka 客户端属性

这是最推荐、也是最符合 Oozie 设计意图的方式。

  1. 原理: 通过设置正确的 Kafka Producer/Consumer 属性,特别是 security.protocolsasl.mechanism,引导 kafka-clients 库去使用环境中的 Kerberos/Delegation Token 信息。当设置为 GSSAPI 时,客户端通常会优先尝试使用 UGI 中的凭据。

  2. 操作步骤和代码示例:

    在你的 HelloWorld.java 代码里,初始化 Kafka Producer 或 Consumer 时,设置以下关键属性:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class HelloWorld {
    
        public static void main(String[] args) {
            // 从 Oozie 配置或环境变量中获取 Kafka 服务器地址
            // Oozie 会将 workflow <configuration> 中的属性传递给 Java Action
            // 也可以通过 System.getProperty("oozie.kafka.bootstrap.servers") 获取,
            // 如果在 workflow definition 的 <configuration> 里定义了的话。
            // 这里假设我们能获取到 bootstrap servers 地址
            String bootstrapServers = args.length > 0 ? args[0] : "your-kafka-brokers:669"; // 最好动态传入
    
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            // === 关键的安全配置 ===
            // 1. 设置安全协议,这个要和你 Oozie credential 里定义的一致
            //    并且要和你的 Kafka Broker 配置一致
            props.put("security.protocol", "SASL_PLAINTEXT"); // 或者 SASL_SSL
    
            // 2. 设置 SASL 机制为 GSSAPI
            //    即使我们实际用的是 Delegation Token,GSSAPI 是触发 Kerberos/Token 认证流程的常见配置。
            //    Hadoop 集成良好的 Kafka Client 库会处理好是用 TGT 还是 Token。
            props.put("sasl.mechanism", "GSSAPI");
    
            // 3. 指定 Kafka 服务名,也要和 Oozie credential 及 Kafka Broker 配置一致
            props.put("sasl.kerberos.service.name", "kafka");
    
            // !! 注意:不要手动设置 JAAS 相关的属性 !!
            // 不要设置 System.setProperty("java.security.auth.login.config", "...");
            // 不要设置 props.put("sasl.jaas.config", "...");
            // 让 Kafka Client 自动发现 YARN 环境里的 Token
    
            KafkaProducer<String, String> producer = null;
            try {
                System.out.println("准备创建 KafkaProducer,使用配置: " + props);
                producer = new KafkaProducer<>(props);
                System.out.println("KafkaProducer 创建成功!");
    
                // 发送一条测试消息
                producer.send(new ProducerRecord<>("your-topic-name", "key", "Hello from Oozie Java Action with Delegation Token!"));
                System.out.println("消息发送成功!");
    
            } catch (Exception e) {
                System.err.println("创建 KafkaProducer 或发送消息时出错:");
                e.printStackTrace(System.err);
                // 在 Oozie 中,可以抛出异常让 Action 失败
                throw new RuntimeException("Kafka operation failed", e);
            } finally {
                if (producer != null) {
                    producer.close();
                    System.out.println("KafkaProducer 已关闭。");
                }
            }
        }
    }
    

    关键点解释:

    • security.protocol: 必须设置为 SASL_PLAINTEXTSASL_SSL,取决于你的 Kafka 集群配置。
    • sasl.mechanism: 设置为 GSSAPI。这是让 Kafka 客户端尝试 Kerberos 或相关机制(如 Delegation Token)认证的标准方式。客户端库内部逻辑会判断当前 UGI 环境,决定是用 TGT 还是 Token。
    • sasl.kerberos.service.name: Kafka 服务 Principal 的 Service 部分,通常是 kafka
    • 绝对不要 在代码里或者通过系统属性 -Djava.security.auth.login.config 来设置 JAAS 配置。这样做的目的是强迫 Kafka Client 放弃寻找 JAAS 文件,转而依赖它发现 Hadoop/YARN 安全环境的能力。
  3. 安全建议:

    • Keytab 文件应放置在 HDFS 上受保护的目录,只有 Oozie 服务用户和运行工作流的用户(或其代理用户)有权访问。
    • Oozie Workflow XML 中定义的 Principal 应严格限制权限,仅授予其完成任务所需的 Kafka Topic 权限。
  4. 进阶使用技巧:

    • 调试 UGI: 如果仍然遇到问题,可以在 Java Action 代码里尝试打印 UGI 相关信息来调试:
      import org.apache.hadoop.security.UserGroupInformation;
      // ...
      try {
          UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
          System.out.println("Current UGI: " + ugi);
          System.out.println("UGI has Kerberos credentials: " + ugi.hasKerberosCredentials());
          // Credentials 类在 Hadoop 内部,但可以看看 Tokens
          // System.out.println("UGI Credentials/Tokens: " + ugi.getCredentials()); // 可能包含敏感信息,谨慎打印
          // 检查 Token 文件位置
          String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
          System.out.println("HADOOP_TOKEN_FILE_LOCATION: " + tokenFile);
          // 如果文件存在,理论上 Kafka Client 应该能读取它
      } catch (IOException e) {
          System.err.println("Error getting UGI info: " + e.getMessage());
      }
      
    • 确认 Token 获取: Oozie Job Log 里应该能看到 Oozie Launcher 成功获取 Kafka Delegation Token 的日志。如果 Oozie 获取 Token 就失败了,Java Action 自然也用不了。

方案二:检查环境和依赖(辅助排查)

如果方案一配置没问题但依然报错,那可能是环境因素。

  1. 原理: Oozie 的 Credential 功能和 Java Action 的运行依赖于正确的环境配置和 Jar 包。
  2. 排查步骤:
    • Oozie Server 端配置: 确认 Oozie Server 的 oozie-site.xml 里配置了正确的 Kafka Credential Class (org.apache.oozie.action.hadoop.KafkaCredentials 或类似)。同时,Oozie Server 的 sharelib (通常是 HDFS 上的 /user/oozie/share/lib/lib_<timestamp>/kafka) 需要包含与你的 Kafka 集群版本兼容的 kafka-clients Jar 包,这样 Oozie 才能代表用户去获取 Token。
    • Java Action 的 Jar 包: 你的 java-action 需要能访问到兼容版本kafka-clients Jar 及其所有依赖(比如 slf4j, lz4, snappy, zstd 等)。有两种常见做法:
      • 将所有需要的 Jar 打包进一个 Fat Jar,然后通过 <file> 上传。
      • kafka-clients 及其依赖的 Jar 单独放在 HDFS 目录或一个 zip/tar.gz 压缩包里,通过 <archive> 标签在 Oozie Workflow 中指定,并在 Java Action 运行时将其添加到 Classpath。例如:
        <action name="java-action" cred="kafka-credentials">
            <java>
                ...
                <main-class>com.yourcompany.HelloWorld</main-class>
                <file>hdfs:///user/sample/lib/HelloWorld.jar#HelloWorld.jar</file>
                <archive>hdfs:///path/to/kafka/libs.zip#lib</archive> <!-- 解压到 lib 目录 -->
                <arg>${kafkaBrokerList}</arg> <!-- 传入 Kafka 服务器地址 -->
                <!-- 可能需要设置 CLASSPATH 环境变量,或者让 Java Action 的启动脚本处理 -->
            </java>
            ...
        </action>
        
      确保 Java Action 使用的 kafka-clients 版本和你 Kafka Broker 的版本是兼容的。版本不匹配是各种诡异问题的常见根源。
    • YARN/MapReduce 配置: 确认 YARN (NodeManager) 和 MapReduce (MRAppMaster) 的 Classpath 设置能正确加载 Hadoop 安全相关的类。通常这个由 Hadoop 管理员配置好,但以防万一。
    • Kerberos 配置: 确认 YARN Container 内部环境能访问到 Kerberos 配置文件 (krb5.conf),并且配置正确(比如 KDC 地址、realm 信息)。虽然用的是 Delegation Token,但 GSSAPI 底层交互有时仍需基础 Kerberos 环境信息。这个通常也是 YARN 会自动分发。
    • 查看详细日志: YARN Container 的日志是排查问题的金矿。通过 yarn logs -applicationId <your_app_id> 命令获取 Java Action 运行的 stdout, stderr 日志,里面通常会有比 Oozie Web UI 更详细的错误堆栈和线索。

通过上面这些方法,尤其是方案一中的正确配置 Kafka 客户端属性,基本就能解决 Oozie Java Action 中使用 Kafka Delegation Token 遇到的 JAAS 报错问题。核心就是相信并利用好 Hadoop/YARN 提供的安全上下文,让 Kafka Client 自动干活,避免不必要的手动干预。