深入浅出,剖析 TableEnvironment 的选择与应用
2023-10-29 00:05:55
前言
在 Flink SQL 系列第一篇 中,我们介绍了 Flink SQL 的发展历史、架构演进以及 Planner 的使用。本文将重点介绍 Flink SQL 中最重要的概念之一——TableEnvironment。
TableEnvironment
TableEnvironment 是 Flink SQL 的核心概念,它是 Flink SQL 执行环境的抽象,提供了一套完整的 SQL 语言支持。TableEnvironment 可以分为以下 5 种:
- BatchTableEnvironment :用于处理批处理数据,只支持静态表。
- StreamingTableEnvironment :用于处理流数据,支持静态表和动态表。
- BlinkTableEnvironment :用于处理 Blink 流数据,支持静态表、动态表和窗口表。
- HologresTableEnvironment :用于处理 Hologres 数据,支持静态表和动态表。
- GellyTableEnvironment :用于处理图数据,支持静态表和动态表。
TableEnvironment 的选择
在实际应用中,我们应该如何选择合适的 TableEnvironment 呢?下面是几个常用的选择原则:
- 如果需要处理批处理数据,则使用 BatchTableEnvironment。
- 如果需要处理流数据,则使用 StreamingTableEnvironment。
- 如果需要处理 Blink 流数据,则使用 BlinkTableEnvironment。
- 如果需要处理 Hologres 数据,则使用 HologresTableEnvironment。
- 如果需要处理图数据,则使用 GellyTableEnvironment。
TableEnvironment 的使用
下面分别介绍一下这 5 种 TableEnvironment 的具体用法。
BatchTableEnvironment
BatchTableEnvironment 用于处理批处理数据,只支持静态表。我们可以通过以下方式创建 BatchTableEnvironment:
TableEnvironment tableEnv = ExecutionEnvironment.getExecutionEnvironment().createTableEnvironment();
创建好 BatchTableEnvironment 后,就可以使用 SQL 来操作数据了。例如,我们可以使用以下 SQL 语句来创建一个表:
CREATE TABLE MyTable (
id INT,
name STRING
)
然后,我们可以使用以下 SQL 语句来插入数据:
INSERT INTO MyTable VALUES (1, 'Alice'), (2, 'Bob')
最后,我们可以使用以下 SQL 语句来查询数据:
SELECT * FROM MyTable
StreamingTableEnvironment
StreamingTableEnvironment 用于处理流数据,支持静态表和动态表。我们可以通过以下方式创建 StreamingTableEnvironment:
TableEnvironment tableEnv = StreamExecutionEnvironment.getExecutionEnvironment().createTableEnvironment();
创建好 StreamingTableEnvironment 后,就可以使用 SQL 来操作数据了。例如,我们可以使用以下 SQL 语句来创建一个表:
CREATE TABLE MyTable (
id INT,
name STRING
) WITH (
'update-mode' = 'append'
)
然后,我们可以使用以下 SQL 语句来插入数据:
INSERT INTO MyTable VALUES (1, 'Alice'), (2, 'Bob')
最后,我们可以使用以下 SQL 语句来查询数据:
SELECT * FROM MyTable
BlinkTableEnvironment
BlinkTableEnvironment 用于处理 Blink 流数据,支持静态表、动态表和窗口表。我们可以通过以下方式创建 BlinkTableEnvironment:
TableEnvironment tableEnv = BlinkStreamTableEnvironment.create(streamExecutionEnvironment);
创建好 BlinkTableEnvironment 后,就可以使用 SQL 来操作数据了。例如,我们可以使用以下 SQL 语句来创建一个表:
CREATE TABLE MyTable (
id INT,
name STRING,
event_time TIMESTAMP
) WITH (
'update-mode' = 'append'
)
然后,我们可以使用以下 SQL 语句来插入数据:
INSERT INTO MyTable VALUES (1, 'Alice', '2023-03-08 12:00:00'), (2, 'Bob', '2023-03-08 13:00:00')
最后,我们可以使用以下 SQL 语句来查询数据:
SELECT * FROM MyTable
HologresTableEnvironment
HologresTableEnvironment 用于处理 Hologres 数据,支持静态表和动态表。我们可以通过以下方式创建 HologresTableEnvironment:
TableEnvironment tableEnv = HologresTableEnvironment.create(hologresOptions);
创建好 HologresTableEnvironment 后,就可以使用 SQL 来操作数据了。例如,我们可以使用以下 SQL 语句来创建一个表:
CREATE TABLE MyTable (
id INT,
name STRING
)
然后,我们可以使用以下 SQL 语句来插入数据:
INSERT INTO MyTable VALUES (1, 'Alice'), (2, 'Bob')
最后,我们可以使用以下 SQL 语句来查询数据:
SELECT * FROM MyTable
GellyTableEnvironment
GellyTableEnvironment 用于处理图数据,支持静态表和动态表。我们可以通过以下方式创建 GellyTableEnvironment:
TableEnvironment tableEnv = GellyTableEnvironment.create(streamExecutionEnvironment);
创建好 GellyTableEnvironment 后,就可以使用 SQL 来操作数据了。例如,我们可以使用以下 SQL 语句来创建一个表:
CREATE TABLE MyTable (
id INT,
name STRING
) WITH (
'storage.type' = 'rocksdb'
)
然后,我们可以使用以下 SQL 语句来插入数据:
INSERT INTO MyTable VALUES (1, 'Alice'), (2, 'Bob')
最后,我们可以使用以下 SQL 语句来查询数据:
SELECT * FROM MyTable
Flink 社区对 TableEnvironment 的规划
Flink 社区正在积极推进 TableEnvironment 的发展,未来规划包括以下几个方面:
- 统一 TableEnvironment 的 API。 目前,TableEnvironment 的 API 在不同的版本和模块之间存在一些差异,社区计划在未来的版本中统一这些差异,使 TableEnvironment 的 API 更加简洁、统一。
- 增强 TableEnvironment 的功能。 社区计划在未来的版本中增强 TableEnvironment 的功能,包括支持更多的 SQL 语法、支持更多的数据源和支持更多的计算引擎。
- 优化 TableEnvironment 的性能。 社区计划在未来的版本中优化 TableEnvironment 的性能,包括减少 TableEnvironment 的内存消耗、提高 TableEnvironment 的查询速度和提高 TableEnvironment 的并发处理能力。
结语
TableEnvironment 是 Flink SQL 的核心概念,是 Flink SQL 执行环境的抽象,提供了一套完整的 SQL 语言支持。TableEnvironment 可以分为 BatchTableEnvironment、StreamingTableEnvironment、BlinkTableEnvironment、HologresTableEnvironment 和 GellyTableEnvironment。不同的 TableEnvironment 适用于不同的场景。Flink 社区正在积极推进 TableEnvironment 的发展,未来规划包括统一 TableEnvironment 的 API、增强 TableEnvironment 的功能和优化 TableEnvironment 的性能。