Python 多进程多线程结合:避免 Queue 陷阱,高效并行
2024-12-19 23:26:29
Python 中多进程与多线程的结合:避开陷阱,实现高效并行
背景
开发高性能程序经常会遇到并行计算的需求。利用计算机的多核心资源能大幅提升程序运行效率。Python 提供了 multiprocessing
和 threading
模块来支持多进程和多线程编程。 两种模式各具优势:多进程可以充分利用多核 CPU,规避全局解释器锁(GIL)的限制,实现真正的并行计算;多线程则更轻量级,适合处理 I/O 密集型任务。将两者结合能否兼顾其优点,达成最佳性能表现?
疑问
有人尝试将多进程与多线程结合使用。试图创建多个进程,每个进程再创建多个线程。这种做法可行吗?有无意义?一个实际问题是:在使用 multiprocessing.Queue()
结合多进程多线程时,程序在 join()
时出现挂起,这是为何?
可行性分析
Python 中,多进程与多线程的结合技术上是可行的。一个进程可以创建和管理多个线程。但需谨慎处理进程和线程间的交互,特别要注意资源共享和同步问题。
应用场景的考量
结合使用多进程和多线程有其特定的应用场景。并非任何任务都适合采用这种模式。对于计算密集型且可以分解为多个独立子任务的场景,可考虑用多进程处理子任务。若子任务涉及较多 I/O 操作,则可在每个进程内创建线程来处理这些 I/O 任务,减少阻塞时间。
问题分析与解决
出现程序挂起问题,主要原因可能在以下几点:
1. multiprocessing.Queue()
的使用不当
multiprocessing.Queue()
用于进程间通信。多个进程共用一个队列时,容易产生问题。在给出的代码中,进程p1
和p2
通过self.filePro
内的PQ
共享结果,而每个进程里的线程通过 TQ
来收集,再把TQ
的东西放进PQ
,这个过程中极有可能因为 TQ.get()
和PQ.put()
的线程安全导致 PQ.empty()
的判断失效,导致node()
进程无法跳出while循环从而无法继续 p1.join()
,同理p2.join()
也将阻塞。更安全的做法是为每个进程创建独立的队列。
2. 死锁
多进程多线程编程需特别关注死锁问题。代码中的多个循环,特别是在多个 while
循环里使用队列get()
和task_done()
方法,再使用join()
进行阻塞,可能导致多个循环的竞争条件或未妥善管理而产生死锁。
3. daemon
属性的设置
代码中 p1
、p2
、t1
、t2
的 daemon
属性设置为 True
。这意味着当主程序退出时,这些进程/线程会随之退出,不会阻塞程序退出。但在程序内部进行多层的 join()
会带来阻塞的问题。通常不建议把所有子线程、子进程设置为daemon
,这会导致它们在程序主逻辑退出的时候可能出现异常的结果。
解决步骤
1. 避免在多进程中共享队列
为每个进程使用独立的队列,可以消除跨进程共享资源导致的问题。每个进程创建并维护自己的队列,只在其内部的线程间共享。最后由每个进程自行处理和传递它管理的队列中的数据。
import multiprocessing
import threading
import queue
class Example:
def splitList(self, lst):
mid = len(lst) // 2
return lst[:mid], lst[mid:]
def node(self, files):
l1, l2 = self.splitList(files)
q1 = multiprocessing.Queue()
q2 = multiprocessing.Queue()
p1 = multiprocessing.Process(target=self.filePro, args=(l1, q1,))
p2 = multiprocessing.Process(target=self.filePro, args=(l2, q2,))
p1.start()
p2.start()
p1.join()
p2.join()
results = []
while not q1.empty():
results.append(q1.get())
while not q2.empty():
results.append(q2.get())
print(results)
def filePro(self, lst, process_queue):
tq = queue.Queue()
l1, l2 = self.splitList(lst)
t1 = threading.Thread(target=self.fileThr, args=('a', l1, tq,))
t2 = threading.Thread(target=self.fileThr, args=('b', l2, tq,))
t1.start()
t2.start()
t1.join()
t2.join()
while not tq.empty():
process_queue.put(tq.get())
def fileThr(self, id, lst, thread_queue):
while lst:
tmp_path = lst.pop()
if not tmp_path[1]:
continue
for item in tmp_path[1]:
thread_queue.put((id, item))
if __name__ == "__main__":
files = [('file1', [1, 2]), ('file2', [3, 4]), ('file3', [5, 6]), ('file4', [7, 8])]
example = Example()
example.node(files)
操作步骤:
- 修改
node
函数,创建并使用独立的多进程队列q1
,q2
。 - 启动进程
p1
和p2
。 - 通过
p1.join()
和p2.join()
等待子进程p1
,p2
运行完毕。 - 获取
q1
,q2
里的处理结果。 - 移除
fileThr
内部的TQ.join()
,它不需要在这里使用join()
等待。
2. 精简逻辑, 重构代码
重新考虑进程/线程的退出条件, 通过合理的流程设计, 将可能死锁的while
循环改为有具体次数的for
循环,减少代码复杂度。移除代码内循环后的join()
,避免死锁的产生。注意避免无用对象的持续创建和嵌套循环的使用。
更仔细的设计每个线程处理的任务和队列的读写。可以重新设计fileThr
内更合理的机制。
安全建议
- 异常处理 : 在多进程和多线程环境下,务必做好异常处理,防止一个进程或线程的异常影响整个程序。
- 避免过度复杂化 : 不要为了并行而并行,过度复杂的架构会引入更多问题。谨慎评估程序需求,选择最合适的并行策略。结合分析工具和性能测试结果来决定最优的线程和进程数量。过多的线程或进程会带来巨大的上下文切换开销,反而降低程序运行效率。
- 锁的使用 : 如需共享变量,正确使用锁来保证数据安全,并防止产生死锁。