基于netty 和 websocket 做一个即时通信 聊天的小应用练习
先来了解即时通信 , 一般会使用三种实现方式:
- Ajax 轮训
- Long pull
- websocket
有很多的例子,比如一些电脑上群组聊天室,手游中的聊天平台等等,都需要一个实时通信,如何实现双向通信
Ajax轮训,是制定每过几秒钟,去ajax异步请求同步服务器新的数据
Long pull也是采用循环的方式,是一种阻塞的模式,当发出请求,如果服务器不响应,他就会一直卡住不动,早期的通信方式
websocket最初由H5提起,是一种协议,http1.1是支持长链接,http1.0是不支持长链接的,websocket基于TCP协议之上,提供具有持久性的协议
对比http每发起一个,必然存在request和response,且是1:1对应的
websocket的优点使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据, 只需要一次链接即可保持长久链接传输数据,除非自己退出游戏了,重新上线
Web即时通信
基于之前的经验,先写一个server服务端
package com.yus.netty.server;
import io.netty.bootstrap.BootstrapConfig;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* websocket 服务端
*/
public class WebSocketServer {
public static void main(String[] args) throws InterruptedException {
//采用主从线程组模型
//主线程组
EventLoopGroup primaryGroup = new NioEventLoopGroup();
//从线程组
EventLoopGroup subGroup = new NioEventLoopGroup();
try {
//服务启动器
ServerBootstrap bootstrap = new ServerBootstrap();
//建立通道,管道以及助手处理类 入口
bootstrap.group(primaryGroup, subGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebSocketChannelInit());
//绑定端口
ChannelFuture future = bootstrap.bind(8081).sync();
future.channel().closeFuture().sync();
} finally {
//关闭
primaryGroup.shutdownGracefully();
subGroup.shutdownGracefully();
}
}
}
初始化器
package com.yus.netty.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
* 通道初始化器
*/
public class WebSocketChannelInit extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//获取管道
ChannelPipeline pipeline = ch.pipeline();
//=====开始============用于支持Http协议的处理类=================
//通信 netty提供的编解码处理类
pipeline.addLast(new HttpServerCodec());
//对处理数量大的数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
//聚合器,方便处理http消息 1024*64 为消息最大长度(byte)支持http协议的必要处理类
pipeline.addLast(new HttpObjectAggregator(1024*64));
//=====结束============用于支持Http协议的处理类=================
// 支持websocket协议的处理类,建立链接时使用
// /ws指定客户端访问服务端的路由,可随便自定义,这边写ws是websocket简写
// 该处理类帮我们处理繁重的事情并run websocket服务端,
// 并管理通信握手动作(包括close关闭,Ping请求,Pong响应)Ping+Pong=心跳,关于心跳后续再做说明
// 并以frame进行数据传输,不同的数据类型,frame也不同
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
//自定义 处理类,主要用于读取客户端消息,然后对消息进行处理,最后可以返回给客户端
pipeline.addLast("myHandle", new MyHandle());
}
}
自定义处理类
package com.yus.netty.server;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
* 自定义处理类
* 在写初识化器时有说明,关于websocket传输时,主要以frames方式传输
* 在Netty中frame会专门为websocket处理 文本 的对象 - TextWebSocketFrame
* frame是消息的载体
*/
public class MyHandle extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// 第四步
// ChannelGroup:记录和管理Channel,使用DefaultChannelGroup默认实现,GlobalEventExecutor全局初始化
private static ChannelGroup channelClient = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//第一步:从消息载体中获取客户端的消息
String content = msg.text();
System.out.println("消息:" + content);
//第二步:
//拿到消息文本,然后将消息发给所有客户端,这时不管有多少个客户端
//都可以将此客户端的消息给所有的客户端,每一个客户端会注册一个channel进来
//通过channel通道进行消息推送出去,这时候就用到了上次学习的Channel的方法周期,
//生命周期 重写handlerAdded 和 handlerRemoved
Channel channel = ctx.channel();
//第七步
//将数据 刷到所有的客户端 第一种方式
for (Channel channels : channelClient){
//注意 这边的载体是泛型TextWebSocketFrame ,不能直接String扔出去
//要将消息放入载体,再送出去
channels.writeAndFlush(new TextWebSocketFrame("我在哪里,我被送出去了吗?"));
}
//将数据 刷到所有的客户端 第二种 方式直接用ChannelGroup的writeAndFlush
//channelClient.writeAndFlush(new TextWebSocketFrame("我又在哪里,我被送出去了吗?"));
}
/**
* 第三步
* 客户端创建了,触发
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//获取客户端的channel双向通道
Channel channel = ctx.channel();
//第五步
//添加到ChannelGroup,方便全局管理
channelClient.add(channel);
}
/**
* 第六步
* 客户端离开了,触发
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
//当触发这个handlerRemoved时,其实ChannelGroup会自动移除对应客户端的通道channel
//所以不需要我们去调remove的方法,测试发现是多余的
//channelClient.remove(ctx.channel());
//ctx.channel().id()中存在两个ID,一个长ID,一个短ID,如果学习过zookeeper的同学会熟悉一些
//服务少的时候,短ID冲突的可能性小,会用短ID进行选择,反之就是长ID
System.out.println("Channel 长ID为 " + ctx.channel().id().asLongText() + "客户端离开了");
System.out.println("Channel 短ID为 " + ctx.channel().id().asShortText() + "客户端离开了");
}
}
编写完后端,已经快1点钟了,睡觉了,明天继续写个测试前端页面
--------------------------------------------------