Python基于Quart框架实现SSE数据传输

前言

在类似Chatgpt的应用中要实现数据的流式传输,模仿实现打字机效果,SSE是不二之选。传统的Flask框架不能满足异步处理的要求,没有异步处理就很难实现实时交互的需求,因此全新的Quart框架出现,但是Quart框架并没有原生好用的SSE类,官网只给出了如下的封装
【后端开发实习】Python基于Quart框架实现SSE数据传输-LMLPHP

SSE简介

【后端开发实习】Python基于Quart框架实现SSE数据传输-LMLPHP

Server-Sent Events (SSE) 是一种基于 HTTP 的协议,服务器可以使用它来向客户端推送实时更新。在人工智能(AI)领域,SSE 的重要性主要体现在以下几个方面:

  1. 实时交互:在某些 AI 应用中,如聊天机器人、实时推荐系统等,需要服务器能够实时地向客户端推送新的信息或更新。SSE 提供了一种有效的方式来实现这种实时交互。

  2. 异步处理:AI 模型的计算过程可能会耗费一定的时间,特别是在处理大量数据或复杂模型时。通过 SSE,服务器可以在计算完成后立即将结果推送给客户端,而无需让客户端等待整个计算过程。

  3. 减少网络负载:相比于传统的轮询方式,SSE 可以大大减少网络请求的数量,从而降低服务器的负载。这对于处理大量实时数据的 AI 应用来说尤其重要。

  4. 易于实现和使用:SSE 是基于 HTTP 的,因此在大多数编程语言和框架中都很容易实现。此外,由于它是一种标准协议,因此客户端(如浏览器)通常也提供了对 SSE 的原生支持。

理论分析

Server-Sent Events (SSE) 是一种服务器向客户端推送数据的技术,它基于 HTTP 协议,允许服务器单向地向客户端发送事件。以下是 SSE 的数据传输格式和通信过程原理:

数据传输格式

SSE 使用纯文本格式发送数据,每个事件由一系列以换行符分隔的字段组成。每个字段都以字段名开始,后跟一个冒号和字段值。例如:

data: This is the first message.

data: This is the second message.

这里,data 是一个字段,表示事件的数据。你也可以使用其他字段,如 event(指定事件类型)和 id(指定事件 ID)。

通信过程原理

  1. 建立连接:客户端通过发送一个 GET 请求到服务器上的某个 URL 来建立连接。这个请求的 Accept 头部字段应该设置为 text/event-stream,以告诉服务器客户端希望使用 SSE。

  2. 发送事件:一旦连接建立,服务器就可以开始发送事件。每个事件都是一个独立的消息,由一系列字段组成。服务器可以在任何时候发送事件,不需要客户端的请求。

  3. 接收事件:客户端通过监听 message 事件来接收服务器发送的事件。每当服务器发送一个事件,message 事件就会被触发,事件的数据可以通过事件对象的 data 属性获取。

  4. 断开和重新连接:如果连接被断开,客户端会自动尝试重新连接。你可以通过设置 retry 字段来控制重新连接的时间间隔。如果服务器发送了一个带有 id 字段的事件,那么在重新连接时,客户端会发送一个 Last-Event-ID 头部,值为最后一个接收到的事件的 ID,这样服务器就可以知道客户端接收到哪些事件。

因此需要定义如下四种内容:

  • data:数据内容
  • event:事件类型,一般是message
  • id:事件编号
  • retry:断开重传的时间

代码实现

from dataclasses import dataclass
from quart import make_response, json, Response


@dataclass
class ServerSentEvent:
    """
    Server Sent Event服务器
    服务端作用:
    将文本数据变成数据流传向客户端
    数据格式:
    data: string  传输数据内容, 公有变量
    event: string  传输事件类型, 私有变量
    id: string 事件id, 私有变量
    retry: int 断开重连时间, 私有变量
    """
    _data: str
    _event: str = None
    _id: int = 0
    _retry: int = 0

    def encode(self):
        """
        将数据转换成SSE的传输格式
        """
        message = f"data: {self._data}"
        if self._event is not None:
            message = f"{message}\nevent: {self._event}"
        if self._id is not None:
            message = f"{message}\nid: {self._id}"
        if self._retry is not None:
            message = f"{message}\nretry: {self._retry}"
        message = f"{message}\n\n"
        return message


async def response_sse(chat_generator):
    """
    发送请求的响应
    chat_generator: generator
    return: response
    """
    async def send_events():
        """
        将数据编码成SSE传输的格式进行传输
        """
        # 遍历chat_generator获取其中的字符串内容
        for data in chat_generator:
            print("data in generator:"+data)
            event = ServerSentEvent(data)
            encoded_event = event.encode()
            yield encoded_event

    # 返回响应数据
    response = await make_response(
        send_events(),
        {
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
            'Transfer-Encoding': 'chunked',
            'Connection': 'keep-alive',
        },
    )
    response.timeout = None
    return response

07-28 01:32