精通并发与 Netty
Netty 是一个异步的,事件驱动的网络通信框架,用于高性能的基于协议的客户端和服务端的开发。
异步指的是会立即返回,并不知道到底发送过去没有,成功没有,一般都会使用监听器来监听返回。
事件驱动是指开发者只需要关注事件对应的回调方法即可,比如 channel active,inactive,read 等等。
网络通信框架就不用解释了,很多你非常熟悉的组件都使用了 netty,比如 spark,dubbo 等等。
初步了解 Netty
第一个简单的例子,使用 Netty 实现一个 http 服务器,客户端调用一个没有参数的方法,服务端返回一个 hello world。
Netty 里面大量的代码都是对线程的处理和 IO 的异步的操作。
package com.paul;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Server {
public static void main(String[] args) throws InterruptedException {
//定义两个线程组,事件循环组,可以类比与 Tomcat 就是死循环,不断接收客户端的连接
// boss 线程组不断从客户端接受连接,但不处理,由 worker 线程组对连接进行真正的处理
// 一个线程组其实也能完成,推荐使用两个
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 服务端启动器,可以轻松的启动服务端的 channel
ServerBootstrap serverBootstrap = new ServerBootstrap();
//group 方法有两个,一个接收一个参数,另一个接收两个参数
// childhandler 是我们自己写的请求处理器
serverBootstrap.group(bossGroup, workerGroup).channel(NioSocketChannel.class)
.childHandler(new ServerInitializer());
//绑定端口
ChannelFuture future = serverBootstrap.bind(8011).sync();
//channel 关闭的监听
future.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package com.paul;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//管道,管道里面可以有很多 handler,一层层过滤的柑橘
ChannelPipeline pipeline = socketChannel.pipeline();
//HttpServerCodec 是 HttpRequestDecoder 和 HttpReponseEncoder 的组合,编码和解码的 h handler
pipeline.addLast("httpServerCodec", new HttpServerCodec());
pipeline.addLast("handler", new ServerHandler());
}
}
package com.paul;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
public class ServerHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
if(httpObject instanceof HttpRequest) {
ByteBuf content = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
//单纯的调用 write 只会放到缓存区,不会真的发送
channelHandlerContext.writeAndFlush(response);
}
}
}
我们在 SimpleChannelInboundHandler 里分析一下,先看它继承的 ChannelInboundHandlerAdapter 里面的事件回调方法,包括通道注册,解除注册,Active,InActive等等。
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
执行顺序为 handler added->channel registered->channel active->channelRead0->channel inactive->channel unregistered。
Netty 本身并不是遵循 servlet 规范的。Http 是基于请求和响应的无状态协议。Http 1.1 是有 keep-alived 参数的,如果3秒没有返回,则服务端主动关闭了解,Http 1.0 则是请求完成直接返回。
Netty 的连接会被一直保持,我们需要自己去处理这个功能。
在服务端发送完毕数据后,可以在服务端关闭 Channel。
ctx.channel.close();
Netty 能做什么
- 可以当作一个 http 服务器,但是他并没有实现 servelt 规范。虽然 Tomcat 底层本身也使用 NIO,但是 Netty 本身的特点决定了它比 Tomcat 的吞吐量更高。相比于 SpringMVC 等框架,Netty 没提供路由等功能,这也契合和 Netty 的设计思路,它更贴近底层。
- Socket 开发,也是应用最为广泛的领域,底层传输的最基础框架,RPC 框架底层多数采用 Netty。直接采用 Http 当然也可以,但是效率就低了很多了。
- 支持长连接的开发,消息推送,聊天,服务端向客户端推送等等都会采用 WebSocket 协议,就是长连接。
Netty 对 Socket 的实现
对于 Http 编程来说,我们实现了服务端就可以了,客户端完全可以使用浏览器或者 CURL 工具来充当。但是对于 Socket 编程来说,客户端也得我们自己实现。
服务器端:
Server 类于上面 Http 服务器那个一样,在 ServerInitoalizer 有一些变化
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//管道,管道里面可以有很多 handler,一层层过滤的柑橘
ChannelPipeline pipeline = socketChannel.pipeline();
// TCP 粘包 拆包
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
pipeline.addLast(new LengthFieldPrepender(4));
// 字符串编码,解码
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new ServerHandler());
}
}
public class ServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(ctx.channel().remoteAddress()+","+msg);
ctx.channel().writeAndFlush("from server:" + UUID.randomUUID());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端:
public class Client {
public static void main(String[] args) throws InterruptedException {
//客户端不需要两个 group,只需要一个就够了,直接连接服务端发送数据就可以了
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
//服务器端既可以使用 handler 也可以使用 childhandler, 客户端一般使用 handler
//对于 服务端,handler 是针对 bossgroup的,childhandler 是针对 workergorup 的
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new ClientInitializer());
ChannelFuture channelFuture = bootstrap.connect("localhost",8899).sync();
channelFuture.channel().closeFuture().sync();
}finally {
eventLoopGroup.shutdownGracefully();
}
}
}
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//管道,管道里面可以有很多 handler,一层层过滤的柑橘
ChannelPipeline pipeline = socketChannel.pipeline();
// TCP 粘包 拆包
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
pipeline.addLast(new LengthFieldPrepender(4));
// 字符串编码,解码
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new ClientHandler());
}
}
public class ClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(ctx.channel().remoteAddress()+","+msg);
System.out.println("client output:"+ msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().writeAndFlush("23123");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Netty 长连接实现一个聊天室
Server 端:
public class ServerHandler extends SimpleChannelInboundHandler<String> {
//定义 channel group 来管理所有 channel
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "加入\n");
channelGroup.add(channel);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[服务器]-" + channel.remoteAddress() + "离开\n");
//这个 channel 会被自动从 channelGroup 里移除
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "上线");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
System.out.println(channel.remoteAddress() + "离开");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Client 端:
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
for(;;){
channel.writeAndFlush(br.readLine() + "\r\n");
}
Netty 心跳
集群之间各个节点的通信,主从节点之间需要进行数据同步,每当主节点的数据发生变化时,通过异步的方式将数据同步到从节点,同步方式可以用日志等等,因此主从节点之间不是实时一致性而是最终一致性。
节点与节点之间如何进行通信那?这种主从模式是需要互相之间有长连接的,这样来确定对方还活着,实现方式是互相之间定时发送心跳数据包。如果发送几次后对方还是没有响应的话,就可以认为对方已经挂掉了。
回到客户端与服务端的模式,有人可能会想,客户端断开连接后服务端的 handlerRemoved 等方法不是能感知吗?还要心跳干什么哪?
真实情况其实非常复杂,比如手机客户端和服务端进行一个长连接,客户端没有退出应用,客户端开了飞行模型,或者强制关机,此时双方是感知不到连接已经断掉了,或者说需要非常长的时间才能感知到,这是我们不想看到的,这时就需要心跳了。
来看一个示例:
其他的代码还是和上面的一样,我们就不列出来了,直接进入主题,看不同的地方:
服务端
// Netty 为了支持心跳的 IdleStateHandler,空闲状态监测处理器。
pipeline.addLast(new IdleStateHandler(5,7,10,TimeUnit.SECONDS));
来看看 IdleStateHandler 的说明
/*
* Triggers an IdleStateEvent when a Channel has not performed read, write, or both
* operation for a while
* 当一个 channel 一断时间没有进行 read,write 就触发一个 IdleStateEvent
*/
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
//三个参数分别为多长时间没进行读,写或者读写操作则触发 event。
}
触发 event 后我们编写这个 event 对应的处理器。
public class MyHandler extends ChannelInboundHandlerAdapter{
//触发某个事件后这个方法就会被调用
//一个 channelhandlerContext 上下文对象,另一个是事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
if(evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
String eventType = null;
switch(event.state()){
case READER_IDLE:
eventType = "读空闲";
case WRITER_IDLE:
eventType = "写空闲";
case ALL_IDLE:
eventType = "读写空闲";
}
}else{
//继续将事件向下一个 handler 传递
ctx.
}
}
}
WebSocket 实现与原理分析
WebSocket 是一种规范,是 HTML5 规范的一部分,主要是解决 Http 协议本身存在的问题。可以实现浏览器和服务端的长连接,连接头信息只在建立连接时发送一次。是在 Http 协议之上构建的,比如请求连接其实是一个 Http 请求,只不过里面加了一些 WebSocket 信息。也可以用在非浏览器场合,比如 app 上。
Http 是一种无状态的基于请求和响应的协议,意思是一定是客户端想服务端发送一个请求,服务端给客户端一个响应。Http 1.0 在服务端给客户端响应后连接就断了。Http 1.1 增加可 keep-alive,服务端可以和客户端在短时间之内保持一个连接,某个事件之内服务端和客户端可以复用这个链接。在这种情况下,网页聊天就是实现不了的,服务端的数据推送是无法实现的。
以前有一些假的长连接技术,比如轮询,缺点和明显,这里就不细说了。
Http 2.0 实现了长连接,但是这不在我们讨论范围之内。
针对服务端,Tomcat 新版本,Spring,和 Netty 都实现了对 Websocket 的支持。
使用 Netty 对 WebSocket 的支持来实现长连接
其他的部分还是一样的,先来看服务端的 WebSocketChannelInitializer。
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel>{
//需要支持 websocket,我们在 initChannel 是做一点改动
@Override
protected void initChannel(SocketChannel ch) throws Exception{
ChannelPipeline pipeline = ch.pipeline();
//因为 websocket 是基于 http 的,所以要加入 http 相应的编解码器
pipeline.addLast(new HttpServerCodec());
//以块的方式进行写的处理器
pipeline.addLast(new ChunkedWriteHandler());
// 进行 http 聚合的处理器,将 HttpMessage 和 HttpContent 聚合到 FullHttpRequest 或者
// FullHttpResponse
//HttpObjectAggregator 在基于 netty 的 http 编程使用的非常多,粘包拆包。
pipeline.addLast(new HttpObjectAggregator(8192));
// 针对 websocket 的类,完成 websocket 构建的所有繁重工作,负责握手,以及心跳(close,ping,
// pong)的处理, websocket 通过 frame 帧来传递数据。
// BinaryWebSocketFrame,CloseWebSocketFrame,ContinuationWebSocketFrame,
// PingWebSocketFrame,PongWebSocketFrame,TextWebSocketFrame。
// /ws 是 context_path,websocket 协议标准,ws://server:port/context_path
pipeline.addLast(new WebSocketServerProcotolHandler("/ws"));
pipeline.addLast(new TextWebSocketFrameHandler());
}
}
// websocket 协议需要用帧来传递参数
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception{
System.out.println("收到消息:"+ msg.text());
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器返回"));
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
System.out.println("handlerAdded" + ctx.channel().id.asLongText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{
System.out.println("handlerRemoved" + ctx.channel().id.asLongText());
}
}
客户端我们直接通过浏览器的原声 JS 来写
<script type="text/javascript">
var socket;
if(window.WebSocket){
socket = new WebSocket("ws://localhost:8899/ws");
socket.onmessage = function(event){
alert(event.data);
}
socket.onopen = function(event){
alert("连接开启");
}
socket.onclose = function(event){
alert("连接关闭");
}
}else{
alert("浏览器不支持 WebSocket");
}
function send(message){
if(!window.WebSocket){
return;
}
if(socket.readyState == WebSocket.OPEN){
socket.send(message);
}
}
</script>
我们在浏览器中通过 F12 看看 Http 协议升级为 WebSocket 协议的过程。