When I run the code below, it sometimes fails:

time_start = time.time() 
job = multiprocessing.Process(target=load_cpu, args=(deadline, )) 
job.start() # This is line 37 in the source code linked below 
# timeout=None in the call to join() solves the problem 
job.join(deadline) 
elapsed = time.time()-time_start 
if elapsed < deadline and job.is_alive(): 
    # I am getting here from time to time 
    logger.error(f"#{job_counter}: job.join() returned while process {job.pid} is still alive elapsed={elapsed} deadline={deadline}") 

A Python 3.7 container (Docker) that demonstrates the problem is here: https://github.com/larytet-py/multiprocess
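
For context, the traceback below shows spawn_job() being called from a worker thread, i.e. several threads create, start, and join processes concurrently. A condensed, self-contained sketch of that kind of driver (the thread and iteration counts here are illustrative, not taken from the repository):

import math
import multiprocessing
import random
import threading
import time

def load_cpu(deadline):
    # CPU-bound busy work, as in the repository's load_cpu()
    start = time.time()
    while time.time() - start < 0.2 * deadline:
        math.pow(random.randint(0, 1), random.randint(0, 1))

def spawn_job(deadline):
    # Condensed version of the snippet above, without the logging
    job = multiprocessing.Process(target=load_cpu, args=(deadline,))
    job.start()    # occasionally fails under load, see the traceback below
    job.join(deadline)

def thread_main(deadline, iterations):
    for _ in range(iterations):
        spawn_job(deadline)

if __name__ == "__main__":
    deadline = 0.1    # illustrative value
    threads = [threading.Thread(target=thread_main, args=(deadline, 1000))
               for _ in range(8)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()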
If I run the code on a 4-core Ubuntu 18.04 host for a few minutes, I get:
Traceback (most recent call last): 
  File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner 
    self.run() 
  File "/usr/lib/python3.7/threading.py", line 870, in run 
    self._target(*self._args, **self._kwargs) 
  File "main.py", line 37, in spawn_job 
    job.start() 
  File "/usr/lib/python3.7/multiprocessing/process.py", line 111, in start 
    _cleanup() 
  File "/usr/lib/python3.7/multiprocessing/process.py", line 56, in _cleanup 
    if p._popen.poll() is not None: 
AttributeError: 'NoneType' object has no attribute 'poll' 

What exactly am I doing wrong?
My workaround is to replace the call to job.join() with polling plus an is_alive() check, as shown below. Unfortunately this approach hurts latency. Is there a better option?
def join_process(job, timeout): 
    time_start = time.time() 
    # Typical processing time is ~100 ms and I want to limit the latency impact;
    # a 10 ms polling interval looks OK.
    # TODO: with a very small timeout this can degenerate into a tight loop.
    polling_time = min(0.1 * timeout, 0.010) 
    while time.time() - time_start < timeout and job.is_alive(): 
        time.sleep(polling_time) 

Update: I tried a multiprocessing.Event() instead of Process.join(); the code fails with the same exception.
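
For reference, a minimal sketch of what such an Event-based variant looks like (the worker and the wiring below are illustrative, not the code from the linked repository): the worker sets the event when it finishes, and the parent waits on the event with a timeout instead of calling join().

import multiprocessing
import time

def worker(deadline, done):
    # Stand-in for the CPU-bound load_cpu() from the repository
    start = time.time()
    while time.time() - start < 0.2 * deadline:
        pass
    done.set()                # signal completion to the parent

def spawn_job_with_event(deadline):
    done = multiprocessing.Event()
    job = multiprocessing.Process(target=worker, args=(deadline, done))
    job.start()               # the crash in _cleanup() still happens here
    done.wait(deadline)       # replaces job.join(deadline)
    return job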

Update 2: I have reproduced the problem in code that does not call Process.join() at all. It takes more time and more load, but eventually Process.start() crashes.

Update 3: Has https://bugs.python.org/issue40860 been accepted? I am still looking for a workaround.

Please refer to the following approach:

Serializing the calls to Process.start() helps and is a reasonable workaround. The traceback points at the likely reason: Process.start() runs multiprocessing's internal _cleanup(), which walks the previously created child Process objects and polls their _popen handles, and with several threads creating and starting processes concurrently it can hit a child whose _popen is still None. There were no other answers, so I am accepting my own.

diff --git a/main.py b/main.py 
index d09dc53..49d68f0 100644 
--- a/main.py 
+++ b/main.py 
@@ -26,17 +26,24 @@ def load_cpu(deadline): 
     while time.time() - start < 0.2*deadline: 
         math.pow(random.randint(0, 1), random.randint(0, 1)) 
  
+def join_process(job, timeout): 
+    time_start = time.time() 
+    while time.time()-time_start < timeout and job.is_alive(): 
+        time.sleep(0.1   * timeout) 
+        continue 
+ 
 job_counter = 0 
+lock = threading.Lock() 
 def spawn_job(deadline): 
     ''' 
     Creat a new Process, call join(), process errors 
     '''     
     global job_counter 
     time_start = time.time() 
-    job = multiprocessing.Process(target=load_cpu, args=(deadline, )) 
-    job.start() 
-    # timeout=None in the call to join() solves the problem 
-    job.join(deadline) 
+    with lock: 
+        job = multiprocessing.Process(target=load_cpu, args=(deadline, )) 
+        job.start() 
+    join_process(job, deadline) 

My final version uses os.fork(). I have given up on multiprocessing entirely. multiprocessing is not thread-safe (I am not joking): https://gist.github.com/larytet/3ca9f9a32b1dc089a24cb7011455141f
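
A rough sketch of what an os.fork()-based replacement can look like (this is not the code from the gist; the helpers and the WNOHANG polling loop below are illustrative):

import os
import time

def run_in_child(target, args):
    # Fork a child, run target(*args) in it, and return the child's pid
    pid = os.fork()
    if pid == 0:                          # child
        try:
            target(*args)
        finally:
            os._exit(0)                   # never fall back into the parent's code path
    return pid                            # parent

def wait_with_timeout(pid, timeout, poll_interval=0.010):
    # Wait for the child up to `timeout` seconds; return True if it exited.
    # On timeout the caller still has to kill and/or reap the child.
    deadline = time.time() + timeout
    while time.time() < deadline:
        done_pid, _status = os.waitpid(pid, os.WNOHANG)
        if done_pid == pid:
            return True
        time.sleep(poll_interval)
    return False

The fork()/waitpid() pair only touches kernel state, so there is no module-level bookkeeping (such as multiprocessing's _children set) to race on; the trade-off is that this is POSIX-only and you reimplement the plumbing yourself.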

