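Below is a Python problem that demonstrates how to iterate a function func in parallel using multiprocessing.Pool. There are Np elements to iterate over. The function func merely returns Np minus the index of the iterable. As you can see, I use a queue to return the values from the function when running in parallel mode.

If I set runParallel=False, the program runs in serial mode instead.

The program runs fine for both runParallel=False and runParallel=True, but now comes the essential problem I have: as you might see below, if problemIndex is set a bit lower than Np (e.g. problemIndex=7), then I trigger a floating point exception. I divide by zero - stupid me :-)

If I run with runParallel=False, I can see the source line number of the bug and catch it directly.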
$ python map.py
Traceback (most recent call last):
  File "map.py", line 63, in <module>
    a = func(argList[p])
  File "map.py", line 22, in func
    ret = 1/(args["index"]-args["problemIndex"])
ZeroDivisionError: integer division or modulo by zero
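Nice!

However, with runParallel=True I just end up in the "Bummer" printing part, with no indication of where the bug came from. Annoying!

My question is: with runParallel=True, how can I debug this efficiently and get the line number of the buggy code line back from the Pool()?

#!/usr/bin/python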
# map.py

import time
import multiprocessing
import sys
import random

# Toggle whether we run parallel or not
runParallel = True

# Problematic index - if less than Np we create an exception
problemIndex = 13

# Number of compute problems
Np = 10

def func(args):
    # Emulate that the function might be fast or slow
    time.sleep(random.randint(1,4))
    ret = args["Np"] - args["index"]

    # Emulate a bug
    if args["index"]==args["problemIndex"]:
        ret = 1/(args["index"]-args["problemIndex"])

    # Return data
    if args["runParallel"]:
        # We use a queue thus ordering may not be protected
        args["q"].put((args["index"],ret))
    else:
        return ret

# Return queue used when running parallel
manager = multiprocessing.Manager()
q = manager.Queue()

# Build argument lists
argList = []
for i in range(Np):
    args={}
    args["index"] = i # index
    args["Np"] = Np # Number of problems
    args["q"] = q # return queue for parallel execution mode
    args["problemIndex"] = problemIndex # if index == problemIndex then func will malfunction
    args["runParallel"] = runParallel # should we run parallel
    argList.append(args)

#should we run parallel
if runParallel:
    # Run 10 processes in parallel
    p = multiprocessing.Pool(processes=10)
    ret = p.map_async(func, argList)
    ret.wait()
    qLen = q.qsize()
    p.close()
    if not qLen == Np:
        print "Bummer - one of more worker threads broke down",Np,qLen
        sys.exit(0)

resultVector = [None]*Np
for p in range(Np):
    if runParallel:
        (i,a) = q.get(timeout=0.1)
    else:
        i = p
        a = func(argList[p])
    resultVector[i] = a

for i in range(Np):
    print "Index", i, "gives",resultVector[i]
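Best answer

I have found the traceback module very useful for debugging multiprocessing code. If you pass an exception back to the main thread/process, you lose all of the traceback information, so you need to call traceback.format_exc inside the child and pass that text back to the main process along with the exception. Below I include a pattern that can be used with Pool.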
import traceback
import multiprocessing as mp
import time

def mpFunctionReportError(kwargs):
    '''
    wrap any function and catch any errors from f,
    putting them in the queue instead of raising
    kwargs must contain 'queue' (multiprocessing queue)
    and 'f' function to be run
    '''
    queue = kwargs.pop('queue')
    f = kwargs.pop('f')
    rslt=None
    try:
        rslt = f(**kwargs)
        queue.put(rslt)
    except Exception as e:
        # format_exc() formats the exception currently being handled;
        # its optional argument is a frame limit, not the exception
        queue.put([e,traceback.format_exc()])
    return

def doNothing(a):
    return a

def raiseException(a):
    a='argh'
    raise ValueError('this is bad')

manager = mp.Manager()
outQ = manager.Queue()
p = mp.Pool(processes=4)

# Four jobs that succeed: each puts its return value on the queue
ret = p.map_async(mpFunctionReportError,iterable=[dict(f=doNothing,queue=outQ,a='pointless!') for i in xrange(4)])
ret.wait()
time.sleep(1)
for i in xrange(4):
    print(outQ.get_nowait())

# Two jobs that fail: each puts [exception, traceback text] on the queue
ret = p.map_async(mpFunctionReportError,iterable=[dict(f=raiseException,queue=outQ,a='pointless!') for i in xrange(2)])
ret.wait()
time.sleep(1)
for i in xrange(2):
    e,trace = outQ.get_nowait()
    print(e)
    print(trace)
Running this example gives:
pointless!
pointless!
pointless!
pointless!
this is bad
Traceback (most recent call last):
  File "/home/john/projects/mpDemo.py", line 13, in mpFunctionReportError
    rslt = f(**kwargs)
  File "/home/john/projects/mpDemo.py", line 24, in raiseException
    raise ValueError('this is bad')
ValueError: this is bad

this is bad
Traceback (most recent call last):
  File "/home/john/projects/mpDemo.py", line 13, in mpFunctionReportError
    rslt = f(**kwargs)
  File "/home/john/projects/mpDemo.py", line 24, in raiseException
    raise ValueError('this is bad')
ValueError: this is bad
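
For readers on Python 3, the same idea can be sketched without a queue: catch the exception inside the worker and hand the formatted traceback back as part of the return value. This is only a minimal sketch, not the answer's own code. The names safe_call and countdown are hypothetical, and countdown is a toy stand-in for the question's func with the bug planted at index 7.

```python
import multiprocessing as mp
import traceback


def safe_call(task):
    """Hypothetical helper: run f(arg) and never raise.

    Returns (arg, True, result) on success, or
    (arg, False, traceback_text) if f raised inside the worker.
    """
    f, arg = task
    try:
        return (arg, True, f(arg))
    except Exception:
        # Format the traceback here, in the worker, while it still exists.
        return (arg, False, traceback.format_exc())


def countdown(index, n=10, problem_index=7):
    """Toy job modeled on the question's func: returns n - index."""
    if index == problem_index:
        # Planted bug: division by zero, as in the question.
        return 1 / (index - problem_index)
    return n - index


if __name__ == "__main__":
    tasks = [(countdown, i) for i in range(10)]
    with mp.Pool(processes=4) as pool:
        # map() keeps results in input order, so no index bookkeeping is needed.
        results = pool.map(safe_call, tasks)
    for arg, ok, payload in results:
        if ok:
            print("Index", arg, "gives", payload)
        else:
            print("Index", arg, "failed in the worker:")
            print(payload)
```

Returning the traceback text together with the input keeps every failure tied to the job that caused it, and the parent needs no sleep or queue polling. Separately, note that AsyncResult.get(), unlike wait(), re-raises a worker's exception in the parent process.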
This question, "Python parallel execution - how to debug efficiently?", comes from Stack Overflow: https://stackoverflow.com/questions/23269230/