本文首发于微信公众号【猿灯塔】,转载引用请说明出处
今天呢!灯塔君跟大家讲:
Netty应用
一.Netty简介
1、Netty下载
官网:https://netty.io/downloads.html
2、Netty简介
Netty is *an asynchronous event-driven network application framework* for
rapid development of maintainable high performance protocol servers &
clients.
是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端
Netty is a NIO client server framework which enables quick and easy
development of network applications such as protocol servers and clients. It
greatly simplifies and streamlines network programming such as TCP and UDP
socket server.
'Quick and easy' doesn't mean that a resulting application will suffer from
a maintainability or a performance issue. Netty has been designed carefully
with the experiences earned from the implementation of a lot of protocols
such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols.
As a result, Netty has succeeded to find a way to achieve ease of
development, performance, stability, and flexibility without a compromise.
Netty是一个NIO客户端服务器框架,可以快速轻松地开发网络应用程序,例如协议服务器和客户
端。它极大地简化和简化了网络编程,例如TCP和UDP套接字服务器。“快速简便”并不意味着最终的应用
程序将遭受可维护性或性能问题的困扰。Netty经过精心设计,结合了许多协议(例如FTP,SMTP,
HTTP以及各种基于二进制和文本的旧式协议)的实施经验。结果,Netty成功地找到了一种无需妥协即
可轻松实现开发,性能,稳定性和灵活性的方法。
Features
Design 设计
Unified API for various transport types - blocking and non-blocking
socket
Based on a flexible and extensible event model which allows clear
separation of concerns Highly customizable thread model - single thread,
one or more thread
pools such as SEDA True connectionless datagram socket support (since
3.1)
Ease of use 使用方便
Well-documented Javadoc, user guide and examples
No additional dependencies, JDK 5 (Netty 3.x) or 6 (Netty 4.x) is enough
Note: Some components such as HTTP/2 might have more requirements.
Please refer to the Requirements page for more information.
Performance 性能
Better throughput, lower latency
Less resource consumption
Minimized unnecessary memory copy
Security 安全
Complete SSL/TLS and StartTLS support
Community 社区
Release early, release often
The author has been writing similar frameworks since 2003 and he still
findsyourfeed back precious!
3、常用网络通信框架
a、Mina
Mina出身于开源界的大牛Apache组织。是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网 络应用程序提 供了非常便利的框架。当前发行的 Mina 版本2.04支持基于 Java NIO 技术的 TCP/UDP 应用程序开发、串口通讯程 序,Mina 所支持的功能也在进一步的扩展中。目前,正在使用Mina的应用包 括:Apache Directory Project、AsyncWeb,AMQP(Advanced Message Queuing Protocol),RED5 Server(Macromedia Flash Media RTMP)、ObjectRADIUS、 Openfire等等
b、Netty
Netty是一款异步的事件驱动的网络应用框架和工具,用于快速开发可维护的高性能、高扩展性协议服务器和 客户端。也就是说,Netty是一个NIO客户端/服务器框架,支持快速、简单地开发网络应用,如协议服务器和 客户端。它极大简化了网络编程,如TCP和UDP套接字服务器
c、Grizzly
Grizzly是一种应用程序框架,专门解决编写成千上万用户访问服务器时候产生的各种问题。使用JAVA NIO 作为基础,并隐藏其编程的复杂性。容易使用的高性能的API。带来非阻塞socketd到协议处理层。利用高性 能的缓冲和缓冲管理使用高性能的线程池。从设计的理念上来看,Mina的设计理念是最为优雅的。当然,由于 Netty的主导作者与Mina的主导作者是同一人,出自同一人之手的Netty在设计理念上与Mina基本上是一致 的。而Grizzly在设计理念上就较差了点,几乎是JavaNIO的简单封装
4、使用领域
a、互联网行业
阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用
Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。除了 Dubbo 之外,淘宝的消息中间
件 RocketMQ 的消息生产者和消息消费者之间,也采用 Netty 进行高性能、异步通信。
除了阿里系和淘宝系之外,很多其它的大型互联网公司或者电商内部也已经大量使用 Netty 构建高性 能、分布式的网络服务器
b、游戏行业
无论是手游服务端、还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈,非常方便定制和开发私有协议栈。账号登陆服务器、地图服务器之间可以方便的通过 Netty 进行高性能的通信
c、大数据领域
经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨节点通信,它
的 Netty Service 基于 Netty 框架二次封装实现。大数据计算往往采用多个计算节点和一个/N个汇总节
点进行分布式部署,各节点之间存在海量的数据交换。由于 Netty 的综合性能是目前各个成熟 NIO 框
架中最高的,因此,往往会被选中用作大数据各节点间的通信。
d、企业软件
企业和 IT 集成需要 ESB,Netty 对多协议支持、私有协议定制的简洁性和高性能是 ESB RPC 框架的首
选通信组件。事实上,很多企业总线厂商会选择Netty 作为基础通信组件,用于企业的 IT 集成。
e、通信行业
Netty 的异步高性能、高可靠性和高成熟度的优点,使它在通信行业得到了大量的应用。
二.Netty核心组件
1、Channel
Channel 是 Netty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 之
外,还包括了 Netty 框架相关的一些功能,如获取该 Channe l的 EventLoop。
在传统的网络编程中,作为核心类的 Socket ,它对程序员来说并不是那么友好,直接使用其成本还是
稍微高了点。而Netty 的 Channel 则提供的一系列的 API ,它大大降低了直接与 Socket 进行操作的复
杂性。而相对于原生 NIO 的 Channel,Netty 的 Channel 具有如下优势(摘自《Netty权威指南(第二 版)》):
在 Channel 接口层,采用 Facade 模式进行统一封装,将网络 I/O 操作、网络 I/O 相关联的其他操作封
装起来,统一对外提供。
Channel 接口的定义尽量大而全,为SocketChannel 和 ServerSocketChannel 提供统一的视图,由不 同子类实现不同的功能,公共功能在抽象父类中实现,最大程度地实现功能和接口的重用。
具体实现采用聚合而非包含的方式,将相关的功能类聚合在 Channel 中,有 Channel 统一负责和调
度,功能实现更加灵活。
2、ChannelFuture
Netty 为异步非阻塞,即所有的 I/O 操作都为异步的,因此,我们不能立刻得知消息是否已经被处理了。 Netty 提供了 ChannelFuture 接口,通过该接口的 addListener() 方法注册一个
ChannelFutureListener,当操作执行成功或者失败时,监听就会自动触发返回结果
3、EventLoop
Netty 基于事件驱动模型,使用不同的事件来通知我们状态的改变或者操作状态的改变。它定义了在整个连
接的生命周期里当有事件发生的时候处理的核心抽象。Channel 为Netty 网络操作抽象类,EventLoop 主
要是为Channel 处理 I/O 操作,两者配合参与 I/O 操作。
下图是Channel、EventLoop、Thread、EventLoopGroup之间的关系(摘自《Netty In
Action》):
一个 EventLoopGroup 包含一个或多个 EventLoop。
一个 EventLoop 在它的生命周期内只能与一个Thread绑定。
所有EnventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理。
一个 Channel 在它的生命周期内只能注册与一个 EventLoop。
一个 EventLoop 可被分配至一个或多个 Channel 。
当一个连接到达时,Netty 就会注册一个 Channel,然后从 EventLoopGroup 中分配一个 EventLoop
绑定到 这个Channel上,在该Channel的整个生命周期中都是有这个绑定的 EventLoop 来服务的。
4、ChannelHandler
ChannelHandler 为 Netty 中最核心的组件,它充当了所有处理入站和出站数据的应用程序逻辑的容器。
ChannelHandler 主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转 换等。
ChannelHandler 有两个核心子类 ChannelInboundHandler 和 ChannelOutboundHandler,其中
ChannelInboundHandler 用于接收、处理入站数据和事件,而 ChannelOutboundHandler 则相反。
5、ChannelPipeline
ChannelPipeline 为 ChannelHandler 链提供了一个容器并定义了用于沿着链传播入站和出站事件流的 API。一个数据或者事件可能会被多个 Handler 处理,在这个过程中,数据或者事件经流 ChannelPipeline,由 ChannelHandler 处理。在这个处理过程中,一个 ChannelHandler 接收数据 后处理完成后交给下一个 ChannelHandler,或者什么都不做直接交给下一个 ChannelHandler。
6、Channel的生命周期
ChannelUnregistered Channel已经被创建,但是还未注册到EventLoop;
ChannelRegistered Channel已经被注册到EventLoop;
ChannelActive Channel处于活跃状态(已经连接到远程节点),可以进行接收和发送数据
ChannelInactive Channel没有连接到远程节点
三.Netty核心组件关系
04.Netty简单通信
服务器端:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.
NioServerSocketChannel;
public class Server {
public static void main(String[] args) {
int port = 9898;
new Server().bind(port);
}
public void bind(int port) {
/** * interface EventLoopGroup extends EventExecutorGroup extends
ScheduledExecutorService extends
ExecutorService
* 配置服务端的 NIO 线程池,用于网络事件处理,
实质上他们就是 Reactor 线程组
* bossGroup 用于服务端接受客户端连接,
workerGroup 用于进行 SocketChannel 网络 读写*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
/** ServerBootstrap 是 Netty 用于启动 NIO
服务端的辅助启动类,用于降低开发 难度 * */
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
/**服务器启动辅助类配置完成后,调用 bind
方法绑定监听端口,调用 sync 方法同步等待绑
定操作完成*/
ChannelFuture f = b.bind(port).sync();
System.out.println("服务器开始监听端口,
等待客户端连接.........");
/**下面会进行阻塞,等待服务器连接关闭之后
main 方法退出,程序结束*/
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
/**优雅退出,释放线程池资源*/
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel>
{
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
arg0.pipeline().addLast(new ServerHandler());
}
}
}
服务器处理类:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**ChannelInboundHandlerAdapter extends ChannelHandlerAdapter 用于对网络事件进行读写
操作*/
public class ServerHandler extends ChannelInboundHandlerAdapter {
/** * 收到客户端消息,自动触发
* @param ctx
* @param msg
* @throws Exception */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
/** 将 msg 转为 Netty 的 ByteBuf 对象,类似 JDK 中的 java.nio.ByteBuffer,不
过 ButeBuf 功能更强,更灵活 */
ByteBuf buf = (ByteBuf) msg;
/**readableBytes:获取缓冲区可读字节数,
然后创建字节数组 * 从而避免了像 java.nio.ByteBuffer
时,只能盲目的创建特定大小的字节数组比如 1024 */
byte[] reg = new byte[buf.readableBytes()];
/*readBytes:将缓冲区字节数组复制到新建的 byte
数组中然后将字节数组转为字符串*/
buf.readBytes(reg);
String body = new String(reg, "UTF-8");
System.out.println(Thread.currentThread().getName() +",The server
receive order : " + body);
/**回复消息
* copiedBuffer:创建一个新的缓冲区,
内容为里面的参数
* 通过 ChannelHandlerContext 的 write 方法将消息异步发送给客户端 * */
String respMsg = "I am Server,
消息接收 success!";
ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes());
ctx.write(respByteBuf);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
{
/**flush:将消息发送队列中的消息
写入到 SocketChannel 中发送给对方,为了频繁的唤醒
Selector 进行消息发送
* Netty 的 write 方法并不直接将消息写如 SocketChannel 中,调用 write 只是把待
发送的消息放到发送缓存数组中,
* 再通过调用 flush方法,将发送缓冲区的消息全部写入到 SocketChannel中 * */
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
/**当发生异常时,关闭 ChannelHandlerContext,
释放和它相关联的句柄等资源 */
ctx.close();
}
}
客户端:
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.
NioSocketChannel;
public class Client {
/** * 使用 3 个线程模拟三个客户端
* @param args */
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
new Thread(new MyThread()).start();
}
}
static class MyThread implements Runnable {
@Override
public void run() {
connect("127.0.0.1", 9898);
}
public void connect(String host, int port) {
/**配置客户端 NIO 线程组/池*/
EventLoopGroup group =
new NioEventLoopGroup();
try {
/**Bootstrap 与 ServerBootstrap
都继承(extends)于 AbstractBootstrap
* 创建客户端辅助启动类,并对其配置,与服务器稍微不同,
这里的 Channel 设置为
NioSocketChannel
* 然后为其添加 Handler,这里直接使用匿名内部类,
实现 initChannel 方法
* 作用是当创建 NioSocketChannel 成功后,
在进行初始化时,将它的
ChannelHandler设置到ChannelPipeline中,
* 用于处理网络I/O事件*/
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<
SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws
Exception {
ch.pipeline().addLast(new ClientHandler());
}
});
/**connect:发起异步连接操作,
调用同步方法 sync 等待连接成功*/
ChannelFuture channelFuture = b.connect(host, port).sync();
System.out.println(Thread.currentThread().
getName() + ",客户端发起
异步连接..........");
/**等待客户端链路关闭*/
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
/**优雅退出,释放NIO线程组*/
group.shutdownGracefully();
}
}
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.logging.Logger;
/** 用于对网络事件进行读写操作 */
public class ClientHandler extends ChannelInboundHandlerAdapter {
// private static final Logger logger =
Logger.getLogger(TimeClientHandler.class.getName());
/** 当客户端和服务端 TCP 链路建立成功之后,
Netty 的 NIO 线程会调用 channelActive 方法 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String reqMsg = "我是客户端 " + Thread.currentThread().getName();
byte[] reqMsgByte = reqMsg.getBytes("UTF-8");
ByteBuf reqByteBuf = Unpooled.buffer(reqMsgByte.length);
/** * writeBytes:将指定的源数组的数据传输到缓冲区
* 调用 ChannelHandlerContext 的
writeAndFlush 方法将消息发送给服务器 */
reqByteBuf.writeBytes(reqMsgByte);
ctx.writeAndFlush(reqByteBuf);
}
/** * 当服务端返回应答消息时,channelRead
方法被调用,从 Netty 的 ByteBuf 中
读取并打印应 答消息*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println(Thread.currentThread().getName() + ",Server return
Message:" + body);
ctx.close();
}
/** * 当发生异常时,打印异常 日志,释放客户端资源 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
/**释放资源*/
//logger.warning("Unexpected exception from downstream : " +
cause.getMessage());
ctx.close();
}
}