创建Websocket处理器继承AbstractWebSocketHandler

  • 覆写public void afterConnectionEstablished(WebSocketSession session)方法,建立Websocket连接
  • 覆写protected void handleTextMessage(WebSocketSession session, TextMessage message)方法,处理接收的消息
  • 发送消息方法webSocketSession.getWebSocketSession().sendMessage(new TextMessage(msg));
  • 覆写异常处理方法public void handleTransportError(WebSocketSession session, Throwable exception)
  • 覆写websocket关闭方法public void afterConnectionClosed(WebSocketSession session, CloseStatus status)

@Slf4j
@Component
public class MyWsHandler extends AbstractWebSocketHandler {
    private static final Map<String, Session> sessionMap ;//websocket连接集合
    static {
        //初始化
        sessionMap = new ConcurrentHashMap<>();
    }

    /**
     * 建立连接
     * @param session
     * @throws Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        super.afterConnectionEstablished(session);
        sessionMap.put(session.getId(),session);
        log.info(session.getClientId()+"建立了连接");
    }

    /**
     * 接收消息并处理
     * @param session
     * @param message
     * @throws Exception
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        super.handleTextMessage(session, message);
        log.info(sessionBeanMap.get(session.getId()).getClientId()+":"+message.getPayload());
        String param = message.getPayload();//接收到的消息
        sendMessageToOne(session.getId(),param);//发送消息
    }

    /**
     * 传输异常
     * @param session
     * @param exception
     * @throws Exception
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        super.handleTransportError(session, exception);
        if(session.isOpen()){
            session.close();
        }
        sessionMap.remove(session.getId());
    }

    /**
     * 连接关闭
     * @param session
     * @param status
     * @throws Exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        int clientId = sessionMap.get(session.getId()).getClientId();
        sessionMap.remove(session);
        log.info(clientId+"关闭了连接");
        super.afterConnectionClosed(session, status);
    }
    /**
     * 给指定用户窗口推送消息
     * @param userId
     * @param param
     */
    public void sendMessageToOne(String userId,String param){
        try {
                sessionMap.get(userId).sendMessage(new TextMessage("Hello World!"));
            } catch (IOException | ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
    }
}

创建Websocket拦截器继承HttpSessionHandshakeInterceptor(非必须)

  • 建立连接前做处理,覆写public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes)
  • 建立连接后做处理,覆写public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex)
@Component
@Slf4j
public class MyInterceptor extends HttpSessionHandshakeInterceptor {
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
       log.info(request.getRemoteAddress().toString()+"开始握手");
        return super.beforeHandshake(request, response, wsHandler, attributes);
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
        log.info(request.getRemoteAddress().toString()+"完成握手");
        super.afterHandshake(request, response, wsHandler, ex);
    }
}

注册Websocket处理器和Websocket拦截器

@Configuration
@EnableWebSocket
public class MyWsConfig implements WebSocketConfigurer {
    @Resource
    private MyWsHandler myWsHandler;
    @Resource
    private MyInterceptor myWsInterceptor;

    /**
     * 注册,websocket访问路径/myWebsocket
     * @param registry
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myWsHandler,"/myWebsocket").addInterceptors(myWsInterceptor).setAllowedOrigins("*");
    }
}
06-27 16:40