返回

解决 SQLAlchemy 调用 MySQL 存储过程返回旧数据问题

mysql

SQLAlchemy调用MySQL存储过程返回旧数据的问题

在使用 SQLAlchemy 与 MySQL 交互时,如果调用存储过程后发现每次调用返回的结果都包含前几次调用的结果,可能会让人困惑。 这个问题通常是由临时表的使用方式以及 SQLAlchemy 处理连接的方式造成的。让我们深入分析问题原因,并探讨可行的解决方案。

问题根源

问题的关键在于存储过程使用了临时表 (temporary table),而且 SQLAlchemy 的连接和游标行为也参与其中。 当你在存储过程内部创建 temporary table 时,该表的生命周期只存在于当前的数据库会话(session)中。而 SQLAlchemy 使用连接池来管理数据库连接,连接在程序生命周期内会被复用。

以下是一些关键因素:

  • 临时表的生命周期: 临时表仅在其创建的会话期间存在。如果在同一个会话中多次调用该存储过程,每次创建的 tmpFilteredItems 都会在相同会话内,造成叠加,导致旧数据残留。
  • SQLAlchemy 连接复用: SQLAlchemy 默认使用连接池,这意味着 raw_conn = self.eng.get_engine().raw_connection() 获取到的 raw_conn 有可能来自于之前的会话,其中的临时表仍然存在。 每次你再次调用存储过程,就会在已经包含旧数据的临时表上进行新的查询,从而出现数据累积。
  • 存储过程设计: 如果存储过程设计没有显式地清除之前的临时表数据,问题就会变得更加严重。 drop table if exists tmpFilteredItems; 语句在这里应该能够清除之前的临时表,但是,当连接没有完全结束,新的结果添加到老的临时表就可能会发生数据叠加的情况。

解决方案

为了解决这个问题,必须确保在每次调用存储过程前都清理掉之前的临时表。有几种方法可以实现,接下来将详细阐述。

1. 强制关闭并重新建立连接

每次执行存储过程之前,我们都可以选择放弃连接,获取一个新连接,并在此新连接下执行。
这意味着每次我们都需要建立一个新的会话(session)。虽然性能开销略大,但这能有效保证连接清洁。

  • 原理: 每次调用都使用全新的连接和会话,临时表的生命周期仅限于当次会话,这样每次都从零开始,避免数据累积。
    * 操作步骤: 在调用存储过程之前, 关闭当前的引擎和连接, 重新初始化一个。
    def make_recordsets(self):
        try:
            # 1. 新连接和存储过程执行
            eng = self.create_new_engine()  # 重新创建一个引擎实例,  注意连接参数需要保持一致。 
            raw_conn = eng.get_engine().raw_connection()
            cursor = raw_conn.cursor()
            cursor.callproc('sp_get_filtered_items', (self.instanceof_string, self.keyphrase,))
            for result in cursor.stored_results():
                tuples = result.fetchall()
                self.__items_to_dict(tuples)
            print(len(self.filtered_items))
            cursor.close()
             
             #2. 获取新的连接来执行第二个存储过程
            raw_conn = eng.get_engine().raw_connection()
            cursor = raw_conn.cursor()
            cursor.callproc('sp_get_filtered_props')
            for result in cursor.stored_results():
                tuples = result.fetchall()
                self.__props_to_dict(tuples)
            print(len(self.filtered_props))
            cursor.close()
            # 3. 处理属性, 获取最新的值
            if not len(self.filtered_props) == 0:
                d = self.filtered_props[0]
                self.link_property = d['property']
                self.link_property_label = d['propertyLabel']
              #4. 新的连接执行最后一条存储过程。
            raw_conn = eng.get_engine().raw_connection()
            cursor = raw_conn.cursor()
            cursor.callproc('sp_get_filtered_values')
            for result in cursor.stored_results():
                tuples = result.fetchall()
                self.__values_to_dict(tuples)
            print(len(self.filtered_values))
            cursor.close()

            raw_conn.commit()
            
        except Exception as e:
            print(e.args)
            if raw_conn: #检查连接是否存在,防止conn为None
                raw_conn.rollback()

        finally:
            if raw_conn:
                raw_conn.close() # 确保释放数据库连接。
            if eng: 
                 eng.dispose()#关闭新的引擎连接
             
    def create_new_engine(self): # 添加一个新引擎创建的方法
       return  create_engine(URL(**self.eng_connection_dict), echo=self.eng_echo) # engine 必须使用一样的链接字符串配置。

2. 使用事务性临时表

确保每次执行存储过程时,临时表都处在一个明确定义的事务边界内。 当事务完成时,数据库会自动清理临时表,避免旧数据污染后续调用。

  • 原理: 事务性临时表只在当前事务内存在,事务结束时会自动被清理。通过事务管理,可控制临时表的作用域,确保每次执行存储过程时的环境都是干净的。
  • 操作步骤: 结合 SQLAlchemy 的事务特性使用 begin()commit() 方法,包裹每次存储过程的调用。
   def make_recordsets(self):
      try:
         
          eng = self.eng.get_engine() # 使用原本的数据库连接,而不是直接关闭并重新创建引擎
          
          with eng.connect() as conn:  
            # 1.  事务内的存储过程执行。
            with conn.begin(): 
               cursor = conn.execution_options(isolation_level="READ COMMITTED").connection.cursor() # 设置事务级别,防止其他问题。
               cursor.callproc('sp_get_filtered_items', (self.instanceof_string, self.keyphrase,))
               for result in cursor.stored_results():
                 tuples = result.fetchall()
                 self.__items_to_dict(tuples)
               print(len(self.filtered_items))
               cursor.close()

             # 2.  事务内的存储过程执行。
            with conn.begin():
                cursor = conn.execution_options(isolation_level="READ COMMITTED").connection.cursor()
                cursor.callproc('sp_get_filtered_props')
                for result in cursor.stored_results():
                  tuples = result.fetchall()
                  self.__props_to_dict(tuples)
                print(len(self.filtered_props))
                cursor.close()

             # 3. 逻辑
            if not len(self.filtered_props) == 0:
               d = self.filtered_props[0]
               self.link_property = d['property']
               self.link_property_label = d['propertyLabel']

            # 4.  事务内的存储过程执行。
            with conn.begin():
               cursor = conn.execution_options(isolation_level="READ COMMITTED").connection.cursor()
               cursor.callproc('sp_get_filtered_values')
               for result in cursor.stored_results():
                   tuples = result.fetchall()
                   self.__values_to_dict(tuples)
               print(len(self.filtered_values))
               cursor.close()

            # no need to conn.commit() as with statements manages this and closes resources appropriately

      except Exception as e:
           print(e.args)

      finally:
            if eng: # 添加此行确保engine释放连接资源。
              eng.dispose()

额外的安全建议

  • 避免使用全局临时表: 尽量使用会话级临时表,以避免会话之间的干扰。全局临时表存在于会话和连接之间,容易引起问题,不应该频繁使用。
  • 显式清理临时表: 即使使用了事务或重新连接的方式,也应该在存储过程中添加显式 DROP TABLE IF EXISTS 语句来确保清理,双重保险总没有坏处。
  • 检查数据 : 执行操作后, 应该立即输出结果进行验证,检查数据的变化情况。

通过采取上述方案,应该能够有效地解决SQLAlchemy调用MySQL存储过程返回旧数据的问题。务必根据自身项目情况选择最合适的方案, 并关注连接管理和临时表使用这两个关键点。