问题描述
我有一个websocket服务器(python 3.x)接受请求,每个请求都是一个url变量.它运行得很好,只是它只依次串行地执行每个请求. 该函数正在运行时,它还会阻止尝试连接的客户端.我想要的是非阻止!
I have a websocket server (python 3.x) taking requests where each is a url variable. It runs just fine except it only executes each request in serial, after one another. While the function is running it also blocks the client(s) trying to connect. Non-blocking is what i want!
- websocket和子流程功能的
- 异步线程.
- Asyncronous threading of both websocket and subprocess function.
所以,我对这种挫败感并没有走多远.我回到了原始代码,事实证明,您需要使用await asyncio.sleep(.001)
休眠该函数.现在它运行得非常好,我同时与多个客户端进行了测试,并且可以异步处理它.
So, I didn't get very far with this frustration. I reverted back to my original code and as it turns out, you need to sleep the function with await asyncio.sleep(.001)
. Now it runs perfectly fine, I tested with multiple clients at the same time and it handles it asynchronously.
import asyncio, websockets, json
async def handler(websocket, path):
print("New client connected.")
await websocket.send('CONNECTED')
try:
while True:
inbound = await websocket.recv()
if inbound is None:
break
while inbound != None:
import time
for line in range(10):
time.sleep(1)
data = {}
data['blah'] = line
await asyncio.sleep(.000001) # THIS
print(data)
await websocket.send(json.dumps(data))
await websocket.send(json.dumps({'progress': 'DONE'}))
break
except websockets.exceptions.ConnectionClosed:
print("Client disconnected.")
if __name__ == "__main__":
server = websockets.serve(handler, '0.0.0.0', 8080)
loop = asyncio.get_event_loop()
loop.run_until_complete(server)
loop.run_forever()
更新:,如 @udi 所建议,如果您想要一个缓慢的外部进程,则方法是asyncio.subprocess而不是subprocess.通过阻塞调用从管道读取将使其他线程停滞,这是asyncio.subprocess负责的工作.
Update: as suggested by @udi, if you want a slow external process, the way to go is asyncio.subprocess and not subprocess. Reading from pipe with a blocking call stalls the other threads, which is what asyncio.subprocess takes care of.
推荐答案
time.sleep()
正在阻止.
尝试:
# blocking_server.py
import asyncio
import time
import websockets
x = 0
async def handler(websocket, path):
global x
x += 1
client_id = x
try:
print("[#{}] Connected.".format(client_id))
n = int(await websocket.recv())
print("[#{}] Got: {}".format(client_id, n))
for i in range(1, n + 1):
print("[#{}] zzz...".format(client_id))
time.sleep(1)
print("[#{}] woke up!".format(client_id))
await asyncio.sleep(.001)
msg = "*" * i
print("[#{}] sending: {}".format(client_id, msg))
await websocket.send(msg)
msg = "bye!"
print("[#{}] sending: {}".format(client_id, msg))
await websocket.send(msg)
print("[#{}] Done.".format(client_id, msg))
except websockets.exceptions.ConnectionClosed:
print("[#{}] Disconnected!.".format(client_id))
if __name__ == "__main__":
port = 8080
server = websockets.serve(handler, '0.0.0.0', port)
print("Started server on port {}".format(port))
loop = asyncio.get_event_loop()
loop.run_until_complete(server)
loop.run_forever()
使用此测试客户端:
# test_client.py
import asyncio
import time
import websockets
async def client(client_id, n):
t0 = time.time()
async with websockets.connect('ws://localhost:8080') as websocket:
print("[#{}] > {}".format(client_id, n))
await websocket.send(str(n))
while True:
resp = await websocket.recv()
print("[#{}] < {}".format(client_id, resp))
if resp == "bye!":
break
print("[#{}] Done in {:.2f} seconds".format(client_id, time.time() - t0))
tasks = [client(i + 1, 3) for i in range(4)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
现在比较将time.sleep(x)
替换为await asyncio.sleep(x)
时的结果!
Now compare the result when time.sleep(x)
is replaced with await asyncio.sleep(x)
!
If you need to run a slow external process via asyncio, try asynico.subprocess
:
外部程序示例:
# I am `slow_writer.py`
import sys
import time
n = int(sys.argv[1])
for i in range(1, n + 1):
time.sleep(1)
print("*" * i)
使用此服务器:
# nonblocking_server.py
import asyncio
import sys
import websockets
x = 0
async def handler(websocket, path):
global x
x += 1
client_id = x
try:
print("[#{}] Connected.".format(client_id))
n = int(await websocket.recv())
print("[#{}] Got: {}. Running subprocess..".format(client_id, n))
cmd = (sys.executable, 'slow_writer.py', str(n))
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE)
async for data in proc.stdout:
print("[#{}] got from subprocess, sending: {}".format(
client_id, data))
await websocket.send(data.decode().strip())
return_value = await proc.wait()
print("[#{}] Subprocess done.".format(client_id))
msg = "bye!"
print("[#{}] sending: {}".format(client_id, msg))
await websocket.send(msg)
print("[#{}] Done.".format(client_id, msg))
except websockets.exceptions.ConnectionClosed:
print("[#{}] Disconnected!.".format(client_id))
if __name__ == "__main__":
if sys.platform == 'win32':
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
port = 8080
server = websockets.serve(handler, '0.0.0.0', port)
print("Started server on port {}".format(port))
loop = asyncio.get_event_loop()
loop.run_until_complete(server)
loop.run_forever()
这篇关于asyncio:运行一个函数,该函数线程与来自websocket客户端的多个请求的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!