Airflow SSH隧道访问MySQL踩坑实录与解决方案
2025-03-06 14:10:58
Airflow 通过 SSH 隧道访问 MySQL 数据库的踩坑实录
工作中,遇到个事儿:要用 Airflow 做个 ETL 流程,从 MySQL 抽数据到 Postgres。 数据库那边要求必须通过 SSH 隧道访问,说是为了安全,咱也没法改数据库配置,只能硬着头皮上了。结果,折腾了好久才搞定。这过程里头的坑,得记录下,给大伙儿提个醒。
问题:隧道通了,连不上库
按理说,SSH 隧道建好了,数据库连接应该没问题啊。结果,愣是各种报错,就是连不上。
为什么会这样?
这问题,细究起来,原因可能有这么几个:
- 端口映射问题: SSH 隧道建好了,本地端口和远程数据库端口之间的映射可能没搞对。 咱本地访问 127.0.0.1:3306, 这个请求能不能正确转发到远端数据库的对应端口, 这中间哪个环节出点岔子都不行。
- MySQL 用户权限问题: 即使隧道通了,数据库那边也得认你这个用户啊。有可能你用来连接数据库的这个用户, 压根儿就没权限从你建立隧道的这个机器 IP 地址去连。
- 网络问题: 别看隧道日志显示连接成功了,网络波动、防火墙规则啥的,都有可能导致连接不稳定,甚至中断。
- 数据库配置: 有的 MySQL 数据库,它可能绑定了特定的 IP 地址。你要是从别的 IP (即使是隧道过来的)去访问,它就不让你连。
max_allowed_packet
设置问题 : JDBC 默认值小了, 容易出现PacketTooBigException
。
解决方案:各种尝试
问题原因可能有这么多,咱也不能干等着。 我试了好几种方法,一样样来说:
方案一:调整 SSHHook
和 MySQLHook
(最终方案)
这块问题出现的概率其实不低, SSHHook
和 MySQLHook
都需要根据实际情况调整.
-
原理和作用:
SSHHook
负责建立 SSH 隧道。MySqlHook
是 Airflow 里专门用来连接 MySQL 数据库的。这俩配合,先建隧道,再通过隧道连接。 -
代码示例:
先看创建SSH隧道的这部分。 核心代码如下, 要保证把远端数据库的端口(这里用的默认的3306, 你根据实际情况改),映射到本地的某个端口:from airflow.providers.ssh.hooks.ssh import SSHHook from airflow.providers.mysql.hooks.mysql import MySqlHook import pandas as pd import logging #假设你在airflow的connection里面已经设置好了一个叫 'connection_ssh'的连接。 with SSHHook(ssh_conn_id='connection_ssh', conn_timeout=300, cmd_timeout=300, keepalive_interval=30, auth_timeout=300) as ssh_hook: # 获取远程MySQL服务器实际监听的端口 remote_host = 'your_mysql_server_ip' # 换成你实际的 MySQL 服务器 IP remote_port = 3306 #大多数情况都是3306,如有不同请调整 # 在本地挑选一个没有被占用的端口。 不要总是 3306,换一个试试。 local_port = 3307 with ssh_hook.get_tunnel(remote_host=remote_host, remote_port=remote_port, local_port=local_port) as tunnel: tunnel.start() logging.info(f"SSH Tunnel opened: Local Port {local_port}") # 使用 MySqlHook。 注意,host 和 port 参数要改! mysql_hook = MySqlHook(mysql_conn_id='mysqlconn') #直接用 get_sqlalchemy_engine 来避免获取连接后又关闭的错误操作。 engine = mysql_hook.get_sqlalchemy_engine(engine_kwargs={"connect_args": {"port": local_port}}) # 通过这个engine来读取数据。 query = "select * from order where order_id=1" df = pd.read_sql(query, engine) print(df)
-
额外提示 :
1. 确保 Airflow Worker 节点能正常访问 SSH 服务器。
2. `local_port` 可以尝试使用其他的, 例如3307、13306等。
-
进阶用法:
如果你对性能有要求,可以在
get_sqlalchemy_engine
里配置连接池参数,比如:engine = mysql_hook.get_sqlalchemy_engine(engine_kwargs={"connect_args": {"port": local_port}, "pool_size": 10, "max_overflow": 20})
这样能减少频繁创建和关闭连接带来的开销。
方案二: 使用pymysql
(部分情况有效)
-
原理:
pymysql
是 Python 里一个常用的 MySQL 客户端库。 它比MySQLHook
更底层,可以更灵活地控制连接参数。 -
代码示例:
先看创建SSH隧道的这部分。
这里local_bind_port就是本地映射的端口了:from airflow.providers.ssh.hooks.ssh import SSHHook import pymysql import pandas as pd with SSHHook(ssh_conn_id='connection_ssh', conn_timeout=300, cmd_timeout=300, keepalive_interval=30, auth_timeout=300) as ssh_hook: # 和上面一样,先确定远程和本地端口 remote_host = 'your_mysql_server_ip' remote_port = 3306 local_port = 3307 with ssh_hook.get_tunnel(remote_host=remote_host, remote_port=remote_port, local_port=local_port) as tunnel: tunnel.start() # 获取 Airflow 里配置的 MySQL 连接信息。 # 假设你在 Airflow Connections 里面配置好了一个ID 为 'mysqlconn'的MySQL 连接 from airflow.hooks.base import BaseHook db_conn = BaseHook.get_connection('mysqlconn') # 用 pymysql.connect 创建连接 # host 参数一定是 127.0.0.1 或 localhost! port 参数一定要用隧道在本地的绑定端口! connection = pymysql.connect(host='127.0.0.1', user=db_conn.login, password=db_conn.password, database=db_conn.schema, # database 而不是 db charset='utf8mb4', port=tunnel.local_bind_port, #使用local_bind_port connect_timeout=60 #设置超时 ) # 开启自动提交 connection.autocommit(1) query = "select * from order where order_id=1" df = pd.read_sql(query, connection) print(df)
-
注意点:
host
必须是'127.0.0.1'
或者'localhost'
。port
参数,用tunnel.local_bind_port
, 别直接写死。- 要装
pymysql
库:pip install pymysql
- 确保 MySQL 用户有权限通过隧道连接。 在 MySQL 里面执行下这个命令,看看你的用户权限:
或者更具体一些:SHOW GRANTS FOR 'your_mysql_user'@'%';
SHOW GRANTS FOR 'your_mysql_user'@'127.0.0.1';
- 根据具体情况调整connect_timeout 参数
-
进阶用法:
你可以通过配置
connect_timeout
,read_timeout
,write_timeout
这些参数来避免网络波动造成的连接问题。
方案三: 调整JdbcHook
和 max_allowed_packet
(特定情况下有效)
JDBC 是个标准,通过它可以访问各种支持 JDBC 驱动的数据库,不光是 MySQL。 但是, 需要仔细调试一下
- 原理 :
JdbcHook
在某些场景更可靠. 但是要根据实际情况进行参数调优. - 操作步骤:
-
确保 Jar 包存在: 你得先把 MySQL 的 JDBC 驱动(.jar 文件)放到 Airflow 能找到的地方。 可以考虑放到
dags
文件夹下。 -
Airflow Connection 配置:
Connection Id
: jdbcconn (随便起一个名字,方便你引用)Connection Type
: Jdbc ConnectionHost
: 127.0.0.1 (重点!)Port
: 3307 (跟前面隧道保持一致)- Extras里面增加(这里不要轻易去修改Airflow里数据库的这个参数配置,直接在这里指定通常更方便):
{ "driver_class": "com.mysql.cj.jdbc.Driver", "driver_path": "/opt/airflow/dags/mysql-connector-j-8.2.0.jar", "connect_args": {"maxAllowedPacket": "256M"} }
一定确保这里的
"driver_path"
正确无误。 JDBC 连接配置,主要就这几项:驱动类名 (driver_class
)、驱动路径 (driver_path
)、还有数据库的 URL。 -
代码:
from airflow.providers.ssh.hooks.ssh import SSHHook from airflow.providers.jdbc.hooks.jdbc import JdbcHook import pandas as pd with SSHHook(ssh_conn_id='connection_ssh', conn_timeout=300, cmd_timeout=300, keepalive_interval=30, auth_timeout=300) as ssh_hook: # 还是老规矩, 确保隧道 remote_host = 'your_mysql_server_ip' remote_port = 3306 local_port = 3307 # 确保和其他地方使用的端口一致! with ssh_hook.get_tunnel(remote_host=remote_host, remote_port=remote_port, local_port=local_port) as tunnel: tunnel.start() # 通过JdbcHook创建连接, 然后获取Engine来查询 hook = JdbcHook(jdbc_conn_id='jdbcconn') engine = hook.get_sqlalchemy_engine() # 执行SQL query = "select * from order where order_id=1" # 如果结果集比较小,可以尝试一次性取 try: df = pd.read_sql(query, engine) print(df) except Exception as e: # 出现PacketTooBigException , 处理分块逻辑 if "PacketTooBigException" in str(e): print("结果集太大,开始分块读取...") chunk_size = 1000 # 每次读取1000条,根据实际调整。 df_chunks = pd.read_sql_query(query, engine, chunksize=chunk_size) # 拼接所有chunk df_final = pd.concat(df_chunks, ignore_index=True) print(df_final) else: # 如果是其他异常,直接抛出. raise e
- 重点是拿到一个sqlalchemy的Engine 来执行sql,而且要增加对可能发生的
PacketTooBigException
的处理, 进行分块。 "maxAllowedPacket"
要根据实际需求调整。 可以设置大一点.
- 重点是拿到一个sqlalchemy的Engine 来执行sql,而且要增加对可能发生的
总结一下:
- 能用
MySQLHook
就优先用MySQLHook
和SSHHook
。简单直接,不用额外装库。记得调整host
和port
。 pymysql
方案作为备选。 它更灵活, 也能精细控制.- 遇到大的结果集, 或者网络条件不好, 可以选择JDBC+分块的方案.
上面这几招,基本上能解决大多数通过 SSH 隧道访问 MySQL 的问题了。 要是不行,你再仔细检查下我前面提到的那些原因。