Python 3.4.2
我正在学习asyncio,并用它连续监听IPC总线,而gbulb则监听dbus。
一些注意事项:
因此,我创建了一个函数listen_to_ipc_channel_layer
,该函数连续监听IPC channel 上的传入消息,并将消息传递给message_handler
。
我也在听SIGTERM和SIGINT。因此,当我向运行在底部的代码的python进程发送SIGTERM时,脚本应正常终止。
问题
…我收到以下警告:
got signal 15: exit
Task was destroyed but it is pending!
task: <Task pending coro=<listen_to_ipc_channel_layer() running at /opt/mainloop-test.py:23> wait_for=<Future cancelled>>
Process finished with exit code 0
…具有以下代码:
import asyncio
import gbulb
import signal
import asgi_ipc as asgi
def main():
asyncio.async(listen_to_ipc_channel_layer())
loop = asyncio.get_event_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, ask_exit)
# Start listening on the Linux IPC bus for incoming messages
loop.run_forever()
loop.close()
@asyncio.coroutine
def listen_to_ipc_channel_layer():
"""Listens to the Linux IPC bus for messages"""
while True:
message_handler(message=channel_layer.receive(["my_channel"]))
try:
yield from asyncio.sleep(0.1)
except asyncio.CancelledError:
break
def ask_exit():
loop = asyncio.get_event_loop()
for task in asyncio.Task.all_tasks():
task.cancel()
loop.stop()
if __name__ == "__main__":
gbulb.install()
# Connect to the IPC bus
channel_layer = asgi.IPCChannelLayer(prefix="my_channel")
main()
我对异步的理解仍然很少,但是我想我知道发生了什么。在等待
yield from asyncio.sleep(0.1)
时,信号处理程序捕获了SIGTERM,并在此过程中调用task.cancel()
。引发问题:这是否应该触发
CancelledError
循环内的while True:
? (因为不是,但这就是我对"Calling cancel() will throw a CancelledError to the wrapped coroutine"的理解)。最终,调用
loop.stop()
可以停止循环,而无需等待yield from asyncio.sleep(0.1)
返回结果甚至整个协程listen_to_ipc_channel_layer
。如果我错了,请纠正我。
我认为我唯一需要做的就是让我的程序等待
yield from asyncio.sleep(0.1)
返回结果和/或协程打破while循环并完成。我相信我混淆了很多事情。请帮助我弄清楚那些事情,以便我能弄清楚如何优雅地关闭事件循环而不会发出警告。
最佳答案
问题来自取消任务后立即关闭循环。作为cancel() docs state
请使用以下代码段:
import asyncio
import signal
async def pending_doom():
await asyncio.sleep(2)
print(">> Cancelling tasks now")
for task in asyncio.Task.all_tasks():
task.cancel()
print(">> Done cancelling tasks")
asyncio.get_event_loop().stop()
def ask_exit():
for task in asyncio.Task.all_tasks():
task.cancel()
async def looping_coro():
print("Executing coroutine")
while True:
try:
await asyncio.sleep(0.25)
except asyncio.CancelledError:
print("Got CancelledError")
break
print("Done waiting")
print("Done executing coroutine")
asyncio.get_event_loop().stop()
def main():
asyncio.async(pending_doom())
asyncio.async(looping_coro())
loop = asyncio.get_event_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, ask_exit)
loop.run_forever()
# I had to manually remove the handlers to
# avoid an exception on BaseEventLoop.__del__
for sig in (signal.SIGINT, signal.SIGTERM):
loop.remove_signal_handler(sig)
if __name__ == '__main__':
main()
注意
ask_exit
取消了任务,但没有stop
循环,在下一个周期looping_coro()
停止了它。如果取消输出,则输出为:Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
^CGot CancelledError
Done executing coroutine
注意
pending_doom
如何取消并在之后立即停止循环。如果让它运行直到pending_doom
协程从 sleep 中唤醒,您会看到相同的警告:Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
>> Cancelling tasks now
>> Done cancelling tasks
Task was destroyed but it is pending!
task: <Task pending coro=<looping_coro() running at canceling_coroutines.py:24> wait_for=<Future cancelled>>
关于python - 请解释 “Task was destroyed but it is pending!”,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/40897428/