以下是一个Python问题,该问题演示了如何使用func并行迭代函数multiprocessing.Pool。要迭代的元素的Np数量。函数func仅返回Np减去可迭代索引。可以看出,在并行模式下运行时,我使用队列从函数返回值。

如果设置了runParallel=False,则该程序可以以串行模式执行。

对于runParallel=FalserunParallel=True,该程序运行良好,但是现在出现了我遇到的基本问题:正如您可能在下面看到的,如果将problemIndex设置为比Np(例如problemIndex=7)低一点,那么我将产生一个浮点异常。我除以零-愚蠢的我:-)

如果运行runParallel=False,那么我可以看到该错误的源代码行,并且可以直接捕获该错误。

$ python map.py
Traceback (most recent call last):
  File "map.py", line 63, in <module>
    a = func(argList[p])
  File "map.py", line 22, in func
    ret = 1/(args["index"]-args["problemIndex"])
ZeroDivisionError: integer division or modulo by zero

好的!

但是,对于runParallel=True,我只停留在“Bummer”打印部分,而没有指出错误的来源。恼人的!

我的问题是:对于runParallel=True,我如何才能有效地调试它并从Pool()中获取 buggy 代码行的行号?
#!/usr/bin/python
# map.py
import time
import multiprocessing
import sys
import random

# Toggle whether we run parallel or not
runParallel = True

# Problematic index - if less than Np we create an exception
problemIndex = 13

# Number of compute problems
Np = 10

def func(args):
    # Emulate that the function might be fast or slow
    time.sleep(random.randint(1,4))
    ret  = args["Np"] - args["index"]
    # Emulate a bug
    if args["index"]==args["problemIndex"]:
        ret = 1/(args["index"]-args["problemIndex"])
    # Return data
    if args["runParallel"]:
        # We use a queue thus ordering may not be protected
        args["q"].put((args["index"],ret))
    else:
        return ret

# Return queue used when running parallel
manager = multiprocessing.Manager()
q = manager.Queue()

# Build argument lists
argList = []
for i in range(Np):
    args={}
    args["index"] = i # index
    args["Np"] = Np   # Number of problems
    args["q"] = q     # return queue for parallel execution mode
    args["problemIndex"] = problemIndex  # if index == problemIndex then func will malfunction
    args["runParallel"] = runParallel    # should we run parallel
    argList.append(args)

#should we run parallel
if runParallel:
    # Run 10 processes in parallel
    p = multiprocessing.Pool(processes=10)
    ret = p.map_async(func, argList)
    ret.wait()
    qLen = q.qsize()
    p.close()
    if not qLen == Np:
        print "Bummer - one of more worker threads broke down",Np,qLen
        sys.exit(0)

resultVector = [None]*Np
for p in range(Np):
    if runParallel:
        (i,a) = q.get(timeout=0.1)
    else:
        i = p
        a = func(argList[p])
    resultVector[i] = a

for i in range(Np):
    print "Index", i, "gives",resultVector[i]

最佳答案

我发现回溯模块在多处理调试中非常有用。如果将异常传递回主线程/进程,则将丢失所有回溯信息,因此您需要在子线程内调用traceback.format_exc并将该文本与异常传递回主线程。在下面,我提供了可与Pool一起使用的模式。

import traceback
import multiprocessing as mp
import time

def mpFunctionReportError(kwargs):
    '''
    wrap any function and catch any errors from f,
    putting them in pipe instead of raising
    kwargs must contain 'queue' (multiprocessing queue)
                    and 'f' function to be run
    '''
    queue = kwargs.pop('queue')
    f = kwargs.pop('f')
    rslt=None
    try:
        rslt = f(**kwargs)
        queue.put(rslt)
    except Exception, e:
        queue.put([e,traceback.format_exc(e)])
    return

def doNothing(a):
    return a

def raiseException(a):
    a='argh'
    raise ValueError('this is bad')


manager = mp.Manager()
outQ = manager.Queue()
p = mp.Pool(processes=4)

ret = p.map_async(mpFunctionReportError,iterable=[dict(f=doNothing,queue=outQ,a='pointless!') for i in xrange(4)])
ret.wait()
time.sleep(1)
for i in xrange(4):
    print(outQ.get_nowait())

ret = p.map_async(mpFunctionReportError,iterable=[dict(f=raiseException,queue=outQ,a='pointless!') for i in xrange(2)])
ret.wait()
time.sleep(1)
for i in xrange(2):
    e,trace = outQ.get_nowait()
    print(e)
    print(trace)

运行此示例将得出:
pointless!
pointless!
pointless!
pointless!
this is bad
Traceback (most recent call last):
  File "/home/john/projects/mpDemo.py", line 13, in mpFunctionReportError
    rslt = f(**kwargs)
  File "/home/john/projects/mpDemo.py", line 24, in raiseException
    raise ValueError('this is bad')
ValueError: this is bad

this is bad
Traceback (most recent call last):
  File "/home/john/projects/mpDemo.py", line 13, in mpFunctionReportError
    rslt = f(**kwargs)
  File "/home/john/projects/mpDemo.py", line 24, in raiseException
    raise ValueError('this is bad')
ValueError: this is bad

关于Python并行执行-如何高效调试?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/23269230/

10-12 07:18