创建Websocket处理器继承AbstractWebSocketHandler
- 覆写public void afterConnectionEstablished(WebSocketSession session)方法,建立Websocket连接
- 覆写protected void handleTextMessage(WebSocketSession session, TextMessage message)方法,处理接收的消息
- 发送消息方法webSocketSession.getWebSocketSession().sendMessage(new TextMessage(msg));
- 覆写异常处理方法public void handleTransportError(WebSocketSession session, Throwable exception)
- 覆写websocket关闭方法public void afterConnectionClosed(WebSocketSession session, CloseStatus status)
@Slf4j
@Component
public class MyWsHandler extends AbstractWebSocketHandler {
private static final Map<String, Session> sessionMap ;//websocket连接集合
static {
//初始化
sessionMap = new ConcurrentHashMap<>();
}
/**
* 建立连接
* @param session
* @throws Exception
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
super.afterConnectionEstablished(session);
sessionMap.put(session.getId(),session);
log.info(session.getClientId()+"建立了连接");
}
/**
* 接收消息并处理
* @param session
* @param message
* @throws Exception
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
super.handleTextMessage(session, message);
log.info(sessionBeanMap.get(session.getId()).getClientId()+":"+message.getPayload());
String param = message.getPayload();//接收到的消息
sendMessageToOne(session.getId(),param);//发送消息
}
/**
* 传输异常
* @param session
* @param exception
* @throws Exception
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
super.handleTransportError(session, exception);
if(session.isOpen()){
session.close();
}
sessionMap.remove(session.getId());
}
/**
* 连接关闭
* @param session
* @param status
* @throws Exception
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
int clientId = sessionMap.get(session.getId()).getClientId();
sessionMap.remove(session);
log.info(clientId+"关闭了连接");
super.afterConnectionClosed(session, status);
}
/**
* 给指定用户窗口推送消息
* @param userId
* @param param
*/
public void sendMessageToOne(String userId,String param){
try {
sessionMap.get(userId).sendMessage(new TextMessage("Hello World!"));
} catch (IOException | ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
}
创建Websocket拦截器继承HttpSessionHandshakeInterceptor(非必须)
- 建立连接前做处理,覆写public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes)
- 建立连接后做处理,覆写public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex)
@Component
@Slf4j
public class MyInterceptor extends HttpSessionHandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
log.info(request.getRemoteAddress().toString()+"开始握手");
return super.beforeHandshake(request, response, wsHandler, attributes);
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
log.info(request.getRemoteAddress().toString()+"完成握手");
super.afterHandshake(request, response, wsHandler, ex);
}
}
注册Websocket处理器和Websocket拦截器
@Configuration
@EnableWebSocket
public class MyWsConfig implements WebSocketConfigurer {
@Resource
private MyWsHandler myWsHandler;
@Resource
private MyInterceptor myWsInterceptor;
/**
* 注册,websocket访问路径/myWebsocket
* @param registry
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myWsHandler,"/myWebsocket").addInterceptors(myWsInterceptor).setAllowedOrigins("*");
}
}