一.websocket协议介绍
1.websocket
websocket是一种标准协议,用于客户端和服务端之间进行双向数据传输,它是一种基于TCP协议的独立实现;其最大的特点是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话。websocket借用http来完成一次握手,只需要一次HTTP握手,服务端就能一直与客户端保持通讯,直到关闭连接。
2.http的三次握手
第一次握手: 建立连接,客户端A发送SYN=1、随机产生Seq=client_isn的数据包到服务器B,等待服务器确认。
第二次握手: 服务器B收到请求后确认联机(可以接受数据),发起第二次握手请求,ACK=(A的Seq+1)、SYN=1,随机产生Seq=client_isn的数据包到A。
第三次握手: A收到后检查ACK是否正确,若正确,A会在发送确认包ACK=服务器B的Seq+1、ACK=1,服务器B收到后确认Seq值与ACK值,若正确,则建立连接。
http的三次握手和四次挥手解析:
浏览器在给服务器传输数据之前,有三次握手,握手成功之后,才可以传输数据
1、浏览器需要先发送SYN码,客户端请求和服务器建立连接;(客户端->问服务器在吗?->服务端)
2、服务器接收到SYN码,再发送给客户端SYN+ACK码,我可以建立连接;(服务端->在呢,咋啦?->客户端)
3、客户端接收到ACK码,验证这个ACK是否正确,如果正确则客户端和服务端则建立起数据连接;双方的数据发送通道都将开启;(客户端->我给你说我今天......->服务端)
为什么要经过三次握手呢?
为了防止服务端开启一些无用的链接,网络传输是有延时的,中间可能隔着非常远的距离,通过光纤或者中间代理服务器等,客户端发送一个请求,服务端收到之后如果直接创建一个链接,返回内容给到客户端,因为网络传输原因,这个数据包丢失了,客户端就一直接收不到服务器返回的这个数据,超过了客户端设置的时间就关闭了,那么这时候服务端是不知道的,它的端口就会开着等待客户端发送实际的请求数据,服务这个开销也就浪费掉了。
四次挥手:
1、当客户端无数据要传输了,会发送FIN码告诉服务器,我发送完毕了;
2、当服务端接收完毕后,告诉客户端ACK码,告诉客户端你可以把数据通道关闭了;
3、当服务器发送完毕之后,也会发送FIN码,告诉浏览器,数据发送完毕;
4、当客户端接收完毕 之后,同样发送ACK码,告诉服务器,数据接收完毕,你可以关闭;
三次握手和四次挥手的好处:确保数据的安全和完整
WebSocket
WebSocket实现了双工通信,就是在客户端和服务端上建立一个长久的连接,然后两边可以任意发送数据,它属于应用层的协议,基于TCP传输协议并复用HTTP的握手通道。
WebSocket的优势:
支持双向通信,实时性更强;
更好的二进制支持;
较少的控制开销(连接创建后,ws客户端与服务端数据交换时,协议控制的数据包头部较少)。
二.典型的websocket握手过程
握手请求信息:
#两行表示发起的是websocket协议;
Upgrade:websocket
Connection:Upgrade
#websocket-key是由浏览器随机生成的,是一个Base64编码的值
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
#websocket-version表示ws的版本,询问服务端是否支持该版本
Sec-WebSocket-Version:13
握手响应信息:
#101状态码表示服务器已经理解了客户端的请求
HTTP/1.1 101
#websocket-accept是经过服务器确认后的值
Sec-WebSocket-Accept:HSmrc0sMlYUkAGmm5OPpG2HaGWk==
三.服务端实现例子
步骤一: springboot底层帮我们自动配置了websokcet,引入maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
步骤二:如果是你采用springboot内置容器启动项目的,则需要配置一个Bean。如果是采用外部的容器,则可以不需要配置。
import org.springframework.boot.web.servlet.ServletContextInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
/**
* Created on 2023-03-15 9:12
*
* @author xiegongmiao
* description:websocket配置,用于开启websocket支持
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements ServletContextInitializer {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
@Override
public void onStartup(ServletContext servletContext) throws ServletException {
}
}
步骤三:编写服务端核心代码
websoketSever.java
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import www.wensi.com.xiurtc.api.response.BaseResponse;
import www.wensi.com.xiurtc.api.response.Constant;
import www.wensi.com.xiurtc.common.MessageCacheUtil;
import www.wensi.com.xiurtc.config.ServerEncoder;
import www.wensi.com.xiurtc.entity.WsMessageCache;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* Created on 2023-03-16 16:31
*
* @author description:长连接
*/
@Component
@Data
@Slf4j
@ServerEndpoint(value = "/c/websocket/{mac}", encoders = {ServerEncoder.class})
public class WebSocketSever {
private Session session;
// 查询数据库用的服务
private static WebsocketService websocketService;
// 注入的时候,给类的 service 注入
@Autowired
public void setWebsocketService(WebsocketService websocketService) {
WebSocketSever.websocketService = websocketService;
}
/**
* 保存连接的会话
*/
private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();
// concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。
private static CopyOnWriteArraySet<WebSocketSever> webSocketSet = new CopyOnWriteArraySet<>();
/**
* 建立连接时调用
*
* @param session
* @param config
*/
@OnOpen
public void onOpen(@PathParam("mac") String mac, Session session, EndpointConfig config) {
log.info("webSocket connecting,connect cloud phone mac:{}", mac);
try {
Session historySession = sessionPool.get(mac);
// historySession不为空,说明已经有人登陆账号,应该删除登陆的WebSocket对象
if (historySession != null) {
webSocketSet.remove(historySession);
historySession.close();
}
} catch (IOException e) {
log.error("mac:{},duplicate login exception,error message:{}", mac, e.getMessage());
}
// 建立连接
this.session = session;
webSocketSet.add(this);
sessionPool.put(mac, session);
log.info("mac:{},connection completed,the current number of people online is:{}", mac, webSocketSet.size());
BaseResponse message = websocketService.getPublicPort(mac);
sendMessageByMac(mac, message);
}
/**
* 发生错误
*
* @param throwable e
*/
@OnError
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
/**
* 连接关闭
*/
@OnClose
public void onClose(@PathParam("mac") String mac) {
webSocketSet.remove(this);
sessionPool.remove(session);
MessageCacheUtil.clearOnly(mac);
log.info("mac:{},connection close,the current number of people online is:{}", mac, webSocketSet.size());
}
/**
* 消息到达时调用
*
* @param mac
* @param message
*/
@OnMessage
public void onMessage(@PathParam("mac") String mac, String message) {
log.info("received a message from the client,mac:{},message:{}", mac, message);
if (StringUtils.isBlank(message)) {
return;
}
try {
JSONObject json = JSONObject.parseObject(message);
if ("state".equals(json.get("type"))) {
websocketService.updateVmState(mac, json.getInteger("state"));
return;
} else if ("PING".equals(json.get("type"))) {
heartCheck(mac);
return;
}
} catch (Exception e) {
MessageCacheUtil.deleteForExpired(Constant.TIME_OUT_WEBSOCKET_POLLING_CACHE_TIME);
WsMessageCache wsMessageCache = new WsMessageCache();
wsMessageCache.setMessage(message);
wsMessageCache.setDate(new Date().getTime());
MessageCacheUtil.setMessageCache(mac, wsMessageCache);
}
}
/**
* 推送消息到指定用户
*
* @param mac 云手机mac地址
* @param message 发送的消息
*/
public void sendMessageByMac(String mac, String message) {
log.info("push content to cloud phone,mac:{},message:{}", mac, message);
MessageCacheUtil.deleteForExpired(Constant.TIME_OUT_WEBSOCKET_POLLING_CACHE_TIME);
Session session = sessionPool.get(mac);
if (ObjectUtils.isEmpty(session)) {
log.error("push content to cloud phone,cloud phone is not connected,mac:{},message:{}", mac, message);
return;
}
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("push content to cloud phone,mac:{},message:{},error:{}", mac, message, e.getMessage());
} catch (Exception e) {
log.error("push content to cloud phone,mac:{},message:{},error:{}", mac, message, e.getMessage());
}
}
/**
* 推送消息到指定用户
*
* @param mac 云手机mac地址
* @param message 发送的消息
*/
public void sendMessageByMac(String mac, Object message) {
log.info("push content to cloud phone,mac:{},message:{}", mac, message);
MessageCacheUtil.deleteForExpired(Constant.TIME_OUT_WEBSOCKET_POLLING_CACHE_TIME);
Session session = sessionPool.get(mac);
try {
session.getBasicRemote().sendObject(message);
} catch (EncodeException | IOException e) {
log.error("push content to cloud phone,mac:{},message:{},error:{}", mac, message, e.getMessage());
}
}
/**
* 群发消息
*
* @param message 发送的消息
*/
public static void sendAllMessage(String message) {
log.info("mass message:{}", message);
for (WebSocketSever webSocket : webSocketSet) {
try {
webSocket.session.getBasicRemote().sendText(message);
} catch (Exception e) {
log.error("mass message:{},error:{}", message, e);
}
}
}
/**
* 心跳检测机制
*
* @param mac mac地址
*/
public static void heartCheck(String mac) {
Map<String, Object> params = new HashMap<String, Object>();
params.put("type", "PONG");
Session session = sessionPool.get(mac);
if (ObjectUtils.isEmpty(session)) {
log.error("heart check,cloud phone is not connected,mac:{},message:{}", mac, params);
return;
}
try {
session.getBasicRemote().sendObject(params);
} catch (IOException e) {
log.error("heart check cloud phone,mac:{},message:{},error:{}", mac, params, e.getMessage());
} catch (Exception e) {
log.error("heart check cloud phone,mac:{},message:{},error:{}", mac, params, e.getMessage());
}
}
}
我这里没有用nosql缓存,自己编写了个MessageCacheUtil缓存工具类。用于缓存客户端发来的信息。 WsMessageCache.java
import lombok.Data;
@Data
public class WsMessageCache {
private Long date;
private String message;
}
MessageCacheUtil.java
import org.springframework.util.ObjectUtils;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
public class MessageCacheUtil {
private static Map<Object, WsMessageCache> cacheMap = new HashMap<Object, WsMessageCache>();
/**
* 单实例构造方法
*/
private MessageCacheUtil() {
super();
}
/**
* 根据键获取时间long
*
* @param key
* @return
*/
public static WsMessageCache getMessageCache(String key) {
return (WsMessageCache) cacheMap.get(key);
}
/**
* 设置缓存
*
* @param key
* @param wsMessageCache
* @return
*/
public synchronized static Map<Object, WsMessageCache> setMessageCache(String key, WsMessageCache wsMessageCache) {
cacheMap.put(key, wsMessageCache);
return cacheMap;
}
/**
* 判断是否存在一个缓存
*
* @param key
* @return
*/
public synchronized static boolean hasCache(String key) {
return cacheMap.containsKey(key);
}
/**
* 判断是否存在缓存
*
* @param key
* @return
*/
public synchronized static boolean hasOneCache(String key) {
return !ObjectUtils.isEmpty(cacheMap);
}
/**
* 清除所有缓存
*/
public synchronized static void clearAll() {
cacheMap.clear();
}
/**
* 清除指定的缓存
*
* @param key
*/
public synchronized static void clearOnly(String key) {
cacheMap.remove(key);
}
/**
* 获取缓存中的大小
*
* @return
*/
public static int getCacheSize() {
return cacheMap.size();
}
/**
* 删除过期消息
*
* @param time
*/
public static void deleteForExpired(Long time) {
Iterator<Entry<Object, WsMessageCache>> it = cacheMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Object, WsMessageCache> entry = it.next();
WsMessageCache value = entry.getValue();
if ((new Date()).getTime() - (value.getDate()) >= time) {
it.remove();
}
}
}
}
WebSoketTestController.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import www.wensi.com.xiurtc.api.enums.StatusCode;
import www.wensi.com.xiurtc.api.response.BaseResponse;
import www.wensi.com.xiurtc.common.MessageCacheUtil;
import www.wensi.com.xiurtc.entity.WsMessageCache;
import www.wensi.com.xiurtc.service.WebSocketSever;
import www.wensi.com.xiurtc.utils.MessageUtils;
import www.wensi.com.xiurtc.utils.ResultUtil;
import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.TimeUnit;
@RestController
@Slf4j
@RequestMapping("/webSoketTest")
public class WebSoketTestController {
@Autowired
private WebSocketSever wsSever;
@RequestMapping("/r/forCustomer/send")
public BaseResponse getCandidate(HttpServletRequest request) {
String message = "我的消息";
String mac = "6e:66:88:08:00:01";
// 发送消息给客户端
wsSever.sendMessageByMac(mac, message);
// 在缓存中,获取到客户端返回的信息
BaseResponse response = getWsMessageCache(mac);
return response;
}
/**
* 获取缓存消息
*
* @param mac
* @return
*/
private BaseResponse getWsMessageCache(String mac) {
Long time = 0L;
BaseResponse response = ResultUtil.success();
while (true) {
// 如果时间达到没有设置成功,则超时
if (time > 2000L) {
String msg = "connect timed out by mac:" + mac;
response.setstatus(StatusCode.FAIL.getStatus());
response.setMessage(MessageUtils.message("error.connect.timed.out"));
log.error(msg);
break;
}
if (MessageCacheUtil.hasCache(mac)) {
WsMessageCache wsMessageCache = MessageCacheUtil.getMessageCache(mac);
if (ObjectUtils.isEmpty(wsMessageCache)) {
String msg = "connect timed out by mac:" + mac;
response.setstatus(StatusCode.FAIL.getStatus());
response.setMessage(MessageUtils.message("error.connect.timed.out"));
log.error(msg);
break;
}
String messageCache = wsMessageCache.getMessage();
response.setData(messageCache);
MessageCacheUtil.clearOnly(mac);
log.info("send message to client success,mac:{},response:{}", mac, response);
break;
} else {
try {
TimeUnit.MILLISECONDS.sleep(100L);
time = time + 100L;
} catch (InterruptedException e) {
String msg = "connect timed out by mac:" + mac;
response.setstatus(StatusCode.FAIL.getStatus());
response.setMessage(MessageUtils.message("error.connect.timed.out"));
log.error(msg);
break;
}
continue;
}
}
return response;
}
}
启动服务
测试页面
http://www.jsons.cn/websocket
ws://localhost:9998/dc-xiurtc/c/websocket/6e:66:88:08:00:01