How to retrieve values from a function run in parallel processes?

The multiprocessing module is quite confusing for Python beginners, especially for those who have just migrated from MATLAB and have been made lazy by its parallel computing toolbox. I have the following function which takes ~80 seconds to run, and I want to shorten this time by using Python's multiprocessing module.
```python
from time import time

xmax = 100000000

start = time()
for x in range(xmax):
    y = ((x+5)**2+x-40)
    if y <= 0xf+1:
        print('Condition met at: ', y, x)
end = time()
tt = end-start  # total time
print('Each iteration took: ', tt/xmax)
print('Total time: ', tt)
```

This outputs as expected:

```
Condition met at:  -15 0
Condition met at:  -3 1
Condition met at:  11 2
Each iteration took:  8.667453265190124e-07
Total time:  86.67453265190125
```

As no iteration of the loop depends on the others, I tried to adopt the Server Process from the official documentation to scan chunks of the range in separate processes. Finally I came up with vartec's answer to this question and could prepare the following code. I also updated the code based on Darkonaut's response to the current question.

```python
from time import time
import multiprocessing as mp

def chunker(rng, t):  # this function makes t chunks out of rng
    L = rng[1] - rng[0]
    Lr = L % t
    Lm = L // t
    h = rng[0]-1
    chunks = []
    for i in range(0, t):
        c = [h+1, h + Lm]
        h += Lm
        chunks.append(c)
    chunks[t-1][1] += Lr + 1
    return chunks

def worker(lock, xrange, return_dict):
    '''worker function'''
    for x in range(xrange[0], xrange[1]):
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            return_dict['x'].append(x)
            return_dict['y'].append(y)
            with lock:
                list_x = return_dict['x']
                list_y = return_dict['y']
                list_x.append(x)
                list_y.append(y)
                return_dict['x'] = list_x
                return_dict['y'] = list_y

if __name__ == '__main__':
    start = time()
    manager = mp.Manager()
    return_dict = manager.dict()
    lock = manager.Lock()
    return_dict['x'] = manager.list()
    return_dict['y'] = manager.list()
    xmax = 100000000
    nw = mp.cpu_count()
    workers = list(range(0, nw))
    chunks = chunker([0, xmax], nw)
    jobs = []
    for i in workers:
        p = mp.Process(target=worker, args=(lock, chunks[i], return_dict))
        jobs.append(p)
        p.start()
    for proc in jobs:
        proc.join()
    end = time()
    tt = end-start  # total time
    print('Each iteration took: ', tt/xmax)
    print('Total time: ', tt)
    print(return_dict['x'])
    print(return_dict['y'])
```

This considerably reduces the run time to ~17 seconds. But my shared variable cannot retrieve any values. Please help me find out which part of the code is going wrong.

The output I get is:

```
Each iteration took:  1.7742713451385497e-07
Total time:  17.742713451385498
[]
[]
```

from which I expect:

```
Each iteration took:  1.7742713451385497e-07
Total time:  17.742713451385498
[0, 1, 2]
[-15, -3, 11]
```

Solution

The issue in your example is that modifications to standard mutable structures within Manager.dict will not be propagated. I'm first showing you how to fix it with a manager, just to show you better options afterwards.

multiprocessing.Manager is a bit heavy since it uses a separate process just for the manager, and working on a shared object requires locks for data consistency. If you run this on one machine, there are better options with multiprocessing.Pool in case you don't have to run customized Process classes, and if you have to, multiprocessing.Process together with multiprocessing.Queue would be the common way of doing it.

The quoted parts below are from the multiprocessing docs.

Manager

> If standard (non-proxy) list or dict objects are contained in a referent, modifications to those mutable values will not be propagated through the manager because the proxy has no way of knowing when the values contained within are modified. However, storing a value in a container proxy (which triggers a __setitem__ on the proxy object) does propagate through the manager and so to effectively modify such an item, one could re-assign the modified value to the container proxy...
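The docs' point is easy to demonstrate. The following minimal sketch (my illustration, not code from the question or the original answer) shows both behaviors side by side:

```python
from multiprocessing import Manager, Process

def mutate(d):
    d['plain'].append(1)   # appends to a local, unpickled copy; the manager never sees it
    lst = d['reassigned']
    lst.append(1)
    d['reassigned'] = lst  # __setitem__ on the dict proxy -> the change propagates

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict({'plain': [], 'reassigned': []})
        p = Process(target=mutate, args=(d,))
        p.start()
        p.join()
        print(d['plain'], d['reassigned'])  # -> [] [1]
```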
In your case this would look like:

```python
def worker(xrange, return_dict, lock):
    """worker function"""
    for x in range(xrange[0], xrange[1]):
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                list_x = return_dict['x']
                list_y = return_dict['y']
                list_x.append(x)
                list_y.append(y)
                return_dict['x'] = list_x
                return_dict['y'] = list_y
```

The lock here would be a manager.Lock instance you have to pass along as an argument, since the whole (now) locked operation is not by itself atomic. (Here is an easier example with Manager using a Lock.)

This approach is perhaps less convenient than employing nested proxy objects for most use cases, but it also demonstrates a level of control over the synchronization.

Since Python 3.6 proxy objects are nestable:

> Changed in version 3.6: Shared objects are capable of being nested. For example, a shared container object such as a shared list can contain other shared objects which will all be managed and synchronized by the SyncManager.

Since Python 3.6 you can fill your manager.dict before starting multiprocessing with manager.list as values and then append directly in the worker without having to re-assign:

```python
return_dict['x'] = manager.list()
return_dict['y'] = manager.list()
```

EDIT: Here is the full example with Manager:

```python
import time
import multiprocessing as mp
from multiprocessing import Manager, Process
from contextlib import contextmanager
# mp_utils.py from the first link in the code snippet for the "Pool"
# section below
from mp_utils import calc_batch_sizes, build_batch_ranges
# def context_timer ... see the code snippet in the "Pool" section below

def worker(batch_range, return_dict, lock):
    """worker function"""
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                return_dict['x'].append(x)
                return_dict['y'].append(y)

if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with Manager() as manager:
        lock = manager.Lock()
        return_dict = manager.dict()
        return_dict['x'] = manager.list()
        return_dict['y'] = manager.list()

        tasks = [(batch_range, return_dict, lock)
                 for batch_range in batch_ranges]

        with context_timer():
            pool = [Process(target=worker, args=args)
                    for args in tasks]

            for p in pool:
                p.start()
            for p in pool:
                p.join()

        # Create a standard container with data from the manager before
        # exiting the manager.
        result = {k: list(v) for k, v in return_dict.items()}
        print(result)
```

Pool

Most often a multiprocessing.Pool will just do it. You have an additional challenge in your example since you want to distribute iteration over a range. Your chunker function doesn't manage to divide the range evenly so that every process has about the same work to do:

```python
chunker((0, 21), 4)
# Out: [[0, 4], [5, 9], [10, 14], [15, 21]]  # 4, 4, 4, 6!
```

For the code below, please grab the code snippet for mp_utils.py from my answer here; it provides two functions to chunk ranges as evenly as possible.
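That linked snippet isn't reproduced on this page. As a hypothetical stand-in so the examples below can run: the function names calc_batch_sizes and build_batch_ranges come from the answer, but this particular implementation is my assumption, not the linked original:

```python
# mp_utils.py -- assumed stand-in for the helpers the answer links to.

def calc_batch_sizes(n_tasks, n_workers):
    """Divide n_tasks into n_workers batch sizes differing by at most 1."""
    base, extra = divmod(n_tasks, n_workers)
    # The first `extra` workers get one extra item each.
    return [base + (i < extra) for i in range(n_workers)]

def build_batch_ranges(batch_sizes):
    """Turn a list of batch sizes into consecutive range objects."""
    ranges = []
    start = 0
    for size in batch_sizes:
        ranges.append(range(start, start + size))
        start += size
    return ranges
```

With this sketch, 21 items over 4 workers yields `[range(0, 6), range(6, 11), range(11, 16), range(16, 21)]`, i.e. sizes 6, 5, 5, 5 instead of the 4, 4, 4, 6 produced by chunker.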
With multiprocessing.Pool your worker function just has to return the result, and Pool will take care of transporting the result back over internal queues to the parent process. The result will be a list, so you will have to rearrange it again in the way you want it. Your example could then look like this:

```python
import time
import multiprocessing as mp
from multiprocessing import Pool
from contextlib import contextmanager
from itertools import chain

from mp_utils import calc_batch_sizes, build_batch_ranges

@contextmanager
def context_timer():
    start_time = time.perf_counter()
    yield
    end_time = time.perf_counter()
    total_time = end_time-start_time
    print(f'\nEach iteration took: {total_time / X_MAX:.4f} s')
    print(f'Total time: {total_time:.4f} s\n')

def worker(batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    return result

if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():
        with Pool(N_WORKERS) as pool:
            results = pool.map(worker, iterable=batch_ranges)

    print(f'results: {results}')
    x, y = zip(*chain.from_iterable(results))  # filter and sort results
    print(f'results sorted: x: {x}, y: {y}')
```

Example output:

```
[range(0, 12500000), range(12500000, 25000000), range(25000000, 37500000),
 range(37500000, 50000000), range(50000000, 62500000), range(62500000, 75000000),
 range(75000000, 87500000), range(87500000, 100000000)]
Condition met at:  -15 0
Condition met at:  -3 1
Condition met at:  11 2

Each iteration took: 0.0000 s
Total time: 8.2408 s

results: [[(0, -15), (1, -3), (2, 11)], [], [], [], [], [], [], []]
results sorted: x: (0, 1, 2), y: (-15, -3, 11)

Process finished with exit code 0
```

If you had multiple arguments for your worker, you would build a "tasks" list with argument tuples and exchange pool.map(...) with pool.starmap(...iterable=tasks). See the docs for further details on that, and the short starmap sketch at the end of this answer.

Process & Queue

If you can't use multiprocessing.Pool for some reason, you have to take care of inter-process communication (IPC) yourself, by passing a multiprocessing.Queue as an argument to your worker functions in the child processes and letting them enqueue their results to be sent back to the parent.

You will also have to build your Pool-like structure so you can iterate over it to start and join the processes, and you have to get() the results back from the queue. I've written up more about Queue.get usage here.

A solution with this approach could look like this:

```python
# Imports, context_timer and mp_utils carried over from the Pool
# example above.

def worker(result_queue, batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    result_queue.put(result)  # <--

if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    result_queue = mp.Queue()  # <--
    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():
        pool = [Process(target=worker, args=(result_queue, batch_range))
                for batch_range in batch_ranges]

        for p in pool:
            p.start()
        results = [result_queue.get() for _ in batch_ranges]
        for p in pool:
            p.join()

    print(f'results: {results}')
    x, y = zip(*chain.from_iterable(results))  # filter and sort results
    print(f'results sorted: x: {x}, y: {y}')
```
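As referenced in the Pool section above, here is a minimal starmap sketch; the second worker argument (offset) is made up purely for illustration and is not part of the original example:

```python
from multiprocessing import Pool

def worker(batch_range, offset):
    """Hypothetical worker taking a second argument besides the range."""
    return [(x, ((x+5)**2+x-40) + offset) for x in batch_range]

if __name__ == '__main__':
    batch_ranges = [range(0, 10), range(10, 20)]
    # One argument-tuple per task; starmap unpacks each tuple into worker(*args).
    tasks = [(batch_range, 0) for batch_range in batch_ranges]
    with Pool(2) as pool:
        results = pool.starmap(worker, iterable=tasks)
    print(results)
```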