异步\队列.py

@coroutine
def put(self, item):
    """Put an item into the queue.

    Put an item into the queue. If the queue is full, wait until a free
    slot is available before adding item.

    This method is a coroutine.
    """
    while self.full():
        putter = futures.Future(loop=self._loop)
        self._putters.append(putter)
        try:
            yield from putter
        except:
            putter.cancel()  # Just in case putter is not done yet.
            if not self.full() and not putter.cancelled():
                # We were woken up by get_nowait(), but can't take
                # the call.  Wake up the next in line.
                self._wakeup_next(self._putters)
            raise
    return self.put_nowait(item)

在我看来,putter可以通过cancelset_exceptionset_result来完成。get_nowait使用set_result。只有cancelset_exception会引发异常,然后才会发生except:。我认为不需要except:
为什么要在except:中添加一个Wake up the next in line
更新:@Vincent
_wakeup_next呼叫set_resultset_result将执行self._state = _FINISHEDtask1.cancel()self._fut_waiter.cancel()返回False。因此,任务1不会被取消。
@文森特非常感谢
关键原因是任务。取消可以取消任务,尽管任务正在等待的未来已设置为结果(self.\u state=\u FINISHED)。

最佳答案

如果等待putter的任务被取消,yield from putter将引发CancelledError。调用get_nowait()后可能会发生这种情况,并且您希望确保通知其他推杆在队列中有新的可用插槽。
下面是一个例子:

async def main():
    # Create a full queue
    queue = asyncio.Queue(1)
    await queue.put('A')
    # Schedule two putters as tasks
    task1 = asyncio.ensure_future(queue.put('B'))
    task2 = asyncio.ensure_future(queue.put('C'))
    await asyncio.sleep(0)
    # Make room in the queue, print 'A'
    print(queue.get_nowait())
    # Cancel task 1 before giving the control back to the event loop
    task1.cancel()
    # Thankfully, the putter in task 2 has been notified
    await task2
    # Print 'C'
    print(await queue.get())

编辑:有关内部情况的详细信息:
queue.get_nowait()putter.set_result(None)被调用;推杆状态现在为FINISHED,当控件返回到事件循环时,task1将被唤醒。
task1.cancel()task1._fut_waiter已完成,因此将task1._must_cancel设置为True,以便在下次运行CancelledError时提升atask1
await task2
控制返回到控制回路,然后task1._step()运行。一个CancelledError被抛出到协程中:task1._coro.throw(CancelledError())
queue.put捕获异常。由于队列未满且不会插入'B',因此必须通知队列中的下一个推杆:self._wakeup_next(self._putters)
然后,CancelledError被重新提升并陷入task1._step()中。task1现在实际上取消了自身(super().cancel())。

关于python - 除了asyncio.Queue.put中的错误?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/37019960/

10-12 23:19