我从 How can I recover the return value of a function passed to multiprocessing.Process? 运行以下解决方案:

import multiprocessing 
from os import getpid 
 
def worker(procnum): 
    print('I am number %d in process %d' % (procnum, getpid())) 
    return getpid() 
 
if __name__ == '__main__': 
    pool = multiprocessing.Pool(processes = 3) 
    print(pool.map(worker, range(5))) 

应该输出如下内容:

I am number 0 in process 19139 
I am number 1 in process 19138 
I am number 2 in process 19140 
I am number 3 in process 19139 
I am number 4 in process 19140 
[19139, 19138, 19140, 19139, 19140] 

但我只得到

[4212, 4212, 4212, 4212, 4212] 

如果我使用超过 10 个进程为 pool.map 提供 1,000,000 的范围,我最多看到两个不同的 pid。

为什么我的 multiprocessing 副本似乎在同一进程中运行所有内容?

请您参考如下方法:

TL;DR:任务没有以任何方式专门分配,也许您的任务太短了,它们在其他进程开始之前就全部完成了。

multiprocessing 的源代码来看,似乎任务只是简单地放在 Queue 中,worker 进程从中读取(函数 workerPool._inqueue 中读取)。没有经过计算的分配, worker 们只是竞相努力工作。

那么最有可能的赌注是,由于任务非常短,所以一个进程在其他进程有机会查看甚至开始之前完成所有任务。您可以通过向任务添加两秒钟的 sleep 轻松检查是否属于这种情况。

我会注意到,在我的机器上,所有任务都非常均匀地分布在进程中(对于#processes > #cores 也是如此)。因此似乎存在一些系统依赖性,即使所有进程都应该在工作排队之前进行 .start()ed。


这是来自 worker 的一些修剪过的源代码,它表明任务只是被每个进程从队列中读取,所以是伪随机顺序:

def worker(inqueue, outqueue, ...): 
    ... 
    get = inqueue.get 
    ... 
    while maxtasks is None or (maxtasks and completed < maxtasks): 
        try: 
            task = get() 
        ... 

SimpleQueue 使用 Pipe 在进程之间进行通信,来自 SimpleQueue 构造函数:

self._reader, self._writer = Pipe(duplex=False) 

编辑:可能关于进程启动太慢的部分是错误的,所以我删除了它。在任何工作排队之前,所有进程都是 .start()ed(这可能是 platform 依赖的)。我找不到进程在 .start() 返回时是否准备就绪。


评论关闭
IT序号网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!