返回

Airflow SSH隧道访问MySQL踩坑实录与解决方案

mysql

Airflow 通过 SSH 隧道访问 MySQL 数据库的踩坑实录

工作中,遇到个事儿:要用 Airflow 做个 ETL 流程,从 MySQL 抽数据到 Postgres。 数据库那边要求必须通过 SSH 隧道访问,说是为了安全,咱也没法改数据库配置,只能硬着头皮上了。结果,折腾了好久才搞定。这过程里头的坑,得记录下,给大伙儿提个醒。

问题:隧道通了,连不上库

按理说,SSH 隧道建好了,数据库连接应该没问题啊。结果,愣是各种报错,就是连不上。

为什么会这样?

这问题,细究起来,原因可能有这么几个:

  1. 端口映射问题: SSH 隧道建好了,本地端口和远程数据库端口之间的映射可能没搞对。 咱本地访问 127.0.0.1:3306, 这个请求能不能正确转发到远端数据库的对应端口, 这中间哪个环节出点岔子都不行。
  2. MySQL 用户权限问题: 即使隧道通了,数据库那边也得认你这个用户啊。有可能你用来连接数据库的这个用户, 压根儿就没权限从你建立隧道的这个机器 IP 地址去连。
  3. 网络问题: 别看隧道日志显示连接成功了,网络波动、防火墙规则啥的,都有可能导致连接不稳定,甚至中断。
  4. 数据库配置: 有的 MySQL 数据库,它可能绑定了特定的 IP 地址。你要是从别的 IP (即使是隧道过来的)去访问,它就不让你连。
  5. max_allowed_packet设置问题 : JDBC 默认值小了, 容易出现PacketTooBigException

解决方案:各种尝试

问题原因可能有这么多,咱也不能干等着。 我试了好几种方法,一样样来说:

方案一:调整 SSHHookMySQLHook (最终方案)

这块问题出现的概率其实不低, SSHHookMySQLHook 都需要根据实际情况调整.

  • 原理和作用:
    SSHHook 负责建立 SSH 隧道。MySqlHook 是 Airflow 里专门用来连接 MySQL 数据库的。这俩配合,先建隧道,再通过隧道连接。

  • 代码示例:
    先看创建SSH隧道的这部分。 核心代码如下, 要保证把远端数据库的端口(这里用的默认的3306, 你根据实际情况改),映射到本地的某个端口:

    from airflow.providers.ssh.hooks.ssh import SSHHook
    from airflow.providers.mysql.hooks.mysql import MySqlHook
    import pandas as pd
    import logging
    
    #假设你在airflow的connection里面已经设置好了一个叫 'connection_ssh'的连接。
    with SSHHook(ssh_conn_id='connection_ssh', conn_timeout=300, cmd_timeout=300, keepalive_interval=30, auth_timeout=300) as ssh_hook:
    
        # 获取远程MySQL服务器实际监听的端口
        remote_host = 'your_mysql_server_ip' # 换成你实际的 MySQL 服务器 IP
        remote_port = 3306  #大多数情况都是3306,如有不同请调整
    
        # 在本地挑选一个没有被占用的端口。 不要总是 3306,换一个试试。
        local_port = 3307  
    
        with ssh_hook.get_tunnel(remote_host=remote_host, remote_port=remote_port, local_port=local_port) as tunnel:
    
             tunnel.start()
    
             logging.info(f"SSH Tunnel opened: Local Port {local_port}")
    
             # 使用 MySqlHook。 注意,host 和 port 参数要改!
             mysql_hook = MySqlHook(mysql_conn_id='mysqlconn')
    
             #直接用 get_sqlalchemy_engine 来避免获取连接后又关闭的错误操作。
             engine = mysql_hook.get_sqlalchemy_engine(engine_kwargs={"connect_args": {"port": local_port}})
    
             # 通过这个engine来读取数据。
             query = "select * from order where order_id=1"
             df = pd.read_sql(query, engine)
    
             print(df)
    
    
  • 额外提示 :

1.  确保 Airflow Worker 节点能正常访问 SSH 服务器。
2.  `local_port` 可以尝试使用其他的, 例如3307、13306等。
  • 进阶用法:

    如果你对性能有要求,可以在get_sqlalchemy_engine里配置连接池参数,比如:

    engine = mysql_hook.get_sqlalchemy_engine(engine_kwargs={"connect_args": {"port": local_port}, "pool_size": 10, "max_overflow": 20})
    

    这样能减少频繁创建和关闭连接带来的开销。

