异步\队列.py
@coroutine
def put(self, item):
"""Put an item into the queue.
Put an item into the queue. If the queue is full, wait until a free
slot is available before adding item.
This method is a coroutine.
"""
while self.full():
putter = futures.Future(loop=self._loop)
self._putters.append(putter)
try:
yield from putter
except:
putter.cancel() # Just in case putter is not done yet.
if not self.full() and not putter.cancelled():
# We were woken up by get_nowait(), but can't take
# the call. Wake up the next in line.
self._wakeup_next(self._putters)
raise
return self.put_nowait(item)
在我看来,
putter
可以通过cancel
、set_exception
或set_result
来完成。get_nowait
使用set_result
。只有cancel
和set_exception
会引发异常,然后才会发生except:
。我认为不需要except:
。为什么要在
except:
中添加一个Wake up the next in line
?更新:@Vincent
_wakeup_next
呼叫set_result
。set_result
将执行self._state = _FINISHED
。task1.cancel()
将self._fut_waiter.cancel()
返回False。因此,任务1不会被取消。@文森特非常感谢
关键原因是任务。取消可以取消任务,尽管任务正在等待的未来已设置为结果(self.\u state=\u FINISHED)。
最佳答案
如果等待putter
的任务被取消,yield from putter
将引发CancelledError
。调用get_nowait()
后可能会发生这种情况,并且您希望确保通知其他推杆在队列中有新的可用插槽。
下面是一个例子:
async def main():
# Create a full queue
queue = asyncio.Queue(1)
await queue.put('A')
# Schedule two putters as tasks
task1 = asyncio.ensure_future(queue.put('B'))
task2 = asyncio.ensure_future(queue.put('C'))
await asyncio.sleep(0)
# Make room in the queue, print 'A'
print(queue.get_nowait())
# Cancel task 1 before giving the control back to the event loop
task1.cancel()
# Thankfully, the putter in task 2 has been notified
await task2
# Print 'C'
print(await queue.get())
编辑:有关内部情况的详细信息:
queue.get_nowait()
:putter.set_result(None)
被调用;推杆状态现在为FINISHED
,当控件返回到事件循环时,task1
将被唤醒。task1.cancel()
:task1._fut_waiter
已完成,因此将task1._must_cancel
设置为True
,以便在下次运行CancelledError
时提升atask1
。await task2
:控制返回到控制回路,然后
task1._step()
运行。一个CancelledError
被抛出到协程中:task1._coro.throw(CancelledError())
。queue.put
捕获异常。由于队列未满且不会插入'B'
,因此必须通知队列中的下一个推杆:self._wakeup_next(self._putters)
。然后,
CancelledError
被重新提升并陷入task1._step()
中。task1
现在实际上取消了自身(super().cancel()
)。关于python - 除了asyncio.Queue.put中的错误?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/37019960/