我从 How can I recover the return value of a function passed to multiprocessing.Process? 运行以下解决方案:
import multiprocessing
from os import getpid
def worker(procnum):
print('I am number %d in process %d' % (procnum, getpid()))
return getpid()
if __name__ == '__main__':
pool = multiprocessing.Pool(processes = 3)
print(pool.map(worker, range(5)))
应该输出如下内容:
I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]
但我只得到
[4212, 4212, 4212, 4212, 4212]
如果我使用超过 10 个进程为 pool.map 提供 1,000,000 的范围,我最多看到两个不同的 pid。
为什么我的 multiprocessing
副本似乎在同一进程中运行所有内容?
请您参考如下方法:
TL;DR:任务没有以任何方式专门分配,也许您的任务太短了,它们在其他进程开始之前就全部完成了。
从 multiprocessing
的源代码来看,似乎任务只是简单地放在 Queue
中,worker 进程从中读取(函数 worker
从 Pool._inqueue
中读取)。没有经过计算的分配, worker 们只是竞相努力工作。
那么最有可能的赌注是,由于任务非常短,所以一个进程在其他进程有机会查看甚至开始之前完成所有任务。您可以通过向任务添加两秒钟的 sleep
轻松检查是否属于这种情况。
我会注意到,在我的机器上,所有任务都非常均匀地分布在进程中(对于#processes > #cores 也是如此)。因此似乎存在一些系统依赖性,即使所有进程都应该在工作排队之前进行 .start()
ed。
这是来自 worker
的一些修剪过的源代码,它表明任务只是被每个进程从队列中读取,所以是伪随机顺序:
def worker(inqueue, outqueue, ...):
...
get = inqueue.get
...
while maxtasks is None or (maxtasks and completed < maxtasks):
try:
task = get()
...
SimpleQueue
使用 Pipe
在进程之间进行通信,来自 SimpleQueue
构造函数:
self._reader, self._writer = Pipe(duplex=False)
编辑:可能关于进程启动太慢的部分是错误的,所以我删除了它。在任何工作排队之前,所有进程都是 .start()
ed(这可能是 platform 依赖的)。我找不到进程在 .start()
返回时是否准备就绪。