返回
SASL_PLAINTEXT认证下Flink SQL连接Kafka的必杀技
后端
2023-06-28 02:26:18
使用 Flink SQL 连接 SASL_PLAINTEXT 认证的 Kafka
背景
在使用 Flink SQL 连接到需要 SASL_PLAINTEXT 认证的 Kafka 集群时,需要进行特殊的配置。本指南将一步一步地引导您完成连接和处理数据所需的过程。
准备工作
- 安装软件: 安装 Flink 1.18.0 和 Kafka 3.3.1。
- 下载连接器: 下载 Kafka Connect Flink 连接器。
- 配置 Flink: 在 Flink 配置文件中添加以下属性:
kafka {
properties.bootstrap.servers = "kafka1:19092,kafka2:29092,kafka3:39092"
properties.group.id = "my-group"
properties.security.protocol = "SASL_PLAINTEXT"
properties.sasl.mechanism = "PLAIN"
properties.sasl.jaas.config = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";"
}
创建 Flink SQL 表
CREATE TABLE my_table (
`user` STRING,
`item` STRING,
`rating` INT
) WITH (
'connector' = 'kafka',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'kafka1:19092,kafka2:29092,kafka3:39092',
'properties.group.id' = 'my-group',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";'
);
查询数据
SELECT * FROM my_table;
启动 Flink 作业
flink run -c com.example.MyJob /path/to/my_job.jar
验证作业
flink list
数据处理
使用 Flink SQL 运算符进行数据处理:
- 过滤:
SELECT * FROM my_table WHERE rating > 3;
- 聚合:
SELECT user, AVG(rating) AS average_rating FROM my_table GROUP BY user;
- 连接:
SELECT * FROM my_table JOIN other_table ON user;
停止作业
flink cancel [job_id]
常见问题解答
- 我无法连接到 Kafka 集群,收到“未授权”错误。 确保 SASL_PLAINTEXT 凭据正确,并且 Kafka 集群已配置为使用该认证机制。
- 我的作业没有收到任何数据。 检查 Kafka 主题是否存在,并且 Flink 作业具有消费该主题的权限。
- 我无法使用 Flink SQL 进行聚合。 确保您已在 Flink 配置文件中启用了聚合(
'enable.table.module.async' : 'true'
)。 - 我正在尝试连接到一个安全的 Kafka 集群。 除了 SASL_PLAINTEXT 认证之外,您还需要配置 SSL。有关详细信息,请参阅 Flink 文档。
- 我需要帮助解决故障排除问题。 在 Flink 社区论坛 上寻求支持,或查看 Flink 文档 以获取更多信息。