方案二: 使用pymysql (部分情况有效)

  • 原理: pymysql 是 Python 里一个常用的 MySQL 客户端库。 它比 MySQLHook 更底层,可以更灵活地控制连接参数。

  • 代码示例:
    先看创建SSH隧道的这部分。
    这里local_bind_port就是本地映射的端口了:

      from airflow.providers.ssh.hooks.ssh import SSHHook
      import pymysql
      import pandas as pd
    
     with SSHHook(ssh_conn_id='connection_ssh', conn_timeout=300, cmd_timeout=300, keepalive_interval=30, auth_timeout=300) as ssh_hook:
    
           # 和上面一样,先确定远程和本地端口
           remote_host = 'your_mysql_server_ip'
           remote_port = 3306
           local_port = 3307
    
           with ssh_hook.get_tunnel(remote_host=remote_host, remote_port=remote_port, local_port=local_port) as tunnel:
    
                   tunnel.start()
    
                 # 获取 Airflow 里配置的 MySQL 连接信息。
                 # 假设你在 Airflow Connections 里面配置好了一个ID 为 'mysqlconn'的MySQL 连接
                 from airflow.hooks.base import BaseHook
                 db_conn = BaseHook.get_connection('mysqlconn')
    
                 # 用 pymysql.connect 创建连接
                 # host 参数一定是 127.0.0.1 或 localhost! port 参数一定要用隧道在本地的绑定端口!
                 connection = pymysql.connect(host='127.0.0.1',
                                              user=db_conn.login,
                                              password=db_conn.password,
                                              database=db_conn.schema, # database 而不是 db
                                              charset='utf8mb4',
                                              port=tunnel.local_bind_port, #使用local_bind_port
                                              connect_timeout=60 #设置超时
                                              )
    
                 # 开启自动提交
                 connection.autocommit(1)
    
                 query = "select * from order where order_id=1"
                 df = pd.read_sql(query, connection)
                 print(df)
    
    
  • 注意点:

    1. host 必须是 '127.0.0.1' 或者'localhost'
    2. port 参数,用 tunnel.local_bind_port, 别直接写死。
    3. 要装 pymysql 库: pip install pymysql
    4. 确保 MySQL 用户有权限通过隧道连接。 在 MySQL 里面执行下这个命令,看看你的用户权限:
      SHOW GRANTS FOR 'your_mysql_user'@'%';
      
      或者更具体一些:
      SHOW GRANTS FOR 'your_mysql_user'@'127.0.0.1';
      
    5. 根据具体情况调整connect_timeout 参数
  • 进阶用法:

    你可以通过配置connect_timeout, read_timeout, write_timeout 这些参数来避免网络波动造成的连接问题。

方案三: 调整JdbcHookmax_allowed_packet (特定情况下有效)

JDBC 是个标准,通过它可以访问各种支持 JDBC 驱动的数据库,不光是 MySQL。 但是, 需要仔细调试一下

  • 原理 : JdbcHook 在某些场景更可靠. 但是要根据实际情况进行参数调优.
  • 操作步骤:
  1. 确保 Jar 包存在: 你得先把 MySQL 的 JDBC 驱动(.jar 文件)放到 Airflow 能找到的地方。 可以考虑放到 dags 文件夹下。

  2. Airflow Connection 配置:

    • Connection Id: jdbcconn (随便起一个名字,方便你引用)
    • Connection Type: Jdbc Connection
    • Host: 127.0.0.1 (重点!)
    • Port: 3307 (跟前面隧道保持一致)
    • Extras里面增加(这里不要轻易去修改Airflow里数据库的这个参数配置,直接在这里指定通常更方便):
    {
     "driver_class": "com.mysql.cj.jdbc.Driver",
     "driver_path": "/opt/airflow/dags/mysql-connector-j-8.2.0.jar",
     "connect_args": {"maxAllowedPacket": "256M"}
    }
    
    

    一定确保这里的 "driver_path" 正确无误。 JDBC 连接配置,主要就这几项:驱动类名 (driver_class)、驱动路径 (driver_path)、还有数据库的 URL。

  3. 代码:

    from airflow.providers.ssh.hooks.ssh import SSHHook
    from airflow.providers.jdbc.hooks.jdbc import JdbcHook
    import pandas as pd
    
    with SSHHook(ssh_conn_id='connection_ssh', conn_timeout=300, cmd_timeout=300, keepalive_interval=30, auth_timeout=300) as ssh_hook:
    
        # 还是老规矩, 确保隧道
       remote_host = 'your_mysql_server_ip'
       remote_port = 3306
       local_port = 3307  # 确保和其他地方使用的端口一致!
    
       with ssh_hook.get_tunnel(remote_host=remote_host, remote_port=remote_port, local_port=local_port) as tunnel:
    
                 tunnel.start()
    
                # 通过JdbcHook创建连接, 然后获取Engine来查询
                hook = JdbcHook(jdbc_conn_id='jdbcconn')
                engine = hook.get_sqlalchemy_engine()
    
               # 执行SQL
                query = "select * from order where order_id=1"
    
                # 如果结果集比较小,可以尝试一次性取
                try:
                   df = pd.read_sql(query, engine)
                   print(df)
                except Exception as e:
                    # 出现PacketTooBigException , 处理分块逻辑
                       if "PacketTooBigException" in str(e):
                            print("结果集太大,开始分块读取...")
                            chunk_size = 1000  # 每次读取1000条,根据实际调整。
                            df_chunks = pd.read_sql_query(query, engine, chunksize=chunk_size)
    
                           # 拼接所有chunk
                            df_final = pd.concat(df_chunks, ignore_index=True)
                            print(df_final)
    
                       else:
                            # 如果是其他异常,直接抛出.
                            raise e
    
    • 重点是拿到一个sqlalchemy的Engine 来执行sql,而且要增加对可能发生的 PacketTooBigException 的处理, 进行分块。
    • "maxAllowedPacket" 要根据实际需求调整。 可以设置大一点.

总结一下:

  • 能用 MySQLHook 就优先用 MySQLHookSSHHook。简单直接,不用额外装库。记得调整 hostport
  • pymysql 方案作为备选。 它更灵活, 也能精细控制.
  • 遇到大的结果集, 或者网络条件不好, 可以选择JDBC+分块的方案.

上面这几招,基本上能解决大多数通过 SSH 隧道访问 MySQL 的问题了。 要是不行,你再仔细检查下我前面提到的那些原因。