返回

深入浅出,剖析 TableEnvironment 的选择与应用

见解分享

前言

Flink SQL 系列第一篇 中,我们介绍了 Flink SQL 的发展历史、架构演进以及 Planner 的使用。本文将重点介绍 Flink SQL 中最重要的概念之一——TableEnvironment。

TableEnvironment

TableEnvironment 是 Flink SQL 的核心概念,它是 Flink SQL 执行环境的抽象,提供了一套完整的 SQL 语言支持。TableEnvironment 可以分为以下 5 种:

  1. BatchTableEnvironment :用于处理批处理数据,只支持静态表。
  2. StreamingTableEnvironment :用于处理流数据,支持静态表和动态表。
  3. BlinkTableEnvironment :用于处理 Blink 流数据,支持静态表、动态表和窗口表。
  4. HologresTableEnvironment :用于处理 Hologres 数据,支持静态表和动态表。
  5. GellyTableEnvironment :用于处理图数据,支持静态表和动态表。

TableEnvironment 的选择

在实际应用中,我们应该如何选择合适的 TableEnvironment 呢?下面是几个常用的选择原则:

  • 如果需要处理批处理数据,则使用 BatchTableEnvironment。
  • 如果需要处理流数据,则使用 StreamingTableEnvironment。
  • 如果需要处理 Blink 流数据,则使用 BlinkTableEnvironment。
  • 如果需要处理 Hologres 数据,则使用 HologresTableEnvironment。
  • 如果需要处理图数据,则使用 GellyTableEnvironment。

TableEnvironment 的使用

下面分别介绍一下这 5 种 TableEnvironment 的具体用法。

BatchTableEnvironment

BatchTableEnvironment 用于处理批处理数据,只支持静态表。我们可以通过以下方式创建 BatchTableEnvironment:

TableEnvironment tableEnv = ExecutionEnvironment.getExecutionEnvironment().createTableEnvironment();

创建好 BatchTableEnvironment 后,就可以使用 SQL 来操作数据了。例如,我们可以使用以下 SQL 语句来创建一个表:

CREATE TABLE MyTable (
  id INT,
  name STRING
)

然后,我们可以使用以下 SQL 语句来插入数据:

INSERT INTO MyTable VALUES (1, 'Alice'), (2, 'Bob')

最后,我们可以使用以下 SQL 语句来查询数据:

SELECT * FROM MyTable

StreamingTableEnvironment

StreamingTableEnvironment 用于处理流数据,支持静态表和动态表。我们可以通过以下方式创建 StreamingTableEnvironment:

TableEnvironment tableEnv = StreamExecutionEnvironment.getExecutionEnvironment().createTableEnvironment();

创建好 StreamingTableEnvironment 后,就可以使用 SQL 来操作数据了。例如,我们可以使用以下 SQL 语句来创建一个表:

CREATE TABLE MyTable (
  id INT,
  name STRING
) WITH (
  'update-mode' = 'append'
)

然后,我们可以使用以下 SQL 语句来插入数据:

INSERT INTO MyTable VALUES (1, 'Alice'), (2, 'Bob')

最后,我们可以使用以下 SQL 语句来查询数据:

SELECT * FROM MyTable

BlinkTableEnvironment

BlinkTableEnvironment 用于处理 Blink 流数据,支持静态表、动态表和窗口表。我们可以通过以下方式创建 BlinkTableEnvironment:

TableEnvironment tableEnv = BlinkStreamTableEnvironment.create(streamExecutionEnvironment);

创建好 BlinkTableEnvironment 后,就可以使用 SQL 来操作数据了。例如,我们可以使用以下 SQL 语句来创建一个表:

CREATE TABLE MyTable (
  id INT,
  name STRING,
  event_time TIMESTAMP
) WITH (
  'update-mode' = 'append'
)

然后,我们可以使用以下 SQL 语句来插入数据:

INSERT INTO MyTable VALUES (1, 'Alice', '2023-03-08 12:00:00'), (2, 'Bob', '2023-03-08 13:00:00')

最后,我们可以使用以下 SQL 语句来查询数据:

SELECT * FROM MyTable

HologresTableEnvironment

HologresTableEnvironment 用于处理 Hologres 数据,支持静态表和动态表。我们可以通过以下方式创建 HologresTableEnvironment:

TableEnvironment tableEnv = HologresTableEnvironment.create(hologresOptions);

创建好 HologresTableEnvironment 后,就可以使用 SQL 来操作数据了。例如,我们可以使用以下 SQL 语句来创建一个表:

CREATE TABLE MyTable (
  id INT,
  name STRING
)

然后,我们可以使用以下 SQL 语句来插入数据:

INSERT INTO MyTable VALUES (1, 'Alice'), (2, 'Bob')

最后,我们可以使用以下 SQL 语句来查询数据:

SELECT * FROM MyTable

GellyTableEnvironment

GellyTableEnvironment 用于处理图数据,支持静态表和动态表。我们可以通过以下方式创建 GellyTableEnvironment:

TableEnvironment tableEnv = GellyTableEnvironment.create(streamExecutionEnvironment);

创建好 GellyTableEnvironment 后,就可以使用 SQL 来操作数据了。例如,我们可以使用以下 SQL 语句来创建一个表:

CREATE TABLE MyTable (
  id INT,
  name STRING
) WITH (
  'storage.type' = 'rocksdb'
)

然后,我们可以使用以下 SQL 语句来插入数据:

INSERT INTO MyTable VALUES (1, 'Alice'), (2, 'Bob')

最后,我们可以使用以下 SQL 语句来查询数据:

SELECT * FROM MyTable

Flink 社区对 TableEnvironment 的规划

Flink 社区正在积极推进 TableEnvironment 的发展,未来规划包括以下几个方面:

  • 统一 TableEnvironment 的 API。 目前,TableEnvironment 的 API 在不同的版本和模块之间存在一些差异,社区计划在未来的版本中统一这些差异,使 TableEnvironment 的 API 更加简洁、统一。
  • 增强 TableEnvironment 的功能。 社区计划在未来的版本中增强 TableEnvironment 的功能,包括支持更多的 SQL 语法、支持更多的数据源和支持更多的计算引擎。
  • 优化 TableEnvironment 的性能。 社区计划在未来的版本中优化 TableEnvironment 的性能,包括减少 TableEnvironment 的内存消耗、提高 TableEnvironment 的查询速度和提高 TableEnvironment 的并发处理能力。

结语

TableEnvironment 是 Flink SQL 的核心概念,是 Flink SQL 执行环境的抽象,提供了一套完整的 SQL 语言支持。TableEnvironment 可以分为 BatchTableEnvironment、StreamingTableEnvironment、BlinkTableEnvironment、HologresTableEnvironment 和 GellyTableEnvironment。不同的 TableEnvironment 适用于不同的场景。Flink 社区正在积极推进 TableEnvironment 的发展,未来规划包括统一 TableEnvironment 的 API、增强 TableEnvironment 的功能和优化 TableEnvironment 的性能。