Flink + Hudi:DFS Catalog 下 JDBC 连接失效分析与解决
2025-01-26 04:02:26
Flink + Hudi 使用 DFS Catalog 时 JDBC Connector 失效问题分析
在 Flink 结合 Hudi 的使用场景中,数据通常需要通过 Catalog 进行管理,便于维护和复用。但是,在使用 DFS (Distributed File System) Catalog 的时候,我们有时会遇到一个棘手的问题: JDBC Connector 无法正常读取数据。本文针对此问题展开探讨。
问题现象
当使用 Flink 的 JDBC connector 直接连接 MySQL 表(例如 stu4
)时,数据读取正常。一旦启用 DFS Catalog (例如 hudi_catalog
) 并通过它访问相同的 JDBC 表,查询会返回空结果。具体来说,尽管查询没有报错,数据却无法从数据源中提取出来。
问题原因
该问题的核心原因在于 DFS Catalog 与 JDBC Connector 的集成方式。当使用 DFS Catalog 管理 Hudi 表时,Flink 并不会像使用 JDBC connector 那样直接去访问数据库。它期待读取的是 Catalog 中维护的 Hudi 表信息和对应的数据文件。当直接使用CREATE TABLE
创建的外部表时,没有同步到 Catalog 中,因此读取不到数据。简而言之,DFS Catalog 管理的是文件系统中的 Hudi 表,而 JDBC connector 直接连接的是外部数据库表,两者不在同一个数据管理范畴里。
解决方案
以下方案提供了修复此类问题的途径:
方案一:将JDBC Connector 读取的结果写入 Hudi 表
使用 JDBC Connector 读取数据后,先将数据写入到 Hudi 表中。通过这种方式,可以将数据引入到 DFS Catalog 管理的范畴,从而在 Catalog 的支持下读取数据。
- 创建Hudi表:
CREATE TABLE hudi_stu4 (
id BIGINT PRIMARY KEY NOT ENFORCED,
name STRING,
school STRING
)
WITH (
'connector' = 'hudi',
'path' = '/tmp/hudi_table',
'table.type' = 'COPY_ON_WRITE',
'write.tasks' = '1'
);
这一步是在 Catalog 管理下创建一个 Hudi 表。表结构需要与 MySQL 表保持一致。路径 '/tmp/hudi_table'
可以调整。'table.type' = 'COPY_ON_WRITE'
设置为 copy on write 的表类型,这里可以使用其他模式,如 mor等,看具体的场景使用。
-
通过 INSERT INTO ... SELECT ... 将JDBC数据写入 Hudi 表:
INSERT INTO hudi_stu4 SELECT id, name, school from stu4;
这一步从JDBC数据源读取数据,并将数据写入刚创建的Hudi表。此时Hudi表中就有了JDBC表的数据了,后续就可以通过DFS catalog来访问该表。
-
查询 Hudi 表:
select * from hudi_stu4;
通过查询可以看到从jdbc表导入到hudi表中的数据。
解释: 此方案的关键在于,先通过 JDBC Connector 将 MySQL 的数据读出来,并写入由 DFS Catalog 管理的 Hudi 表,然后再通过 Catalog 进行读取,从而使得数据链路贯通。
方案二:通过 Hudi 同步工具,进行数据库数据同步
可以使用 Hudi 的外部数据同步工具(如 Flink SQL 提供的 Change Data Capture 功能),来定时或增量地同步数据库数据到 Hudi 表中。
- 配置 MySQL CDC 源 (这里以debezium connector为例):
CREATE TABLE mysql_cdc_source (
`id` BIGINT,
`name` STRING,
`school` STRING,
PRIMARY KEY (`id`) NOT ENFORCED,
`ts` TIMESTAMP(3) METADATA FROM 'value.source.ts_ms' VIRTUAL,
`op` STRING METADATA FROM 'value.source.op' VIRTUAL
) WITH (
'connector' = 'debezium',
'debezium.offset.storage'='filesystem',
'debezium.offset.storage.path'='/tmp/debezium_offsets',
'debezium.database.hostname' = 'hadoop2',
'debezium.database.port' = '3306',
'debezium.database.user' = 'root',
'debezium.database.password' = '**** ***',
'debezium.database.include.list' = 'test',
'debezium.table.include.list' = 'test.stu4'
);
这个配置定义了一个从 MySQL 读取 CDC 数据的 Flink source table,这里的关键配置是配置 mysql的连接参数。 同样的配置需要将 test
和stu4
更换成你自己的database和表名。 注意该配置和上述创建表的操作,可以在同一个flink会话中操作。
- 将 CDC 流写入 Hudi 表:
insert into hudi_stu4 select `id`,`name`,`school` from mysql_cdc_source;
这里使用了方案1中创建的hudi_stu4,可以通过 insert into
sql 将cdc源表的数据写入hudi。由于采用了COPY_ON_WRITE
,对于新增数据会有数据的合并写,这也就将MySQL中的更新和删除同步到了Hudi。后续的增删改的操作也会通过 CDC 的流的方式进入到 Hudi 中。
- 查询Hudi 表:
select * from hudi_stu4;
查询步骤与方案一一致。
解释: 此方案利用 CDC 工具捕获数据库变更,然后将这些变更同步到 Hudi 表, 实现了增量数据同步,保证了 Hudi 表与数据库表的实时同步,相比方案一更加动态化。需要确保对应的Debezium 版本能够与使用的 flink 版本匹配,确保Debezium source配置正确,能够连通MySQL。
总结
使用 DFS Catalog 和 JDBC connector 时,遇到的问题是由 Catalog 的设计目的和 JDBC connector 的作用不一致导致的。为了正确处理这种情况,可以选择将数据写入 Hudi 表,然后通过 DFS Catalog 来管理读取。也可以采用 CDC 数据同步方式。建议在使用之前仔细理解每种方案的优劣,并根据具体情况选取合适的策略。
选择第一种方案比较快速,适合首次数据同步,方便数据快速接入;第二种方案更为复杂,但更加灵活,可以支持数据的增量同步,保障数据的时效性,满足动态变化的需求。