返回

BigQuery 连接外部 MySQL 数据库:无需 SSH 隧道的方法

mysql

外部 MySQL 数据库连接到 BigQuery:绕过 SSH 隧道

最近遇到个事儿,要把一个放在外网的 MySQL 数据库连到 BigQuery 做分析。挺头疼的,因为还不能用 SSH 隧道。折腾了一番,总结了几个不用 SSH 隧道的法子,分享出来给大家。

一、 为啥连不上?

一般我们连数据库,建个 SSH 隧道,安全又方便。但这次不行,可能有几种原因:

  1. 网络限制: 防火墙规则或者安全组策略,直接把 SSH 连接给禁了。
  2. 权限问题: 数据库服务器压根儿就没给你 SSH 访问权限。
  3. 安全考虑: 有些公司为了安全,明令禁止通过 SSH 隧道直接访问生产数据库。
  4. 厂商数据库控制, 禁止使用ssh.

既然 SSH 隧道这条路走不通,那就只能另辟蹊径了。

二、 几个不用 SSH 隧道的办法

1. 使用 BigQuery Data Transfer Service (DTS)

BigQuery Data Transfer Service 专门用来把数据从各种地方搬到 BigQuery。它支持从 MySQL 数据库直接导入数据,不用操心底层连接。

原理:

DTS 在 Google Cloud 内部搞一个 agent,这个 agent 去连接你的 MySQL 数据库,把数据抓出来,然后灌到 BigQuery 里。

操作步骤:

  1. 准备工作:

    • 确保你的 MySQL 数据库允许来自 Google Cloud 的连接。 这可能需要在 MySQL 用户权限里配置,或者修改防火墙规则,允许特定的 Google Cloud IP 地址范围访问。
    • 在 BigQuery 里建好目标数据集(dataset)。
  2. 配置 DTS:

    • 在 Google Cloud 控制台里找到 "BigQuery Data Transfer Service"。
    • 点 "创建传输作业 (Create transfer)"。
    • 数据源选择 "MySQL"。
    • 填上你的 MySQL 数据库连接信息:
      • 主机名或 IP 地址
      • 端口号
      • 用户名
      • 密码
      • 数据库名称
    • 选择目标数据集,还有表名(DTS 可以帮你创建新表,也可以把数据追加到现有表里)。
    • 配置传输计划(比如每天一次、每周一次,或者只传一次)。
  3. 启动Transfer并观察状态.

代码示例 (使用 bq 命令行工具):
先使用如下代码进行Data transfer service 的环境准备:

bq mk --transfer_config \
--project_id=<your_project_id> \
--data_source=mysql \
--display_name='My MySQL Transfer' \
--target_dataset=<your_bigquery_dataset> \
--params='{"data_path_template":"mysql://<username>:<password>@<host>:<port>/<database>","destination_table_name_template":"<table_name>","data_type":"TABLE"}'

注意替换掉尖括号里的内容.

进阶技巧:
DTS会自动做数据的增量获取.

安全提示:

  • 给 DTS 用的 MySQL 用户,权限尽量小点儿,够读数据就行。
  • 连接信息这些敏感东西,可以用 Google Cloud Secret Manager 之类的服务存起来,更安全。
  • 数据在传输过程中会加密,保证安全.

2. 使用 Google Cloud Dataflow

Dataflow 是个流批一体的数据处理服务。我们可以写个 Dataflow 作业,从 MySQL 数据库里读数据,然后写到 BigQuery。

原理:

Dataflow 用 Apache Beam 编程模型。写个 Beam 程序,里面用 JDBC 连接到 MySQL,读数据,做一些转换(如果需要的话),再用 BigQuery 的 API 把数据写进去。

操作步骤:

  1. 准备工作: 和用 DTS 差不多,要确保网络连通性、数据库权限,以及 BigQuery 里有目标数据集。

  2. 开发 Dataflow 作业:

    • 用你喜欢的语言 (Java, Python, Go) 写一个 Apache Beam 程序。
    • 用 JDBC 连接到 MySQL:
      • 需要 MySQL 的 JDBC 驱动,加到项目依赖里。
      • JDBC 连接字符串长这样: jdbc:mysql://<host>:<port>/<database>?user=<username>&password=<password>
    • 读取数据:Beam 里有 JdbcIO 可以直接从数据库里读。
    • 转换数据 (可选):Beam 提供各种转换操作。
    • 写入 BigQuery:Beam 里有 BigQueryIO,可以写数据到 BigQuery。
  3. 部署和运行: 把写好的 Beam 程序部署到 Dataflow,然后运行起来。

