Python 3.4.2

我正在学习asyncio,并用它连续监听IPC总线,而gbulb则监听dbus。

一些注意事项:

因此,我创建了一个函数listen_to_ipc_channel_layer,该函数连续监听IPC channel 上的传入消息,并将消息传递给message_handler

我也在听SIGTERM和SIGINT。因此,当我向运行在底部的代码的python进程发送SIGTERM时,脚本应正常终止。

问题

…我收到以下警告:

got signal 15: exit
Task was destroyed but it is pending!
task: <Task pending coro=<listen_to_ipc_channel_layer() running at /opt/mainloop-test.py:23> wait_for=<Future cancelled>>

Process finished with exit code 0

…具有以下代码:
import asyncio
import gbulb
import signal
import asgi_ipc as asgi

def main():
    asyncio.async(listen_to_ipc_channel_layer())
    loop = asyncio.get_event_loop()

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)

    # Start listening on the Linux IPC bus for incoming messages
    loop.run_forever()
    loop.close()

@asyncio.coroutine
def listen_to_ipc_channel_layer():
    """Listens to the Linux IPC bus for messages"""
    while True:
        message_handler(message=channel_layer.receive(["my_channel"]))
        try:
            yield from asyncio.sleep(0.1)
        except asyncio.CancelledError:
            break

def ask_exit():
    loop = asyncio.get_event_loop()
    for task in asyncio.Task.all_tasks():
        task.cancel()
    loop.stop()


if __name__ == "__main__":
    gbulb.install()
    # Connect to the IPC bus
    channel_layer = asgi.IPCChannelLayer(prefix="my_channel")
    main()

我对异步的理解仍然很少,但是我想我知道发生了什么。在等待yield from asyncio.sleep(0.1)时,信号处理程序捕获了SIGTERM,并在此过程中调用task.cancel()

引发问题:这是否应该触发CancelledError循环内的while True:(因为不是,但这就是我对"Calling cancel() will throw a CancelledError to the wrapped coroutine"的理解)。

最终,调用loop.stop()可以停止循环,而无需等待yield from asyncio.sleep(0.1)返回结果甚至整个协程listen_to_ipc_channel_layer

如果我错了,请纠正我。

我认为我唯一需要做的就是让我的程序等待yield from asyncio.sleep(0.1)返回结果和/或协程打破while循环并完成。

我相信我混淆了很多事情。请帮助我弄清楚那些事情,以便我能弄清楚如何优雅地关闭事件循环而不会发出警告。

最佳答案

问题来自取消任务后立即关闭循环。作为cancel() docs state



请使用以下代码段:

import asyncio
import signal


async def pending_doom():
    await asyncio.sleep(2)
    print(">> Cancelling tasks now")
    for task in asyncio.Task.all_tasks():
        task.cancel()

    print(">> Done cancelling tasks")
    asyncio.get_event_loop().stop()


def ask_exit():
    for task in asyncio.Task.all_tasks():
        task.cancel()


async def looping_coro():
    print("Executing coroutine")
    while True:
        try:
            await asyncio.sleep(0.25)
        except asyncio.CancelledError:
            print("Got CancelledError")
            break

        print("Done waiting")

    print("Done executing coroutine")
    asyncio.get_event_loop().stop()


def main():
    asyncio.async(pending_doom())
    asyncio.async(looping_coro())

    loop = asyncio.get_event_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)

    loop.run_forever()

    # I had to manually remove the handlers to
    # avoid an exception on BaseEventLoop.__del__
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.remove_signal_handler(sig)


if __name__ == '__main__':
    main()

注意ask_exit取消了任务,但没有stop循环,在下一个周期looping_coro()停止了它。如果取消输出,则输出为:
Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
^CGot CancelledError
Done executing coroutine

注意pending_doom如何取消并在之后立即停止循环。如果让它运行直到pending_doom协程从 sleep 中唤醒,您会看到相同的警告:
Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
>> Cancelling tasks now
>> Done cancelling tasks
Task was destroyed but it is pending!
task: <Task pending coro=<looping_coro() running at canceling_coroutines.py:24> wait_for=<Future cancelled>>

关于python - 请解释 “Task was destroyed but it is pending!”,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/40897428/

10-13 02:48