我试图让异步工作与子流程和限制。我已经以功能性方式完成了此任务,但是当我尝试以opp风格实现相同的逻辑时,出现了一些问题。通常不能腌制协程/发电机错误。我追踪了其中一些,但不是全部

import asyncio
from concurrent.futures import ProcessPoolExecutor
from itertools import islice
from random import randint

class async_runner(object):
    def __init__(self):
        self.futures = [] # container to store current futures
        self.futures_total = []
        self.loop = asyncio.get_event_loop() # main event_loop
        self.executor = ProcessPoolExecutor()
        self.limit = 1

    def run(self, func, *args):
        temp_loop = asyncio.new_event_loop()
        try:
            coro = func(*args)
            asyncio.set_event_loop(temp_loop)
            ret = temp_loop.run_until_complete(coro)
            return ret
        finally:
            temp_loop.close()
    def limit_futures(self, futures, limit):
        self.futures_total = iter(futures)
        self.futures = [future for future in islice(self.futures_total,0,limit)]
        async def first_to_finish():
            while True:
                await asyncio.sleep(0)
                for f in self.futures:
                    if f.done(): # here raised TypeError: can't pickle coroutine objects
                        print(f.done())
                        self.futures.remove(f)
                        try:
                            #newf = next(self.futures_total)
                            #self.futures.append(newf)
                            print(f.done())
                        except StopIteration as e:
                            pass
                        return f.result()
        while len(self.futures) > 0:
            yield first_to_finish()
    async def run_limited(self, func, args, limit):
        self.limit = int(limit)
        self.futures_total = (self.loop.run_in_executor(self.executor, self.run, func, x) for x in range(110000,119990))
        for ret in self.limit_futures(self.futures_total, 4): # limitation - 4 per all processes
            await ret
    def set_execution(self, func, args, limit):
        ret = self.loop.run_until_complete(self.run_limited(func, args, limit))
        return ret
async def asy(x):
    print('enter: ', x)
    await asyncio.sleep(randint(1,3))
    print('finishing ', x)
    return x

runner = async_runner()
ret = runner.set_execution(asy,urls,2)
print(ret)

但这很好用:
import asyncio
from concurrent.futures import ProcessPoolExecutor
from itertools import islice
import time

async def asy(x):
    print('enter: ', x)
    await asyncio.sleep(1)
    print('finishing ', x)
    return x

def run(corofn, *args):
    loop = asyncio.new_event_loop()
    try:
        coro = corofn(*args)
        asyncio.set_event_loop(loop)
        ret = loop.run_until_complete(coro)
        #print(ret)
        return ret
    finally:
        loop.close()
def limit_futures(futures, limit):
    futures_sl = [
        c for c in islice(futures, 0, limit)
    ]
    print(len(futures_sl))
    async def first_to_finish(futures):
        while True:
            await asyncio.sleep(0)
            for f in futures_sl:
                if f.done():
                    futures_sl.remove(f)
                    try:
                        newf = next(futures)
                        futures_sl.append(newf)
                    except StopIteration as e:
                        pass
                    return f.result()
    while len(futures_sl) > 0:
        yield first_to_finish(futures)
async def main():
    loop = asyncio.get_event_loop()
    executor = ProcessPoolExecutor()
    futures = (loop.run_in_executor(executor, run, asy, x) for x in range(110000,119990))
    '''
    CASE balls to the wall!
    await asyncio.gather(*futures)
    '''
    for ret in limit_futures(futures, 4): # limitation - 4 per all processes
        await ret

if __name__ == '__main__':
    start = time.time()
    '''
    # CASE single
    ret = [asy(x) for x in range(510000,510040)]
    exit()
    '''
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print("Elapsed time: {:.3f} sec".format(time.time() - start))

我无法理解为什么多处理模块仅在使用对象时才尝试腌制任何东西,而在任何情况下都不尝试。

最佳答案

多处理需要腌制async_runner实例的原因是因为self.runner是绑定(bind)方法,这意味着它“包含”了async_runner实例。

由于您实际上并没有在self方法中使用run,因此可以将其设为staticmethod来避免此问题。

关于python - 在类中使用ProcessPoolExecutor时无法腌制协程对象,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47662040/

10-12 05:31