我想将作业从线程提交到asyncio事件循环(就像run_in_executor一样,但反之亦然)。

这是asyncio文档关于concurrency and multithreading的内容:



效果很好,但是协程的结果丢失了。

相反,可以使用向async(或ensure_future)返回的 future 添加完成的回调的函数,以便线程可以通过concurrent.futures.Future访问结果。

为什么没有在标准库中实现这种功能的特殊原因?还是我错过了一种更简单的方法来实现这一目标?

最佳答案

我的请求成功了,并实现了run_coroutine_threadsafe函数here

例子:

def target(loop, timeout=None):
    future = asyncio.run_coroutine_threadsafe(add(1, b=2), loop)
    return future.result(timeout)

async def add(a, b):
    await asyncio.sleep(1)
    return a + b

loop = asyncio.get_event_loop()
future = loop.run_in_executor(None, target, loop)
assert loop.run_until_complete(future) == 3

我最初发布了concurrent.futures.Executor的子类,但仍可以将其实现为:
class LoopExecutor(concurrent.futures.Executor):
    """An Executor subclass that uses an event loop
    to execute calls asynchronously."""

    def __init__(self, loop=None):
        """Initialize the executor with a given loop."""
        self.loop = loop or asyncio.get_event_loop()

    def submit(self, fn, *args, **kwargs):
        """Schedule the callable, fn, to be executed as fn(*args **kwargs).
        Return a Future object representing the execution of the callable."""
        coro = asyncio.coroutine(fn)(*args, **kwargs)
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

关于python - 提交作业到异步事件循环,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/32586794/

10-12 20:06