1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the springboot-socketio demo: Spring Boot web + netty-socketio + spring-kafka. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sys</groupId>
    <artifactId>springboot-socketio</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springboot-socketio</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <!-- NOTE(review): the patch digit was missing in the original listing ("2.0..RELEASE");
             2.0.9 is assumed here - confirm the intended 2.0.x release. -->
        <version>2.0.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.corundumstudio.socketio/netty-socketio -->
        <dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <!-- NOTE(review): the patch digit was missing in the original listing ("1.7.");
                 1.7.7 is assumed here - confirm the intended 1.7.x release. -->
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

2 启动加载 socket

import com.corundumstudio.socketio.SocketIOServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component; @Component
@Order(value=)
public class MyCommandLineRunner implements CommandLineRunner {
private final SocketIOServer server; @Autowired
public MyCommandLineRunner(SocketIOServer server) {
this.server = server;
} @Override
public void run(String... args) throws Exception {
server.start();
System.out.println("socket.io启动成功!");
}
}

3 建立连接

import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import java.util.ArrayList;
import java.util.Date;
import java.util.UUID; @Component
public class MessageEventHandler {
public static SocketIOServer socketIoServer;
public static ArrayList<UUID> listClient = new ArrayList<>();
//static final int limitSeconds = 60; @Autowired
public MessageEventHandler(SocketIOServer server) {
MessageEventHandler.socketIoServer = server;
} @OnConnect
public void onConnect(SocketIOClient client) {
listClient.add(client.getSessionId());
System.err.println(listClient.size());
System.out.println("客户端:" + client.getSessionId() + "已连接");
} @OnDisconnect
public void onDisconnect(SocketIOClient client) {
System.out.println("客户端:" + client.getSessionId() + "断开连接");
listClient.remove(client.getSessionId());
} @OnEvent(value = "messageevent") //value是监听事件的名称
public void onEvent(SocketIOClient client, AckRequest request, Object data) {
//System.out.println("发来消息:" + data.toString());
//socketIoServer.getClient(client.getSessionId()).sendEvent("messageevent", "back data"+new Date().getTime());
} public static void sendBuyLogEvent() { //这里就是向客户端推消息了
long dateTime = new Date().getTime(); for (UUID clientId : listClient) {
if (socketIoServer.getClient(clientId) == null) continue;
socketIoServer.getClient(clientId).sendEvent("messageevent", dateTime, );
}
} }

4 Spring Boot 启动类 SpringbootSocketioApplication:配置 SocketIOServer bean

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean; import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;
import com.sys.demo.kafka.KafkaSender;

/**
 * Application entry point; also declares the socket.io server bean and the scanner bean
 * that is constructed around that server.
 */
@SpringBootApplication
public class SpringbootSocketioApplication {

    /**
     * Port the socket.io server listens on.
     *
     * <p>NOTE(review): the port literal was missing in the original listing
     * ("config.setPort();"). 8081 is assumed because the sample connect URL in the
     * authorization example below uses it - confirm against the real deployment.
     */
    private static final int SOCKET_PORT = 8081;

    /**
     * Boots the Spring application.
     *
     * <p>The original listing contained a disabled demo that fetched a {@code KafkaSender}
     * bean from the returned context and called {@code send()} three times, sleeping
     * 3000 ms between calls.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringbootSocketioApplication.class, args);
    }

    /**
     * Builds the socket.io server (not yet started) with a host name chosen by operating
     * system and the port {@link #SOCKET_PORT}.
     *
     * @return the configured server instance
     */
    @Bean
    public SocketIOServer socketIOServer() {
        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();

        String os = System.getProperty("os.name");
        // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
        boolean onWindows = os.toLowerCase(java.util.Locale.ROOT).startsWith("win");
        if (onWindows) {
            // Local testing on Windows binds to localhost.
            System.out.println("this is windows");
            config.setHostname("localhost");
        } else {
            // When deployed to the remote production server, use its public IP.
            config.setHostname("123.123.111.222");
        }
        config.setPort(SOCKET_PORT);

        // Optional handshake filter (disabled; this demo performs no authentication).
        // With a connect URL such as http://localhost:8081?username=test&password=test
        // the credentials can be read from the handshake data:
        /*config.setAuthorizationListener(new AuthorizationListener() {
            @Override
            public boolean isAuthorized(HandshakeData data) {
                // String username = data.getSingleUrlParam("username");
                // String password = data.getSingleUrlParam("password");
                return true;
            }
        });*/

        return new SocketIOServer(config);
    }

    /**
     * Creates the {@link SpringAnnotationScanner} for the given server.
     *
     * @param socketServer the socket.io server bean
     * @return a scanner constructed with that server
     */
    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        return new SpringAnnotationScanner(socketServer);
    }
}

5 监听kafka 发送消息给客户端

package com.sys.demo.kafka;

import java.util.Optional;
import java.util.UUID; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component; import com.sys.demo.MessageEventHandler; @Component
public class KafkaReceiver {
@Autowired
private MessageEventHandler messageEventHandler;
@KafkaListener(topics = {"mycall_out"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.err.println("to client message:"+message);
for (UUID clientId : messageEventHandler.listClient) {
if (messageEventHandler.socketIoServer.getClient(clientId) == null)
continue;
//发送消息
messageEventHandler.socketIoServer.getClient(clientId).sendEvent("messageevent", message);
}
} }
}
05-27 22:39