浏览器EventSource

EventSource基本介绍

  • EventSource维持一个可以持续接收数据的HTTP长连接
  • EventSource接收文本编码的流数据,在接收到结束符之前可以一直接收数据
  • EventSource在接收到结束符后会在一定间隔后自动轮询(或称重试)
  • EventSource接收文本数据
  • EventSource对象被创建则自动开始轮询,每次轮询都在上一次轮询响应完成后
  • EventSource对象每次轮询服务器都会触发onopenonmessage,但一次长连接只触发一次onopen,在当次长连接期间每次接收数据都会触发onmessage
  • EventSource需要浏览器主动调用EventSource.close()才会结束轮询(重试)
  • EventSource重试频率只能由服务端控制,retry:5000\ndata:数据\n\n表示在5000毫秒后再次轮询,retry:毫秒数\n是可选的,默认3秒
  • EventSource连接时异常触发onerror,并立刻重试,不受retry间隔影响

EventSource对象

构造函数

var sse = EventSource(url,configuration);
  • url: 它代表远程资源的位置的url字符串
  • configuration: (可选)配置JSON=> withCredentials:默认为false,指示 CORS 是否应包含凭据(credentials)
  • 返回值: 一个新建的 EventSource 对象

返回的EventSource对象包含的属性:

  • onerror: 错误事件函数,入参为事件对象
  • onmessage: 收到消息事件函数,入参为事件对象,返回的数据在对象的data属性中
  • onopen: 连接建立并打开事件函数,入参为事件对象
  • readyState: (只读)表连接状态,可能值是 CONNECTING (0), OPEN (1), 或者 CLOSED (2)
  • url: (只读)事件源的URL

事件接收器

  • EventSource.onerror: 是一个 EventHandler,当发生错误时被调用,并且在此对象上派发 error 事件。
  • EventSource.onmessage: 是一个 EventHandler,当收到一个 message 事件,即消息来自源头时被调用。
  • EventSource.onopen: 是一个 EventHandler,当收到一个 open 事件,即连接刚打开时被调用。

onmessage入参对象关键属性:

  • data:当次消息数据
  • lastEventId:当次消息ID
  • type:正常情况下都为message

方法

  • EventSource.close(): 如果存在,则关闭连接,并且设置 readyState 属性为 CLOSED。如果连接已经被关闭,此方法不会再进行任何操作。

EventSource简单使用

//EventSource简单使用示例(利用重试机制不停轮询)
window.sseCount = 0;
var sse = new EventSource("/sse");
sse.onmessage = function (e) {
	document.getElementById("s1").innerText=
		new Date(Number(e.data)).toLocaleString();
	++window.sseCount>=20&&sse.close()
}

//EventSource对应的java服务端event-stream
@RequestMapping(value="/sse",produces= MediaType.TEXT_EVENT_STREAM_VALUE)
public String sse(){
	return "data:" + new Date().getTime() + "\n\n";
}

服务端event-stream

服务端event-stream基本介绍

对于服务器而言接收到EventSource的请求和接收到的常规HTTP请求没有什么不同,服务端每次收到EventSource都是一个全新的Request

  • 响应时的Content-Typetext/event-stream
  • 响应必须编码成utf-8等文本格式
  • 响应的体的数据必须以data:开始,且一次轮询的响应体必须以\n\n结束,否则不能触发onmessage
  • 如果需要控制浏览器轮询频率则响应报文前加上retry:毫秒数\n
  • 如果需要给消息唯一的表示则响应报文前加上id:字符串\n
  • 在消息体中\n表示当前类型的内容结束但消息体未结束,而\n\n表示消息体结束

EventSource接收的消息格式:

  • 数据:(必须)data:数据内容\n,对应onmessage入参对象的data属性
  • 事件id(可选,默认空):id:事件ID字符串\n,对应onmessage入参对象的lastEventId属性
  • 频率(可选,默认3秒):retry:毫秒数\n,浏览器在接收到响应的制定毫秒数后重新轮询
  • 结束标识(必须):\n,即消息体最后必须以\n\n结束
  • 完整响应文本格式:retry:毫秒数\nid:ID字符串\ndata:DATA_CONTENT\n\n
  • 样例: retry:1000\nid:123\ndata:woshishuju\n\n对应的消息为:重试间隔1秒,当次消息ID为123,当次消息的数据为woshishuju

浏览器js代码

window.sseCount = 0;
var sse = new EventSource("/test/sse");
sse.onmessage = function (e) {
	console.log(e)
	var newElement = document.createElement("li");
	newElement.textContent = "id: " + e.lastEventId +
		" , message: " + e.data + " , format: " +
		new Date(Number(e.data)).toLocaleString();
	++window.sseCount>=5&&sse.close();
	var ul = document.querySelector('ul');
	if(ul==null){
		ul = document.createElement("ul");
		document.body.appendChild(ul);
	}
	ul.appendChild(newElement);
}
/*
页面显示的消息是:
id: 13053 , message: 1614758013053 , format: 2021/3/3 下午3:53:33
id: 14073 , message: 1614758014073 , format: 2021/3/3 下午3:53:34
id: 15090 , message: 1614758015090 , format: 2021/3/3 下午3:53:35
id: 16114 , message: 1614758016114 , format: 2021/3/3 下午3:53:36
id: 17126 , message: 1614758017126 , format: 2021/3/3 下午3:53:37
*/

