本文介绍了如何将 socket.io 挂载到 fastapi 应用程序并向所有连接的客户端发送广播的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我尝试创建使用 websockets 的 fastapi 应用程序,并且可以向所有连接的客户端广播消息.我发现 websockets 是不可能的,但发现了一个很棒的库 - socket.io.但是我不确定如何使用它并将其与我现有的 fastapi 应用程序集成.

I tried to create fastapi application that uses websockets and could broadcast messages to all connected clients. I found out that it's not possible with websockets but found briliant library - socket.io. However I am not sure how could I use it and integrate it with my existing fastapi app.

推荐答案

# server.py
from typing import Any

import uvicorn
from fastapi import FastAPI

import socketio

sio: Any = socketio.AsyncServer(async_mode="asgi")
socket_app = socketio.ASGIApp(sio)
app = FastAPI()


@app.get("/test")
async def test():
    print("test")
    return "WORKS"


app.mount("/", socket_app)  # Here we mount socket app to main fastapi app


@sio.on("connect")
async def connect(sid, env):
    print("on connect")


@sio.on("direct")
async def direct(sid, msg):
    print(f"direct {msg}")
    await sio.emit("event_name", msg, room=sid)  # we can send message to specific sid


@sio.on("broadcast")
async def broadcast(sid, msg):
    print(f"broadcast {msg}")
    await sio.emit("event_name", msg)  # or send to everyone


@sio.on("disconnect")
async def disconnect(sid):
    print("on disconnect")


if __name__ == "__main__":
    kwargs = {"host": "0.0.0.0", "port": 5000}
    kwargs.update({"debug": True, "reload": True})
    uvicorn.run("server:app", **kwargs)
# client.py
import requests
import socketio

r = requests.get("http://127.0.0.1:5000/test") # server prints "test"
cl = socketio.Client()
cl2 = socketio.Client()


@cl.on("event_name")
def foo(data):
    print(f"client 1 {data}")


@cl2.on("event_name")
def foo2(data):
    print(f"client 2 {data}")


cl.connect("http://127.0.0.1:5000/") # server prints "on connect"
cl2.connect("http://127.0.0.1:5000/")
cl.emit("direct", "msg_1") # prints client 1 msg_1
cl2.emit("broadcast", "msg_2") # prints client 2 msg_2 and client 1 msg_2

最后安装适当的依赖项:

At the end install proper dependencies:

# server.py
pip install python-socketio uvicorn fastapi
# client.py
pip install requests websocket-client

这篇关于如何将 socket.io 挂载到 fastapi 应用程序并向所有连接的客户端发送广播的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

06-27 04:37