返回

SASL_PLAINTEXT认证下Flink SQL连接Kafka的必杀技

后端

使用 Flink SQL 连接 SASL_PLAINTEXT 认证的 Kafka

背景

在使用 Flink SQL 连接到需要 SASL_PLAINTEXT 认证的 Kafka 集群时,需要进行特殊的配置。本指南将一步一步地引导您完成连接和处理数据所需的过程。

准备工作

  1. 安装软件: 安装 Flink 1.18.0 和 Kafka 3.3.1。
  2. 下载连接器: 下载 Kafka Connect Flink 连接器。
  3. 配置 Flink: 在 Flink 配置文件中添加以下属性:
kafka {
  properties.bootstrap.servers = "kafka1:19092,kafka2:29092,kafka3:39092"
  properties.group.id = "my-group"
  properties.security.protocol = "SASL_PLAINTEXT"
  properties.sasl.mechanism = "PLAIN"
  properties.sasl.jaas.config = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";"
}

创建 Flink SQL 表

CREATE TABLE my_table (
  `user` STRING,
  `item` STRING,
  `rating` INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'my-topic',
  'properties.bootstrap.servers' = 'kafka1:19092,kafka2:29092,kafka3:39092',
  'properties.group.id' = 'my-group',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
);

查询数据

SELECT * FROM my_table;

启动 Flink 作业

flink run -c com.example.MyJob /path/to/my_job.jar

验证作业

flink list

数据处理

使用 Flink SQL 运算符进行数据处理:

  • 过滤: SELECT * FROM my_table WHERE rating > 3;
  • 聚合: SELECT user, AVG(rating) AS average_rating FROM my_table GROUP BY user;
  • 连接: SELECT * FROM my_table JOIN other_table ON user;

停止作业

flink cancel [job_id]

常见问题解答

  1. 我无法连接到 Kafka 集群,收到“未授权”错误。 确保 SASL_PLAINTEXT 凭据正确,并且 Kafka 集群已配置为使用该认证机制。
  2. 我的作业没有收到任何数据。 检查 Kafka 主题是否存在,并且 Flink 作业具有消费该主题的权限。
  3. 我无法使用 Flink SQL 进行聚合。 确保您已在 Flink 配置文件中启用了聚合('enable.table.module.async' : 'true')。
  4. 我正在尝试连接到一个安全的 Kafka 集群。 除了 SASL_PLAINTEXT 认证之外,您还需要配置 SSL。有关详细信息,请参阅 Flink 文档。
  5. 我需要帮助解决故障排除问题。Flink 社区论坛 上寻求支持,或查看 Flink 文档 以获取更多信息。