服务端java代码

@RequestMapping(value="/test/sse",produces= MediaType.TEXT_EVENT_STREAM_VALUE)
public String sse(){
	System.out.println(Thread.currentThread().getName());
	long d = new Date().getTime();
	return "retry:1000\nid:"+(d%100000)+"\ndata:" + d + "\n\n";
}
//这个方法被EventSource每秒轮询一次(retry:1000)
//当这个方法被EventSource轮询时会打印输出日志如下:
/*
http-nio-8080-exec-2
http-nio-8080-exec-3
http-nio-8080-exec-4
http-nio-8080-exec-5
http-nio-8080-exec-7
http-nio-8080-exec-6
http-nio-8080-exec-8
http-nio-8080-exec-9
......
*/
//则证明每次被EventSource轮询时都是独立的HTTP请求

对应的每次EventSource轮询请求报文

GET /test/sse HTTP/1.1
Host: 127.0.0.1:8080
Connection: keep-alive
Accept: text/event-stream
Cache-Control: no-cache
Last-Event-ID: 85413
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.182 Safari/537.36 Edg/88.0.705.81
Sec-Fetch-Site: same-origin
Sec-Fetch-Mode: cors
Sec-Fetch-Dest: empty
Referer: http://127.0.0.1:8080/
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6

对应的每次EventSource轮询响应标头

HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Content-Length: 40
Date: Wed, 03 Mar 2021 07:41:26 GMT
Keep-Alive: timeout=60
Connection: keep-alive

WebAsyncTask实现服务端推送

浏览器端 js 代码

//订阅
function sub(){
    var sse = new EventSource("/sseStream?id=haha");
    sse.onmessage = function (e) {
        console.log(e)
        var newElement = document.createElement("li");
        newElement.textContent = "id: " + e.lastEventId +
            " , message: " + e.data + " , format: " +
            new Date(Number(e.data)).toLocaleString();
        var ul = document.querySelector('ul');
        if(ul==null){
            ul = document.createElement("ul");
            document.body.appendChild(ul);
        }
        ul.appendChild(newElement);
        //接收到约定的结束标识则不再轮询,此时可能会因为延迟而轮询一次
        if("over"===e.data)sse.close();
    }
}
//发送get请求
function get(url){
    var xhr = new XMLHttpRequest();
    xhr.open("GET",url);
    xhr.send("");
}
//模拟推送
function push(){
    var i;
    for(i=0;i<5;++i){
        setTimeout(function (){
            get("/push?id=haha&content="+new Date().getTime());
        },1000*i);
    }
    setTimeout(function (){
        get("/over?id=haha");
    },1000*(i+1));
}

//测试
sub();
setTimeout(function(){
    push();
},1000);

服务端 java 代码

import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.WebAsyncTask;

import javax.servlet.http.HttpServletResponse;
import java.io.PrintWriter;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;

@RestController
public class TestController {
    // 新建一个容器,保存连接,用于输出流
    private Map<String, PrintWriter> responseMap = new ConcurrentHashMap<>();
    // 发送数据给客户端
    private void writeData(String id, String msg, boolean over) throws Exception {
        PrintWriter writer = responseMap.get(id);
        if (writer == null) {
            return;
        }
        if(msg==null)msg="";
        writer.println("data:"+msg+"\n");
        writer.flush();
        if (over) {
            writer.println("data:"+msg+"\n\n");
            writer.flush();
            writer.close();
            responseMap.remove(id);
        }
    }

    //订阅消息
    @RequestMapping("/sseStream")
    public WebAsyncTask<Void> sseStream(String id,final HttpServletResponse response){
        response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
        Callable<Void> callable = () -> {
            responseMap.put(id, response.getWriter());
            writeData(id, "订阅成功", false);
            while (true) {
                Thread.sleep(1000);
                if (!responseMap.containsKey(id)) {
                    break;
                }
            }
            return null;
        };

        // 采用WebAsyncTask 返回 这样可以处理超时和错误 同时也可以指定使用的Excutor名称
        WebAsyncTask<Void> webAsyncTask = new WebAsyncTask<>(30000, callable);
        // 注意:onCompletion表示完成,不管你是否超时、是否抛出异常,这个函数都会执行的
        webAsyncTask.onCompletion(() -> System.out.println("程序[正常执行]完成的回调"));

        webAsyncTask.onTimeout(() -> {
            responseMap.remove(id);
            System.out.println("超时了!!!");
            return null;
        });

        webAsyncTask.onError(() -> {
            System.out.println("出现异常!!!");
            return null;
        });

        return webAsyncTask;
    }

