考虑以下程序。

import asyncio
import multiprocessing
from multiprocessing import Queue
from concurrent.futures.thread import ThreadPoolExecutor
import sys


def main():
    """Run the event loop until CTRL+C, then attempt to shut everything down.

    Creates a ``ThreadPoolExecutor``, schedules ``print_some`` on the event
    loop and runs the loop forever. On ``KeyboardInterrupt`` it shuts down
    the executor, stops and closes the loop, and calls ``sys.exit()``.

    NOTE(review): this is the version that does not exit cleanly; per the
    accompanying discussion, the worker thread still blocked in
    ``queue.get`` appears to be what holds up the shutdown.
    """
    executor = ThreadPoolExecutor()
    loop = asyncio.get_event_loop()
    # comment the following line and the shutdown will work smoothly
    asyncio.ensure_future(print_some(executor))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print("shutting down")
        # Nothing ever puts an item on the queue print_some waits on, so the
        # job it submitted to the executor is still pending at this point.
        executor.shutdown()
        loop.stop()
        loop.close()
        sys.exit()


async def print_some(executor):
    """Wait for one item from a freshly created queue and print it.

    The ``queue.get`` call is submitted to *executor* through the event
    loop's ``run_in_executor`` and the resulting future is awaited.
    """
    print("Waiting...Hit CTRL+C to abort")
    queue = Queue()
    pending_item = asyncio.get_event_loop().run_in_executor(executor, queue.get)
    print(await pending_item)


# Run main() only when this file is executed as a script.
if __name__ == '__main__':
    main()

我只是希望在按下“CTRL + C”时程序能够正常退出。但是,执行器(executor)的线程似乎阻止了这一点(即使我确实调用了shutdown)。

最佳答案

您需要向队列发送一个“毒丸”(poison pill),使工作线程不再阻塞在queue.get调用上。如果ThreadPoolExecutor池中的工作线程仍有正在执行的任务,它们将阻止Python退出。源代码中有一段注释描述了此行为的原因:

# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.

这是一个完整的示例,可以干净地退出:
import asyncio
import multiprocessing
from multiprocessing import Queue
from concurrent.futures.thread import ThreadPoolExecutor
import sys


def main():
    """Run the event loop until CTRL+C, then shut down cleanly.

    Schedules ``print_some`` and runs the loop forever. On
    ``KeyboardInterrupt`` a ``None`` sentinel is put on the module-level
    ``queue`` so that the pending ``queue.get`` returns, the task is driven
    to completion, and only then are the executor and the loop torn down.
    """
    executor = ThreadPoolExecutor()
    loop = asyncio.get_event_loop()
    # Keep a handle on the task so it can be run to completion during shutdown.
    fut = asyncio.ensure_future(print_some(executor))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print("shutting down")
        queue.put(None)  # Poison pill: gives the waiting queue.get an item to return
        # Let print_some finish so no pending task is left when the loop closes.
        loop.run_until_complete(fut)
        executor.shutdown()
        loop.stop()
        loop.close()


async def print_some(executor):
    """Print the next item taken from the module-level ``queue``.

    The ``queue.get`` call is submitted to *executor* through the event
    loop's ``run_in_executor`` and the resulting future is awaited.
    """
    print("Waiting...Hit CTRL+C to abort")
    received = asyncio.get_event_loop().run_in_executor(executor, queue.get)
    print(await received)


# Module-level name shared by main() (queue.put) and print_some() (queue.get);
# the actual Queue is created only when this file is run as a script.
queue = None
if __name__ == '__main__':
    queue = Queue()
    main()

需要调用run_until_complete(fut),以避免asyncio事件循环退出时出现“仍有待处理任务”的警告。如果您不在意这个警告,可以省略该调用。

关于python - 如何使用事件循环和执行程序关闭进程,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/52081033/

10-13 07:23
查看更多