本文介绍了如何将 socket.io 挂载到 fastapi 应用程序并向所有连接的客户端发送广播的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我尝试创建使用 websockets 的 fastapi 应用程序,并且可以向所有连接的客户端广播消息.我发现 websockets 是不可能的,但发现了一个很棒的库 - socket.io.但是我不确定如何使用它并将其与我现有的 fastapi 应用程序集成.
I tried to create fastapi application that uses websockets and could broadcast messages to all connected clients. I found out that it's not possible with websockets but found briliant library - socket.io. However I am not sure how could I use it and integrate it with my existing fastapi app.
推荐答案
# server.py
from typing import Any
import uvicorn
from fastapi import FastAPI
import socketio
sio: Any = socketio.AsyncServer(async_mode="asgi")
socket_app = socketio.ASGIApp(sio)
app = FastAPI()
@app.get("/test")
async def test():
print("test")
return "WORKS"
app.mount("/", socket_app) # Here we mount socket app to main fastapi app
@sio.on("connect")
async def connect(sid, env):
print("on connect")
@sio.on("direct")
async def direct(sid, msg):
print(f"direct {msg}")
await sio.emit("event_name", msg, room=sid) # we can send message to specific sid
@sio.on("broadcast")
async def broadcast(sid, msg):
print(f"broadcast {msg}")
await sio.emit("event_name", msg) # or send to everyone
@sio.on("disconnect")
async def disconnect(sid):
print("on disconnect")
if __name__ == "__main__":
kwargs = {"host": "0.0.0.0", "port": 5000}
kwargs.update({"debug": True, "reload": True})
uvicorn.run("server:app", **kwargs)
# client.py
import requests
import socketio
r = requests.get("http://127.0.0.1:5000/test") # server prints "test"
cl = socketio.Client()
cl2 = socketio.Client()
@cl.on("event_name")
def foo(data):
print(f"client 1 {data}")
@cl2.on("event_name")
def foo2(data):
print(f"client 2 {data}")
cl.connect("http://127.0.0.1:5000/") # server prints "on connect"
cl2.connect("http://127.0.0.1:5000/")
cl.emit("direct", "msg_1") # prints client 1 msg_1
cl2.emit("broadcast", "msg_2") # prints client 2 msg_2 and client 1 msg_2
最后安装适当的依赖项:
At the end install proper dependencies:
# server.py
pip install python-socketio uvicorn fastapi
# client.py
pip install requests websocket-client
这篇关于如何将 socket.io 挂载到 fastapi 应用程序并向所有连接的客户端发送广播的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!