Problem description
I'm trying to run some Python functions in parallel; the function has print commands throughout. What I want is for each subprocess to run the same function and send its output to the main stdout in a grouped manner. By that I mean that each subprocess's output should only be printed after it has finished its task. If, however, some kind of error occurs along the way, I still want to output whatever the subprocess managed to do.
A small example:
from time import sleep
import multiprocessing as mp

def foo(x):
    print('foo')
    for i in range(5):
        print('Process {}: in foo {}'.format(x, i))
        sleep(0.5)

if __name__ == '__main__':
    pool = mp.Pool()
    jobs = []
    for i in range(4):
        job = pool.apply_async(foo, args=[i])
        jobs.append(job)
    for job in jobs:
        job.wait()
This runs in parallel, but the output is:
foo
Process 0: in foo 0
foo
Process 1: in foo 0
foo
Process 2: in foo 0
foo
Process 3: in foo 0
Process 1: in foo 1
Process 0: in foo 1
Process 2: in foo 1
Process 3: in foo 1
Process 1: in foo 2
Process 0: in foo 2
Process 2: in foo 2
Process 3: in foo 2
Process 1: in foo 3
Process 0: in foo 3
Process 3: in foo 3
Process 2: in foo 3
Process 1: in foo 4
Process 0: in foo 4
Process 3: in foo 4
Process 2: in foo 4
What I want is:
foo
Process 3: in foo 0
Process 3: in foo 1
Process 3: in foo 2
Process 3: in foo 3
Process 3: in foo 4
foo
Process 1: in foo 0
Process 1: in foo 1
Process 1: in foo 2
Process 1: in foo 3
Process 1: in foo 4
foo
Process 0: in foo 0
Process 0: in foo 1
Process 0: in foo 2
Process 0: in foo 3
Process 0: in foo 4
foo
Process 2: in foo 0
Process 2: in foo 1
Process 2: in foo 2
Process 2: in foo 3
Process 2: in foo 4
The particular order of the processes doesn't matter, as long as each subprocess's output is grouped together. Interestingly enough, I get my desired output if I do
python test.py > output
I know that the subprocesses do not get their own stdout; they all use the main stdout. I've thought about and looked up some solutions to this, such as using a Queue: each subprocess gets its own stdout-like object, and we override its flush method so that, once the subprocess is done, its output is put back onto the Queue, where we can then read it. However, although this does satisfy what I want, I cannot retrieve the output if the function stopped halfway through; it is only output after the function completes successfully. I got the idea from here: Access standard output of a sub process in python
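For reference, a minimal sketch of that Queue idea, assuming the foo() defined above; the QueueWriter class and run_grouped helper are illustrative names of mine, not code from the linked answer:

import sys
import multiprocessing as mp

class QueueWriter:
    # illustrative stdout replacement: buffers writes, ships them to a queue on flush
    def __init__(self, queue):
        self.queue = queue
        self.parts = []
    def write(self, text):
        self.parts.append(text)
    def flush(self):
        self.queue.put(''.join(self.parts))
        self.parts = []

def run_grouped(queue, x):
    sys.stdout = QueueWriter(queue)  # every print() inside foo now lands in the buffer
    foo(x)
    sys.stdout.flush()  # only here does the grouped output reach the main process,
                        # so if foo() dies halfway through, the buffered text is lost

if __name__ == '__main__':
    manager = mp.Manager()
    queue = manager.Queue()
    with mp.Pool(4) as pool:
        pool.starmap(run_grouped, [(queue, i) for i in range(4)])
    while not queue.empty():
        print(queue.get(), end='')  # each chunk is one subprocess's complete output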
I've also seen the usage of locks, which works, but it completely kills running the function in parallel, since each subprocess has to wait for its turn to execute the function foo.
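For completeness, the lock variant would look roughly like this (my sketch, assuming the foo() and the multiprocessing import from the example above); holding the lock for the whole call is exactly what serializes the pool:

def foo_locked(args):
    lock, x = args
    with lock:  # held for the entire run of foo, so the other workers just sit and wait
        foo(x)

if __name__ == '__main__':
    manager = mp.Manager()
    lock = manager.Lock()
    with mp.Pool(4) as pool:
        pool.map(foo_locked, [(lock, i) for i in range(4)])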
Also, if possible, I'd like to avoid changing the implementation of my foo function, as I have many functions that I would need to change.
I have looked into the libraries dispy and Parallel Python. Dispy does exactly what I want: it keeps a separate stdout/stderr that I can just print out at the end. The problem with dispy is that I have to manually run the server in a separate terminal, and I want to be able to run my Python program all in one go without having to start another script first. Parallel Python, on the other hand, also does what I want, but it seems to lack control over the details and has some annoying nuisances. In particular, when you print the output it also prints the function's return type, whereas I just want the output I printed with print. Also, when running a function you have to give it a list of the modules it uses, which is slightly annoying, since I do not want to maintain a big list of imports just to run a simple function.
Recommended answer
As you've noticed, using a lock in this case would kill multiprocessing, because you'd essentially have all the processes wait for the mutex to be released by whichever process currently holds the 'rights' to STDOUT. However, running in parallel and printing in sync with your function/subprocess are logically mutually exclusive.
What you can do instead is have your main process serve as a 'printer' for your subprocesses, in such a way that once a subprocess finishes (or errors), and only then, it sends back to the main process what to print. You seem to be perfectly content with printing not being 'real time' (nor could it be, anyway, as mentioned above), so this approach should serve you just right. So:
import multiprocessing as mp
import random  # just to add some randomness
from time import sleep

def foo(x):
    output = ["[Process {}]: foo:".format(x)]
    for i in range(5):
        output.append('[Process {}] in foo {}'.format(x, i))
        sleep(0.2 + 1 * random.random())
    return "\n".join(output)

if __name__ == '__main__':
    pool = mp.Pool(4)
    for res in pool.imap_unordered(foo, range(4)):
        print("[MAIN]: Process finished, response:")
        print(res)  # this will print as soon as one of the processes finishes/errors
    pool.close()
Which will give you (YMMV, of course):
[MAIN]: Process finished, response:
[Process 2]: foo:
[Process 2] in foo 0
[Process 2] in foo 1
[Process 2] in foo 2
[Process 2] in foo 3
[Process 2] in foo 4
[MAIN]: Process finished, response:
[Process 0]: foo:
[Process 0] in foo 0
[Process 0] in foo 1
[Process 0] in foo 2
[Process 0] in foo 3
[Process 0] in foo 4
[MAIN]: Process finished, response:
[Process 1]: foo:
[Process 1] in foo 0
[Process 1] in foo 1
[Process 1] in foo 2
[Process 1] in foo 3
[Process 1] in foo 4
[MAIN]: Process finished, response:
[Process 3]: foo:
[Process 3] in foo 0
[Process 3] in foo 1
[Process 3] in foo 2
[Process 3] in foo 3
[Process 3] in foo 4
You can collect anything else, including errors, the same way.
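For instance, here is a small sketch (mine, not part of the original answer) that catches an error inside the function so the partial output still comes back; it assumes the imports from the block above:

import traceback

def foo_with_errors(x):
    output = ["[Process {}]: foo:".format(x)]
    try:
        for i in range(5):
            output.append('[Process {}] in foo {}'.format(x, i))
            sleep(0.2 + 1 * random.random())
            if random.random() < 0.0625:  # a small chance to err, just for the demo
                raise Exception("[Process {}] A random exception is random!".format(x))
    except Exception:
        output.append(traceback.format_exc())  # keep whatever was produced plus the traceback
    return "\n".join(output)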
UPDATE - If you absolutely have to use functions whose output you cannot control, you can wrap your subprocesses and capture their STDOUT/STDERR instead, and then, once they are done (or raise an exception), return everything back to the process 'manager' for printing to the actual STDOUT. With such a setup, we can have foo() look like:
def foo(x):
    print("[Process {}]: foo:".format(x))
    for i in range(5):
        print('[Process {}] in foo {}'.format(x, i))
        sleep(0.2 + 1 * random.random())
        if random.random() < 0.0625:  # let's add a 1/16 chance to err on each iteration
            raise Exception("[Process {}] A random exception is random!".format(x))
    return random.random() * 100  # just a random response, you can omit it
Notice that it's blissfully unaware of anything trying to mess with its mode of operation. We'll then create an external, general-purpose wrapper (so you don't have to change it depending on the function) to actually mess with its default behavior (and not just this function's, but that of everything else it might call while running):
def std_wrapper(args):
    try:
        from StringIO import StringIO  # ... for Python 2.x compatibility
    except ImportError:
        from io import StringIO
    import sys
    sys.stdout, sys.stderr = StringIO(), StringIO()  # replace stdout/stderr with our buffers
    # args is a list packed as: [0] process function name; [1] args; [2] kwargs; let's unpack:
    process_name = args[0]
    process_args = args[1] if len(args) > 1 else []
    process_kwargs = args[2] if len(args) > 2 else {}
    # get our method from its name, assuming global namespace of the current module/script
    process = globals()[process_name]
    response = None  # in case a call fails
    try:
        response = process(*process_args, **process_kwargs)  # call our process function
    except Exception as e:  # too broad but good enough as an example
        print(e)
    # rewind our buffers:
    sys.stdout.seek(0)
    sys.stderr.seek(0)
    # return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | NONE
    return sys.stdout.read(), sys.stderr.read(), response
Now all we need is to call this wrapper instead of the desired foo(), and provide it with information on what to call on our behalf:
if __name__ == '__main__':
    pool = mp.Pool(4)
    # since we're wrapping the process we're calling, we need to send to the wrapper packed
    # data with instructions on what to call on our behalf.
    # info on args packing available in the std_wrapper function above.
    for out, err, res in pool.imap_unordered(std_wrapper, [("foo", [i]) for i in range(4)]):
        print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
        print(out.rstrip())  # remove the trailing space for niceness, print err if you want
    pool.close()
So now if you run it, you'll get something like this:
[MAIN]: Process finished, response: None, STDOUT:
[Process 2]: foo:
[Process 2] in foo 0
[Process 2] in foo 1
[Process 2] A random exception is random!
[MAIN]: Process finished, response: 87.9658471743586, STDOUT:
[Process 1]: foo:
[Process 1] in foo 0
[Process 1] in foo 1
[Process 1] in foo 2
[Process 1] in foo 3
[Process 1] in foo 4
[MAIN]: Process finished, response: 38.929554421661194, STDOUT:
[Process 3]: foo:
[Process 3] in foo 0
[Process 3] in foo 1
[Process 3] in foo 2
[Process 3] in foo 3
[Process 3] in foo 4
[MAIN]: Process finished, response: None, STDOUT:
[Process 0]: foo:
[Process 0] in foo 0
[Process 0] in foo 1
[Process 0] in foo 2
[Process 0] in foo 3
[Process 0] in foo 4
[Process 0] A random exception is random!
All of this despite foo() just printing away or erroring. Of course, you can use such a wrapper to call any function and pass any number of args/kwargs to it.
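For example, here is a small sketch with a hypothetical bar() function (my addition, not from the original answer), just to show how the positional and keyword argument slots are packed for std_wrapper; it assumes the imports and the std_wrapper defined above:

def bar(x, delay=0.5):  # hypothetical function, only here to demonstrate kwargs
    print("[Process {}]: bar, sleeping for {}s".format(x, delay))
    sleep(delay)
    return x * 2

if __name__ == '__main__':
    pool = mp.Pool(4)
    # each item is packed as: [0] function name; [1] positional args; [2] keyword args
    jobs = [("bar", [i], {"delay": 0.1 * (i + 1)}) for i in range(4)]
    for out, err, res in pool.imap_unordered(std_wrapper, jobs):
        print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
        print(out.rstrip())
    pool.close()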
UPDATE #2 - But wait! If we can wrap our function processes like this, and have their STDOUT/STDERR captured, we surely can turn this into a decorator and use it in our code with a simple decoration. So, for my final proposal:
import functools
import multiprocessing
import random  # just to add some randomness
import time

def std_wrapper(func):
    @functools.wraps(func)  # we need this to unravel the target function name
    def caller(*args, **kwargs):  # and now for the wrapper, nothing new here
        try:
            from StringIO import StringIO  # ... for Python 2.x compatibility
        except ImportError:
            from io import StringIO
        import sys
        sys.stdout, sys.stderr = StringIO(), StringIO()  # use our buffers instead
        response = None  # in case a call fails
        try:
            response = func(*args, **kwargs)  # call our wrapped process function
        except Exception as e:  # too broad but good enough as an example
            print(e)  # NOTE: the exception is also printed to the captured STDOUT
        # rewind our buffers:
        sys.stdout.seek(0)
        sys.stderr.seek(0)
        # return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | NONE
        return sys.stdout.read(), sys.stderr.read(), response
    return caller

@std_wrapper  # decorate any function, it won't know you're siphoning its STDOUT/STDERR
def foo(x):
    print("[Process {}]: foo:".format(x))
    for i in range(5):
        print('[Process {}] in foo {}'.format(x, i))
        time.sleep(0.2 + 1 * random.random())
        if random.random() < 0.0625:  # let's add a 1/16 chance to err on each iteration
            raise Exception("[Process {}] A random exception is random!".format(x))
    return random.random() * 100  # just a random response, you can omit it
And now we can call our wrapped functions as before without dealing with argument packing or anything of the sort, so we're back at:
if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    for out, err, res in pool.imap_unordered(foo, range(4)):
        print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
        print(out.rstrip())  # remove the trailing space for niceness, print err if you want
    pool.close()
The output is the same as in the previous example, but in a much nicer and more manageable package.