    //模拟消息推送
    @ResponseBody
    @GetMapping(path = "push")
    public String pushData(String id, String content) throws Exception {
        writeData(id, content, false);
        return "over!";
    }

    //模拟结束当次推送
    @ResponseBody
    @GetMapping(path = "over")
    public String over(String id) throws Exception {
        writeData(id, "over", true);
        return "over!";
    }

}

利用SseEmitter快速实现服务端推送

前端浏览器 js 代码

//向页面写接收到的数据
function writeSseLog(e){
    var newElement = document.createElement("li");
    newElement.textContent = "message: " + e.data;
    var ul = document.querySelector('ul');
    if(ul==null){
        ul = document.createElement("ul");
        document.body.appendChild(ul);
    }
    ul.appendChild(newElement);
}
//订阅消息
function sseSub(id){
    window["sseSubId"+id]=null;
    var sse = new EventSource("/sse/subscribe?id="+id);
    sse.onmessage=function (e){
        if(window["sseSubId"+id]=='over')sse.close();
        writeSseLog(e);
    }
}
//订阅者1
function sseSub1(){
    sseSub("1");
}
//订阅者2
function sseSub2(){
    sseSub("2");
}
//发送get请求
function get(url){
    var xhr = new XMLHttpRequest();
    xhr.open("GET",url);
    xhr.send("");
}
//发布消息
function ssePush(id,txt){
    var url;
    if(id!=null){
        url = "/sse/push?id="+id+"&content="+txt;
    }else{
        url = "/sse/pushAll?content="+txt;
    }
    get(url);
}
//向订阅者1发布
function ssePush1() {
    ssePush("1","1-"+new Date().toLocaleString());
}
//向订阅者2发布
function ssePush2() {
    ssePush("2","2-"+new Date().toLocaleString());
}
//向全部订阅者发布
function ssePushAll() {
    ssePush(null,"A-"+new Date().toLocaleString());
}
//结束订阅并不再轮询
function sseOver(id){
    window["sseSubId"+id]='over';
    get("/sse/over?id="+id);
}
//结束全部订阅
function sseOverAll(){
    sseOver("1");
    sseOver("2");
}

//测试
sseSub1();
sseSub2();
window.pushCount=0;
for(var i=0;i<6;++i){
    setTimeout(function(){
        ssePush1();
        ssePush2();
        ++window.pushCount%2==0&&ssePushAll();
        window.pushCount==6&&sseOverAll();
    },1000*i);
}

后端 java 代码

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

@RestController
@RequestMapping("/sse")
public class SseController {
    private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();

    /**订阅*/
    @GetMapping("/subscribe")
    public SseEmitter subscribe(final String id){
        var sseM = new SseEmitter(1000*60*60L);//超时时间1小时
        sseCache.put(id, sseM);//放入缓存
        sseM.onTimeout(()->{
            sseCache.remove(id);
            System.out.println("over time!");
        });//超时从缓存删除
        sseM.onCompletion(()-> {
            sseCache.remove(id);
            System.out.println("success over!");
        });//完成从缓存删除
        sseM.onError(throwable-> {
            System.out.println("error! "+id);
            throwable.printStackTrace();
        });//发生错误时打印错误
        return sseM;
    }

    /**消息发布到指定接收者*/
    @GetMapping("/push")
    public String push(final String id, String content) throws IOException {
        var sseM = sseCache.get(id);
        if(sseM!=null)sseM.send(content);
        return "over";
    }

    /**断开订阅*/
    @GetMapping("/over")
    public String over(final String id){
        var sseM = sseCache.get(id);
        if(sseM!=null)sseM.complete();
        return "over";
    }

    /**广播发布*/
    @GetMapping("/pushAll")
    public String pushAll(String content){
        sseCache.keySet().forEach(k->{
            try {
                sseCache.get(k).send(content);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        return "over";
    }
}

利用WebFlux更快实现服务端推送

前端浏览器 js 代码

var sse = new EventSource("/sse/subscribe");
sse.onmessage = function (e){
    var ul = document.querySelector("ul");
    if(ul==null){
        ul = document.createElement("ul");
        document.body.appendChild(ul);
    }
    var li = document.createElement("li");
    li.textContent = e.data;
    ul.appendChild(li);
}

服务端 java 代码

package com.example.wefluxdemo.web;

import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

import java.time.Duration;
import java.util.Date;

@RequestMapping("/sse")
@RestController
public class SseController {

    @GetMapping(value = "/subscribe",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> subscribe(){
        return Flux.interval(Duration.ofSeconds(1))
                .map(seq -> Tuples.of(seq, currTime()))
                .map(data -> ServerSentEvent.<String>builder()
                        .id(Long.toString(data.getT1()))  //为每次发送设置一个id
                        .data(data.getT2().toString())
                        .build());
    }

    private String currTime(){
        return String.valueOf(new Date().getTime());
    }
}
03-08 17:51