几乎每个人都知道,当他们第一次使用Python进行线程处理时,GIL使真正想并行执行处理的人痛苦不堪-或至少给了机会。

我目前正在考虑实现类似Reactor模式的东西。实际上,我想在一个类线程上监听传入的套接字连接,并且当有人尝试连接时,接受该连接并将其传递给另一个类线程进行处理。

我(尚)不确定我可能要面对什么样的负担。我知道目前对传入邮件设置了2MB的上限。从理论上讲,我们每秒可以得到数千个(尽管我不知道实际上是否已经看到过类似的东西)。处理一条消息所花费的时间并不是很重要,尽管显然更快会更好。

我正在研究Reactor模式,并使用multiprocessing库开发了一个小示例(至少在测试中)似乎很好。但是,现在/很快,我们将提供asyncio库,该库将为我处理事件循环。

有什么可以通过结合asynciomultiprocessing咬我的?

最佳答案

尽管您不应该直接使用asyncio,但您应该能够安全地组合multiprocessingmultiprocessing,而不会遇到太多麻烦。 asyncio(以及任何其他基于事件循环的异步框架)的主要缺点是阻塞了事件循环。如果尝试直接使用multiprocessing,则每次阻止等待子进程时,都将阻止事件循环。显然,这很糟糕。

避免这种情况的最简单方法是使用 BaseEventLoop.run_in_executor concurrent.futures.ProcessPoolExecutor 中执行函数。 ProcessPoolExecutor是使用multiprocessing.Process实现的进程池,但是asyncio内置支持在其中执行功能而不会阻塞事件循环。这是一个简单的例子:

import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
   time.sleep(x) # Pretend this is expensive calculations
   return x * 5

@asyncio.coroutine
def main():
    #pool = multiprocessing.Pool()
    #out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
    executor = ProcessPoolExecutor()
    out = yield from loop.run_in_executor(executor, blocking_func, 10)  # This does not
    print(out)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

对于大多数情况,仅此功能就足够了。如果您发现自己需要multiprocessing中的其他构造(例如QueueEventManager等),则有一个名为 aioprocessing 的第三方库(完整披露:我已经写过),该库提供了asyncio兼容的所有multiprocessing数据结构的版本。这是一个演示示例:
import time
import asyncio
import aioprocessing
import multiprocessing

def func(queue, event, lock, items):
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item+5)
    queue.close()

@asyncio.coroutine
def example(queue, event, lock):
    l = [1,2,3,4,5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l))
    p.start()
    while True:
        result = yield from queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    yield from p.coro_join()

@asyncio.coroutine
def example2(queue, event, lock):
    yield from event.coro_wait()
    with (yield from lock):
        yield from queue.coro_put(78)
        yield from queue.coro_put(None) # Shut down the worker

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    tasks = [
        asyncio.async(example(queue, event, lock)),
        asyncio.async(example2(queue, event, lock)),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

关于python - 异步与多处理相结合会出现什么样的问题(如果有的话)?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48035079/

10-12 18:54