在 Python 异步协程中使用同步队列
使用 Python asyncio 进行异步编程时,如果需要在协程间交互数据可以使用异步队列 asyncio.Queue。但 asyncio.Queue 不是线程安全的,如果需要在不同线程间的异步程序之间或者不同线程的异步程序和同步程序间交换数据,就需要使用 queue 模块中的 Queue 这个队列,因为它是线程安全的。如果队列数据的消费者是 asyncio 异步协程,使用 queue.Queue 时,就必须注意,以免引起协程的阻塞。下面,我们用程序实例来说明这个问题。
先看一个简单的并发协程程序:
import asyncio
import queue
class TestAsync:
def __init__(self):
self.running_flag = False
self.reader = None
self.writer = None
async def function(self, display_string):
n = 0
while self.running_flag:
print(f"{display_string} count = {n}")
n += 1
await asyncio.sleep(4)
async def stop_tasks(self):
await asyncio.sleep(20)
self.running_flag = False
print("tasks stopped!")
async def start(self):
self.running_flag = True
tsk1 = asyncio.create_task(self.function("task 1"))
tsk2 = asyncio.create_task(self.function("task 2"))
tsk3 = asyncio.create_task(self.stop_tasks())
await asyncio.gather(tsk1, tsk2, tsk3)
if __name__ == "__main__":
cl = TestAsync()
asyncio.run(cl.start())
这个程序被封装成一个类,入口是异步函数 start,它启动三个并发的异步任务:其中两个是从异步函数 function 生成的,它们各自每隔 4 秒打印一次参数中的任务字符串和执行的次数;另一个是终止函数,启动后 20 秒通过将成员变量 running_flag 置成 False 来终止上述两个任务,自己也随后退出。下面是这个程序在 Pycharm 中运行后打印的结果:
task 1 count = 0
task 2 count = 0
task 1 count = 1
task 2 count = 1
task 1 count = 2
task 2 count = 2
task 1 count = 3
task 2 count = 3
task 1 count = 4
task 2 count = 4
tasks stopped!
进程已结束,退出代码0
现在,我们想在另一个线程中通过发送队列消息的方式停止这个程序,于是我们在程序中增加一个全局变量(队列)stop_queue = queue.Queue(),并修改 stop_tasks,收到来自这个队列的消息后就设置 running_flag 为 False,停止所有任务。
此外,我们编写了一个同步函数 stop_async_tasks,这个函数执行 20 秒后向 stop_queue 发送一条消息。
最后,用启动上述异步任务的程序(run_async_tasks) 和 stop_async_tasks 生成两个线程,并发执行这两个线程,预期得到与上面程序执行的同样结果。
下面是修改后的程序:
import asyncio
import threading
import queue
import time
# 增加的全局变量
stop_queue = queue.Queue()
class TestAsync:
def __init__(self):
self.running_flag = False
self.reader = None
self.writer = None
async def function(self, display_string):
n = 0
while self.running_flag:
print(f"{display_string} count = {n}")
n += 1
await asyncio.sleep(4)
# 这个函数做了修改,从队列中收到消息后设置running_flag, 退出程序。
async def stop_tasks(self):
p = stop_queue.get()
self.running_flag = False
print(f"message {p} received, tasks stopped!")
async def start(self):
self.running_flag = True
tsk1 = asyncio.create_task(self.function("task 1"))
tsk2 = asyncio.create_task(self.function("task 2"))
tsk3 = asyncio.create_task(self.stop_tasks())
await asyncio.gather(tsk1, tsk2, tsk3)
# 启动任务线程使用的函数
def run_async_tasks():
cl = TestAsync()
asyncio.run(cl.start())
# 停止任务线程使用的函数
def stop_async_tasks():
time.sleep(20)
stop_queue.put("stop")
if __name__ == "__main__":
# 启动任务的线程
t1 = threading.Thread(target=run_async_tasks)
t1.daemon = True
t1.start()
# 停止任务的线程
t2 = threading.Thread(target=stop_async_tasks)
t2.daemon = True
t2.start()
t1.join()
执行这段程序,打印结果如下:
task 1 count = 0
task 2 count = 0
message stop received, tasks stopped!
进程已结束,退出代码0
这个结果和预期的不同,两个任务各只打印了一次,然后,时间到了20秒,程序就退出了。
造成这个结果的原因是异步函数 stop_tasks 中,从队列中获取消息(p = stop_queue.get())的操作。虽然表面上看,它是在一个异步任务中进行的,但实际上它阻塞的不仅仅是这个异步任务,而是包括所有任务的整个线程,tsk1 和 tsk2 的执行被阻塞,直到 tsk3 收到队列消息,将 running_flag 置为 False,tsk1 和 tsk2 随即退出,后面的打印自然就无法进行了。
解决这一问题的方法是将线程的等待转换为异步的等待,我们可以用两种方式进行这种转换。第一种是将同步等待转换为异步操作,详细情况可以参见我的另一篇博客:Python 异步程序和同步程序的交互;这里要介绍的是第二种方法,它比第一种方法更为简单。在前面第二个程序中,修改 stop_tasks 程序(其他部分均无须改动)如下:
async def stop_tasks(self):
while stop_queue.empty():
await asyncio.sleep(0.2)
p = stop_queue.get()
self.running_flag = False
print(f"message {p} received, tasks stopped!")
由于 Queue.empty() 不涉及等待,所以,这段程序就将 get 操作的等待变为异步的延时等待,直至队列中有消息才退出等待,获取消息后进行终止操作。由于 asyncio.sleep 只阻塞当前任务,所以它不会影响 tsk1 和 tsk2 的执行。
程序运行的结果如下:
task 1 count = 0
task 2 count = 0
task 2 count = 1
task 1 count = 1
task 1 count = 2
task 2 count = 2
task 2 count = 3
task 1 count = 3
task 2 count = 4
task 1 count = 4
task 1 count = 5
task 2 count = 5
message stop received, tasks stopped!
进程已结束,退出代码0
结果与第一个程序相仿,打印 了6 次而不是 5 次的原因是线程的启动及操作需要更多的时间,由此造成了延时的误差。
在并发的异步程序中,发生这类问题的原因往往是错用了同步程序中有关阻塞的操作,例如,在该用 asynio.sleep 的地方错用了 time.sleep,也会引起整个线程中并发协程的阻塞。