返回

Python 多进程多线程结合:避免 Queue 陷阱,高效并行

python

Python 中多进程与多线程的结合:避开陷阱,实现高效并行

背景

开发高性能程序经常会遇到并行计算的需求。利用计算机的多核心资源能大幅提升程序运行效率。Python 提供了 multiprocessingthreading 模块来支持多进程和多线程编程。 两种模式各具优势:多进程可以充分利用多核 CPU,规避全局解释器锁(GIL)的限制,实现真正的并行计算;多线程则更轻量级,适合处理 I/O 密集型任务。将两者结合能否兼顾其优点,达成最佳性能表现?

疑问

有人尝试将多进程与多线程结合使用。试图创建多个进程,每个进程再创建多个线程。这种做法可行吗?有无意义?一个实际问题是:在使用 multiprocessing.Queue() 结合多进程多线程时,程序在 join() 时出现挂起,这是为何?

可行性分析

Python 中,多进程与多线程的结合技术上是可行的。一个进程可以创建和管理多个线程。但需谨慎处理进程和线程间的交互,特别要注意资源共享和同步问题。

应用场景的考量

结合使用多进程和多线程有其特定的应用场景。并非任何任务都适合采用这种模式。对于计算密集型且可以分解为多个独立子任务的场景,可考虑用多进程处理子任务。若子任务涉及较多 I/O 操作,则可在每个进程内创建线程来处理这些 I/O 任务,减少阻塞时间。

问题分析与解决

出现程序挂起问题,主要原因可能在以下几点:

1. multiprocessing.Queue() 的使用不当

multiprocessing.Queue() 用于进程间通信。多个进程共用一个队列时,容易产生问题。在给出的代码中,进程p1p2通过self.filePro内的PQ共享结果,而每个进程里的线程通过 TQ 来收集,再把TQ的东西放进PQ,这个过程中极有可能因为 TQ.get()PQ.put()的线程安全导致 PQ.empty() 的判断失效,导致node()进程无法跳出while循环从而无法继续 p1.join(),同理p2.join() 也将阻塞。更安全的做法是为每个进程创建独立的队列。

2. 死锁

多进程多线程编程需特别关注死锁问题。代码中的多个循环,特别是在多个 while 循环里使用队列get()task_done()方法,再使用join()进行阻塞,可能导致多个循环的竞争条件或未妥善管理而产生死锁。

3. daemon 属性的设置

代码中 p1p2t1t2daemon 属性设置为 True。这意味着当主程序退出时,这些进程/线程会随之退出,不会阻塞程序退出。但在程序内部进行多层的 join() 会带来阻塞的问题。通常不建议把所有子线程、子进程设置为daemon,这会导致它们在程序主逻辑退出的时候可能出现异常的结果。

解决步骤

1. 避免在多进程中共享队列

为每个进程使用独立的队列,可以消除跨进程共享资源导致的问题。每个进程创建并维护自己的队列,只在其内部的线程间共享。最后由每个进程自行处理和传递它管理的队列中的数据。

import multiprocessing
import threading
import queue

class Example:
    def splitList(self, lst):
        mid = len(lst) // 2
        return lst[:mid], lst[mid:]

    def node(self, files):
        l1, l2 = self.splitList(files)
        q1 = multiprocessing.Queue()
        q2 = multiprocessing.Queue()

        p1 = multiprocessing.Process(target=self.filePro, args=(l1, q1,))
        p2 = multiprocessing.Process(target=self.filePro, args=(l2, q2,))

        p1.start()
        p2.start()

        p1.join()
        p2.join()

        results = []
        while not q1.empty():
            results.append(q1.get())
        while not q2.empty():
            results.append(q2.get())
        print(results)

    def filePro(self, lst, process_queue):
        tq = queue.Queue()
        l1, l2 = self.splitList(lst)
        t1 = threading.Thread(target=self.fileThr, args=('a', l1, tq,))
        t2 = threading.Thread(target=self.fileThr, args=('b', l2, tq,))

        t1.start()
        t2.start()

        t1.join()
        t2.join()

        while not tq.empty():
            process_queue.put(tq.get())

    def fileThr(self, id, lst, thread_queue):
        while lst:
            tmp_path = lst.pop()
            if not tmp_path[1]:
                continue
            for item in tmp_path[1]:
                thread_queue.put((id, item))

if __name__ == "__main__":
    files = [('file1', [1, 2]), ('file2', [3, 4]), ('file3', [5, 6]), ('file4', [7, 8])]
    example = Example()
    example.node(files)

操作步骤:

  1. 修改 node 函数,创建并使用独立的多进程队列 q1, q2
  2. 启动进程 p1p2
  3. 通过p1.join()p2.join()等待子进程p1,p2运行完毕。
  4. 获取 q1, q2 里的处理结果。
  5. 移除 fileThr 内部的TQ.join(),它不需要在这里使用join()等待。

2. 精简逻辑, 重构代码

重新考虑进程/线程的退出条件, 通过合理的流程设计, 将可能死锁的while循环改为有具体次数的for循环,减少代码复杂度。移除代码内循环后的join(),避免死锁的产生。注意避免无用对象的持续创建和嵌套循环的使用。
更仔细的设计每个线程处理的任务和队列的读写。可以重新设计fileThr内更合理的机制。

安全建议

  1. 异常处理 : 在多进程和多线程环境下,务必做好异常处理,防止一个进程或线程的异常影响整个程序。
  2. 避免过度复杂化 : 不要为了并行而并行,过度复杂的架构会引入更多问题。谨慎评估程序需求,选择最合适的并行策略。结合分析工具和性能测试结果来决定最优的线程和进程数量。过多的线程或进程会带来巨大的上下文切换开销,反而降低程序运行效率。
  3. 锁的使用 : 如需共享变量,正确使用锁来保证数据安全,并防止产生死锁。