代码示例 (Java, 使用 Apache Beam):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import com.google.api.services.bigquery.model.TableRow;
import java.sql.ResultSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlToBigQuery {

  private static final Logger LOG = LoggerFactory.getLogger(MySqlToBigQuery.class);
  public interface MyOptions extends PipelineOptions {
      // 这里可以定义命令行参数,如数据库信息
  }
    public static void main(String[] args) {
       MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
        Pipeline p = Pipeline.create(options);
        String jdbcUrl = "jdbc:mysql://<host>:<port>/<database>?user=<username>&password=<password>";  //替换实际的信息

        p.apply("ReadFromMySQL", JdbcIO.<TableRow>read()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                        "com.mysql.cj.jdbc.Driver", jdbcUrl))  // 注意使用的Driver类名
                .withQuery("SELECT * FROM your_table")  // 你的查询语句
                .withRowMapper(new JdbcIO.RowMapper<TableRow>() {
                    public TableRow mapRow(ResultSet resultSet) throws Exception {
                        TableRow row = new TableRow();
                      //循环读取数据.
                         int columnCount = resultSet.getMetaData().getColumnCount();
                        for (int i = 1; i <= columnCount; i++) {
                           row.set(resultSet.getMetaData().getColumnName(i), resultSet.getObject(i));
                          }
                      // 打印到控制台
                         LOG.info("Row data before writing to BigQuery: " + row.toString());

                        return row;
                    }
                }))

        .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
                .to("<your_project_id>:<your_bigquery_dataset>.<table_name>") // 替换
                .withSchema(getSchema()) // 如果需要,可以手动定义Schema
                .withWriteDisposition(BigQueryIO.WriteDisposition.WRITE_APPEND) //或者 WRITE_TRUNCATE, 按需选择.
                .withCreateDisposition(BigQueryIO.CreateDisposition.CREATE_IF_NEEDED));

        p.run().waitUntilFinish();
    }

  //Schema, 请替换为数据库表实际字段.
      private static TableSchema getSchema()
    {
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("column1").setType("STRING"));
        fields.add(new TableFieldSchema().setName("column2").setType("INTEGER"));
         // 其他更多字段定义

        return new TableSchema().setFields(fields);

    }
}

把尖括号 <...> 里的内容替换成你自己的信息。这个例子假设你知道表结构, 使用了withSchema()做了schema设定.

安全提示:

  • 和 DTS 一样,数据库连接信息别写死在代码里,用 Secret Manager 存起来。
  • JDBC 连接最好启用 SSL/TLS 加密。
  • 可以进一步设定只读用户

3. 使用 Google Cloud Functions 或 Cloud Run + 自定义代码

如果前两种方法不满足需求, 那还可以搞个 Cloud Functions 或者 Cloud Run,在里面写代码,用程序逻辑去连数据库,取数据,然后写到 BigQuery。

原理:

Cloud Functions 和 Cloud Run 都是无服务器计算服务。我们写一段程序 (Node.js, Python, Go 之类的),程序里用 MySQL 的客户端库连接数据库,查询数据,然后用 BigQuery 的客户端库把数据写进去。

操作步骤:

  1. 准备工作: 还是网络、权限、数据集那些事儿。

  2. 开发函数或服务:

    • 选一个你熟悉的语言。
    • 用 MySQL 的客户端库连接到数据库(比如 Python 的 mysql-connector-python)。
    • 查询数据。
    • 用 BigQuery 的客户端库 (比如 Python 的 google-cloud-bigquery) 把数据写到 BigQuery。
  3. 部署和触发: 把代码部署到 Cloud Functions 或 Cloud Run。可以设置定时触发 (Cloud Scheduler),或者通过 HTTP 请求触发。

代码示例 (Python, 使用 Cloud Functions):

from google.cloud import bigquery
import mysql.connector  # pip install mysql-connector-python

def mysql_to_bigquery(event, context):

    # 数据库连接信息
    db_config = {
        'host': '<host>',
        'port': <port>,
        'user': '<username>',
        'password': '<password>',
        'database': '<database>'
    }

     # BigQuery 信息, 替换为你的实际值.
    project_id = '<your_project_id>'
    dataset_id = '<your_bigquery_dataset>'
    table_id = '<table_name>'

    # 建立数据库连接
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()

        # 查询数据
        cursor.execute("SELECT * FROM your_table")
        rows = cursor.fetchall()

         # BigQuery 客户端
        client = bigquery.Client()
         # 目标表
        table_ref = client.dataset(dataset_id, project=project_id).table(table_id)
          # 将每一行转换为 BigQuery 可以接受的字典格式
        bq_rows = []
        column_names = [i[0] for i in cursor.description]  # 获取列名
        for row in rows:
          bq_rows.append(dict(zip(column_names, row)))

            #写入数据
        if(bq_rows):
              errors = client.insert_rows_json(table_ref, bq_rows)  #insert_rows()已不建议使用

              if errors:
                   print(f"写入时发生错误:{errors}")
              else:
                    print("数据写入成功.")
    except Exception as e:
        print(f"出错了: {e}")
    finally:
        # 关闭数据库连接
        if conn:
            cursor.close()
            conn.close()

将此代码部署到Google Cloud Functions, 进行相关调用, 即可把数据导入到BigQuery中.

安全提示:

  • 敏感信息放环境变量里,或者通过Secret Manager管理.
  • 尽量对访问的数据列做控制.
  • 根据需求考虑增加连接池.
  • 日志做好

三、 小结

上面这几个方法,都能在不用 SSH 隧道的情况下,把外部 MySQL 数据库的数据弄到 BigQuery。具体选哪个,看你自己的需求和技术栈。
如果追求简单,数据量不是巨大,DTS 就挺好。Dataflow 更灵活, 适合处理复杂数据,Cloud Functions 和 Cloud Run则提供了极大的自由度,方便按需处理。