浏览器EventSource
EventSource基本介绍
- EventSource维持一个可以持续接收数据的HTTP长连接
- EventSource接收文本编码的流数据,在接收到结束符之前可以一直接收数据
- EventSource在接收到结束符后会在一定间隔后自动轮询(或称重试)
- EventSource接收文本数据
- EventSource对象被创建则自动开始轮询,每次轮询都在上一次轮询响应完成后
- EventSource对象每次轮询服务器都会触发
onopen
和onmessage
,但一次长连接只触发一次onopen
,在当次长连接期间每次接收数据都会触发onmessage
- EventSource需要浏览器主动调用
EventSource.close()
才会结束轮询(重试) - EventSource重试频率只能由服务端控制,
retry:5000\ndata:数据\n\n
表示在5000毫秒后再次轮询,retry:毫秒数\n
是可选的,默认3秒 - EventSource连接时异常触发
onerror
,并立刻重试,不受retry
间隔影响
EventSource对象
构造函数
var sse = EventSource(url,configuration);
url
: 它代表远程资源的位置的url字符串configuration
: (可选)配置JSON=>withCredentials
:默认为false
,指示 CORS 是否应包含凭据(credentials)- 返回值: 一个新建的 EventSource 对象
返回的EventSource对象包含的属性:
onerror
: 错误事件函数,入参为事件对象onmessage
: 收到消息事件函数,入参为事件对象,返回的数据在对象的data
属性中onopen
: 连接建立并打开事件函数,入参为事件对象readyState
: (只读)表连接状态,可能值是 CONNECTING (0
), OPEN (1
), 或者 CLOSED (2
)url
: (只读)事件源的URL
事件接收器
EventSource.onerror
: 是一个 EventHandler,当发生错误时被调用,并且在此对象上派发 error 事件。EventSource.onmessage
: 是一个 EventHandler,当收到一个 message 事件,即消息来自源头时被调用。EventSource.onopen
: 是一个 EventHandler,当收到一个 open 事件,即连接刚打开时被调用。
onmessage入参对象关键属性:
data
:当次消息数据lastEventId
:当次消息IDtype
:正常情况下都为message
方法
EventSource.close()
: 如果存在,则关闭连接,并且设置 readyState 属性为 CLOSED。如果连接已经被关闭,此方法不会再进行任何操作。
EventSource简单使用
//EventSource简单使用示例(利用重试机制不停轮询)
window.sseCount = 0;
var sse = new EventSource("/sse");
sse.onmessage = function (e) {
document.getElementById("s1").innerText=
new Date(Number(e.data)).toLocaleString();
++window.sseCount>=20&&sse.close()
}
//EventSource对应的java服务端event-stream
@RequestMapping(value="/sse",produces= MediaType.TEXT_EVENT_STREAM_VALUE)
public String sse(){
return "data:" + new Date().getTime() + "\n\n";
}
服务端event-stream
服务端event-stream基本介绍
对于服务器而言接收到EventSource的请求和接收到的常规HTTP请求没有什么不同,服务端每次收到EventSource都是一个全新的Request
- 响应时的
Content-Type
为text/event-stream
- 响应必须编码成utf-8等文本格式
- 响应的体的数据必须以
data:
开始,且一次轮询的响应体必须以\n\n
结束,否则不能触发onmessage
- 如果需要控制浏览器轮询频率则响应报文前加上
retry:毫秒数\n
- 如果需要给消息唯一的表示则响应报文前加上
id:字符串\n
- 在消息体中
\n
表示当前类型的内容结束但消息体未结束,而\n\n
表示消息体结束
EventSource接收的消息格式:
- 数据:(必须)
data:数据内容\n
,对应onmessage
入参对象的data
属性 - 事件id(可选,默认空):
id:事件ID字符串\n
,对应onmessage
入参对象的lastEventId
属性 - 频率(可选,默认3秒):
retry:毫秒数\n
,浏览器在接收到响应的制定毫秒数后重新轮询 - 结束标识(必须):
\n
,即消息体最后必须以\n\n
结束 - 完整响应文本格式:
retry:毫秒数\nid:ID字符串\ndata:DATA_CONTENT\n\n
- 样例:
retry:1000\nid:123\ndata:woshishuju\n\n
对应的消息为:重试间隔1秒,当次消息ID为123,当次消息的数据为woshishuju
浏览器js代码
window.sseCount = 0;
var sse = new EventSource("/test/sse");
sse.onmessage = function (e) {
console.log(e)
var newElement = document.createElement("li");
newElement.textContent = "id: " + e.lastEventId +
" , message: " + e.data + " , format: " +
new Date(Number(e.data)).toLocaleString();
++window.sseCount>=5&&sse.close();
var ul = document.querySelector('ul');
if(ul==null){
ul = document.createElement("ul");
document.body.appendChild(ul);
}
ul.appendChild(newElement);
}
/*
页面显示的消息是:
id: 13053 , message: 1614758013053 , format: 2021/3/3 下午3:53:33
id: 14073 , message: 1614758014073 , format: 2021/3/3 下午3:53:34
id: 15090 , message: 1614758015090 , format: 2021/3/3 下午3:53:35
id: 16114 , message: 1614758016114 , format: 2021/3/3 下午3:53:36
id: 17126 , message: 1614758017126 , format: 2021/3/3 下午3:53:37
*/
服务端java代码
@RequestMapping(value="/test/sse",produces= MediaType.TEXT_EVENT_STREAM_VALUE)
public String sse(){
System.out.println(Thread.currentThread().getName());
long d = new Date().getTime();
return "retry:1000\nid:"+(d%100000)+"\ndata:" + d + "\n\n";
}
//这个方法被EventSource每秒轮询一次(retry:1000)
//当这个方法被EventSource轮询时会打印输出日志如下:
/*
http-nio-8080-exec-2
http-nio-8080-exec-3
http-nio-8080-exec-4
http-nio-8080-exec-5
http-nio-8080-exec-7
http-nio-8080-exec-6
http-nio-8080-exec-8
http-nio-8080-exec-9
......
*/
//则证明每次被EventSource轮询时都是独立的HTTP请求
对应的每次EventSource轮询请求报文
GET /test/sse HTTP/1.1
Host: 127.0.0.1:8080
Connection: keep-alive
Accept: text/event-stream
Cache-Control: no-cache
Last-Event-ID: 85413
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.182 Safari/537.36 Edg/88.0.705.81
Sec-Fetch-Site: same-origin
Sec-Fetch-Mode: cors
Sec-Fetch-Dest: empty
Referer: http://127.0.0.1:8080/
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6
对应的每次EventSource轮询响应标头
HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Content-Length: 40
Date: Wed, 03 Mar 2021 07:41:26 GMT
Keep-Alive: timeout=60
Connection: keep-alive
WebAsyncTask实现服务端推送
浏览器端 js 代码
//订阅
function sub(){
var sse = new EventSource("/sseStream?id=haha");
sse.onmessage = function (e) {
console.log(e)
var newElement = document.createElement("li");
newElement.textContent = "id: " + e.lastEventId +
" , message: " + e.data + " , format: " +
new Date(Number(e.data)).toLocaleString();
var ul = document.querySelector('ul');
if(ul==null){
ul = document.createElement("ul");
document.body.appendChild(ul);
}
ul.appendChild(newElement);
//接收到约定的结束标识则不再轮询,此时可能会因为延迟而轮询一次
if("over"===e.data)sse.close();
}
}
//发送get请求
function get(url){
var xhr = new XMLHttpRequest();
xhr.open("GET",url);
xhr.send("");
}
//模拟推送
function push(){
var i;
for(i=0;i<5;++i){
setTimeout(function (){
get("/push?id=haha&content="+new Date().getTime());
},1000*i);
}
setTimeout(function (){
get("/over?id=haha");
},1000*(i+1));
}
//测试
sub();
setTimeout(function(){
push();
},1000);
服务端 java 代码
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.WebAsyncTask;
import javax.servlet.http.HttpServletResponse;
import java.io.PrintWriter;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@RestController
public class TestController {
// 新建一个容器,保存连接,用于输出流
private Map<String, PrintWriter> responseMap = new ConcurrentHashMap<>();
// 发送数据给客户端
private void writeData(String id, String msg, boolean over) throws Exception {
PrintWriter writer = responseMap.get(id);
if (writer == null) {
return;
}
if(msg==null)msg="";
writer.println("data:"+msg+"\n");
writer.flush();
if (over) {
writer.println("data:"+msg+"\n\n");
writer.flush();
writer.close();
responseMap.remove(id);
}
}
//订阅消息
@RequestMapping("/sseStream")
public WebAsyncTask<Void> sseStream(String id,final HttpServletResponse response){
response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
Callable<Void> callable = () -> {
responseMap.put(id, response.getWriter());
writeData(id, "订阅成功", false);
while (true) {
Thread.sleep(1000);
if (!responseMap.containsKey(id)) {
break;
}
}
return null;
};
// 采用WebAsyncTask 返回 这样可以处理超时和错误 同时也可以指定使用的Excutor名称
WebAsyncTask<Void> webAsyncTask = new WebAsyncTask<>(30000, callable);
// 注意:onCompletion表示完成,不管你是否超时、是否抛出异常,这个函数都会执行的
webAsyncTask.onCompletion(() -> System.out.println("程序[正常执行]完成的回调"));
webAsyncTask.onTimeout(() -> {
responseMap.remove(id);
System.out.println("超时了!!!");
return null;
});
webAsyncTask.onError(() -> {
System.out.println("出现异常!!!");
return null;
});
return webAsyncTask;
}
//模拟消息推送
@ResponseBody
@GetMapping(path = "push")
public String pushData(String id, String content) throws Exception {
writeData(id, content, false);
return "over!";
}
//模拟结束当次推送
@ResponseBody
@GetMapping(path = "over")
public String over(String id) throws Exception {
writeData(id, "over", true);
return "over!";
}
}
利用SseEmitter快速实现服务端推送
前端浏览器 js 代码
//向页面写接收到的数据
function writeSseLog(e){
var newElement = document.createElement("li");
newElement.textContent = "message: " + e.data;
var ul = document.querySelector('ul');
if(ul==null){
ul = document.createElement("ul");
document.body.appendChild(ul);
}
ul.appendChild(newElement);
}
//订阅消息
function sseSub(id){
window["sseSubId"+id]=null;
var sse = new EventSource("/sse/subscribe?id="+id);
sse.onmessage=function (e){
if(window["sseSubId"+id]=='over')sse.close();
writeSseLog(e);
}
}
//订阅者1
function sseSub1(){
sseSub("1");
}
//订阅者2
function sseSub2(){
sseSub("2");
}
//发送get请求
function get(url){
var xhr = new XMLHttpRequest();
xhr.open("GET",url);
xhr.send("");
}
//发布消息
function ssePush(id,txt){
var url;
if(id!=null){
url = "/sse/push?id="+id+"&content="+txt;
}else{
url = "/sse/pushAll?content="+txt;
}
get(url);
}
//向订阅者1发布
function ssePush1() {
ssePush("1","1-"+new Date().toLocaleString());
}
//向订阅者2发布
function ssePush2() {
ssePush("2","2-"+new Date().toLocaleString());
}
//向全部订阅者发布
function ssePushAll() {
ssePush(null,"A-"+new Date().toLocaleString());
}
//结束订阅并不再轮询
function sseOver(id){
window["sseSubId"+id]='over';
get("/sse/over?id="+id);
}
//结束全部订阅
function sseOverAll(){
sseOver("1");
sseOver("2");
}
//测试
sseSub1();
sseSub2();
window.pushCount=0;
for(var i=0;i<6;++i){
setTimeout(function(){
ssePush1();
ssePush2();
++window.pushCount%2==0&&ssePushAll();
window.pushCount==6&&sseOverAll();
},1000*i);
}
后端 java 代码
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
@RestController
@RequestMapping("/sse")
public class SseController {
private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
/**订阅*/
@GetMapping("/subscribe")
public SseEmitter subscribe(final String id){
var sseM = new SseEmitter(1000*60*60L);//超时时间1小时
sseCache.put(id, sseM);//放入缓存
sseM.onTimeout(()->{
sseCache.remove(id);
System.out.println("over time!");
});//超时从缓存删除
sseM.onCompletion(()-> {
sseCache.remove(id);
System.out.println("success over!");
});//完成从缓存删除
sseM.onError(throwable-> {
System.out.println("error! "+id);
throwable.printStackTrace();
});//发生错误时打印错误
return sseM;
}
/**消息发布到指定接收者*/
@GetMapping("/push")
public String push(final String id, String content) throws IOException {
var sseM = sseCache.get(id);
if(sseM!=null)sseM.send(content);
return "over";
}
/**断开订阅*/
@GetMapping("/over")
public String over(final String id){
var sseM = sseCache.get(id);
if(sseM!=null)sseM.complete();
return "over";
}
/**广播发布*/
@GetMapping("/pushAll")
public String pushAll(String content){
sseCache.keySet().forEach(k->{
try {
sseCache.get(k).send(content);
} catch (IOException e) {
e.printStackTrace();
}
});
return "over";
}
}
利用WebFlux更快实现服务端推送
前端浏览器 js 代码
var sse = new EventSource("/sse/subscribe");
sse.onmessage = function (e){
var ul = document.querySelector("ul");
if(ul==null){
ul = document.createElement("ul");
document.body.appendChild(ul);
}
var li = document.createElement("li");
li.textContent = e.data;
ul.appendChild(li);
}
服务端 java 代码
package com.example.wefluxdemo.web;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;
import java.time.Duration;
import java.util.Date;
@RequestMapping("/sse")
@RestController
public class SseController {
@GetMapping(value = "/subscribe",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> subscribe(){
return Flux.interval(Duration.ofSeconds(1))
.map(seq -> Tuples.of(seq, currTime()))
.map(data -> ServerSentEvent.<String>builder()
.id(Long.toString(data.getT1())) //为每次发送设置一个id
.data(data.getT2().toString())
.build());
}
private String currTime(){
return String.valueOf(new Date().getTime());
}
}