问题描述
我一直在尝试为控制某些硬件的库编写交互式包装器(用于 ipython).有些调用对 IO 来说很重,因此并行执行任务是有意义的.使用线程池(几乎)效果很好:
I've been trying to write an interactive wrapper (for use in ipython) for a library that controls some hardware. Some calls are heavy on the IO so it makes sense to carry out the tasks in parallel. Using a ThreadPool (almost) works nicely:
from multiprocessing.pool import ThreadPool
class hardware():
def __init__(IPaddress):
connect_to_hardware(IPaddress)
def some_long_task_to_hardware(wtime):
wait(wtime)
result = 'blah'
return result
pool = ThreadPool(processes=4)
Threads=[]
h=[hardware(IP1),hardware(IP2),hardware(IP3),hardware(IP4)]
for tt in range(4):
task=pool.apply_async(h[tt].some_long_task_to_hardware,(1000))
threads.append(task)
alive = [True]*4
Try:
while any(alive) :
for tt in range(4): alive[tt] = not threads[tt].ready()
do_other_stuff_for_a_bit()
except:
#some command I cannot find that will stop the threads...
raise
for tt in range(4): print(threads[tt].get())
如果用户想要停止进程或do_other_stuff_for_a_bit()
中出现IO 错误,就会出现问题.按 + 停止主进程,但工作线程继续运行,直到它们的当前任务完成.
有什么方法可以停止这些线程而不必重写库或让用户退出 python?我在其他示例中看到的 pool.terminate()
和 pool.join()
似乎没有完成这项工作.
The problem comes if the user wants to stop the process or there is an IO error in do_other_stuff_for_a_bit()
. Pressing + stops the main process but the worker threads carry on running until their current task is complete.
Is there some way to stop these threads without having to rewrite the library or have the user exit python? pool.terminate()
and pool.join()
that I have seen used in other examples do not seem to do the job.
实际例程(而不是上面的简化版本)使用日志记录,尽管所有工作线程都在某个时候关闭,但我可以看到它们开始运行的进程一直持续到完成(作为硬件,我可以看到它们的看房间对面的效果).
The actual routine (instead of the simplified version above) uses logging and although all the worker threads are shut down at some point, I can see the processes that they started running carry on until complete (and being hardware I can see their effect by looking across the room).
这是在python 2.7中.
This is in python 2.7.
更新:
解决方案似乎是切换到使用 multiprocessing.Process 而不是线程池.我试过的测试代码是运行foo_pulse:
The solution seems to be to switch to using multiprocessing.Process instead of a thread pool. The test code I tried is to run foo_pulse:
class foo(object):
def foo_pulse(self,nPulse,name): #just one method of *many*
print('starting pulse for '+name)
result=[]
for ii in range(nPulse):
print('on for '+name)
time.sleep(2)
print('off for '+name)
time.sleep(2)
result.append(ii)
return result,name
如果您尝试使用 ThreadPool 运行它,那么 ctrl-C 不会停止 foo_pulse 的运行(即使它确实立即终止了线程,打印语句仍会继续出现:
If you try running this using ThreadPool then ctrl-C does not stop foo_pulse from running (even though it does kill the threads right away, the print statements keep on coming:
from multiprocessing.pool import ThreadPool
import time
def test(nPulse):
a=foo()
pool=ThreadPool(processes=4)
threads=[]
for rn in range(4) :
r=pool.apply_async(a.foo_pulse,(nPulse,'loop '+str(rn)))
threads.append(r)
alive=[True]*4
try:
while any(alive) : #wait until all threads complete
for rn in range(4):
alive[rn] = not threads[rn].ready()
time.sleep(1)
except : #stop threads if user presses ctrl-c
print('trying to stop threads')
pool.terminate()
print('stopped threads') # this line prints but output from foo_pulse carried on.
raise
else :
for t in threads : print(t.get())
但是使用 multiprocessing.Process 的版本按预期工作:
However a version using multiprocessing.Process works as expected:
import multiprocessing as mp
import time
def test_pro(nPulse):
pros=[]
ans=[]
a=foo()
for rn in range(4) :
q=mp.Queue()
ans.append(q)
r=mp.Process(target=wrapper,args=(a,"foo_pulse",q),kwargs={'args':(nPulse,'loop '+str(rn))})
r.start()
pros.append(r)
try:
for p in pros : p.join()
print('all done')
except : #stop threads if user stops findRes
print('trying to stop threads')
for p in pros : p.terminate()
print('stopped threads')
else :
print('output here')
for q in ans :
print(q.get())
print('exit time')
我已经为库 foo 定义了一个包装器(这样它就不需要重新编写了).如果不需要返回值,则此包装器也不需要:
Where I have defined a wrapper for the library foo (so that it did not need to be re-written). If the return value is not needed the neither is this wrapper :
def wrapper(a,target,q,args=(),kwargs={}):
'''Used when return value is wanted'''
q.put(getattr(a,target)(*args,**kwargs))
从文档中我看不出池无法工作的原因(除了错误).
From the documentation I see no reason why a pool would not work (other than a bug).
推荐答案
这是一个非常有趣的并行用法.
This is a very interesting use of parallelism.
但是,如果您使用 multiprocessing
,目标是让多个进程并行运行,而不是一个进程运行多个线程.
However, if you are using multiprocessing
, the goal is to have many processes running in parallel, as opposed to one process running many threads.
考虑使用multiprocessing
来实现它的几个变化:
Consider these few changes to implement it using multiprocessing
:
你有这些并行运行的函数:
You have these functions that will run in parallel:
import time
import multiprocessing as mp
def some_long_task_from_library(wtime):
time.sleep(wtime)
class MyException(Exception): pass
def do_other_stuff_for_a_bit():
time.sleep(5)
raise MyException("Something Happened...")
让我们创建并启动进程,比如 4:
Let's create and start the processes, say 4:
procs = [] # this is not a Pool, it is just a way to handle the
# processes instead of calling them p1, p2, p3, p4...
for _ in range(4):
p = mp.Process(target=some_long_task_from_library, args=(1000,))
p.start()
procs.append(p)
mp.active_children() # this joins all the started processes, and runs them.
进程是并行运行的,大概是在一个单独的 cpu 内核中,但这是由操作系统决定的.您可以检查您的系统监视器.
The processes are running in parallel, presumably in a separate cpu core, but that is to the OS to decide. You can check in your system monitor.
与此同时,您运行的进程会中断,并且您希望停止正在运行的进程,而不是让它们成为孤儿:
In the meantime you run a process that will break, and you want to stop the running processes, not leaving them orphan:
try:
do_other_stuff_for_a_bit()
except MyException as exc:
print(exc)
print("Now stopping all processes...")
for p in procs:
p.terminate()
print("The rest of the process will continue")
如果在一个或所有子进程终止后继续执行主进程没有意义,您应该处理主程序的退出.
If it doesn't make sense to continue with the main process when one or all of the subprocesses have terminated, you should handle the exit of the main program.
希望它有所帮助,您可以为您的图书馆调整其中的一些内容.
Hope it helps, and you can adapt bits of this for your library.
这篇关于在 Python 中停止线程池中的进程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!