好久没写博客了,最近打算花些时间把Netty的源码好好读一读,下面是本人在学习的过程中的一些笔记,不能确保自己思考的完全是正确的,如果有错误,欢迎大家指正。
由于本人的语文功底烂的很,通篇使用大白话来讲解0.0,有一些概念上的东西,博主可能不会明确的给出定义,建议使用过Netty的同学一起来研究。
好了,我们一起来看下吧。
Netty 是一款用于快速开发的高性能的网络应用程序的Java框架。说到Netty, 我们先对几种I/O模型进行一下比对:
那么伪异步IO是啥呢?
其实就是加入了线程池(ThreadPoolExecutor),对接入的客户端的Socket封装成task,实现了Runnable接口,然后投递到线程池中处理,这样就避免了BIO那种一个客户端连接一个IO线程的情况,防止资源耗尽和宕机。但是这种方式底层的通信依然采用了同步阻塞模型,无法从根本上解决问题。
那么AIO又是啥呢?
NIO2.0 引入了新的一步通道的概念,并提供了异步文件通道和异步套接字的实现。它不需要通过多路复用器对注册的通道进行轮询操作即可实现异步读写,属于真正意义上的异步非阻塞IO。
1、通过java.util.concurrent.Future 类来异步获取操作的结果。
2、在执行异步操作的时候传入一个CompletionHandler接口的实现类,作为操作完成的回调。
接口有以下两个方法。
/**
* Invoked when an operation has completed.
*
* @param result
* The result of the I/O operation.
* @param attachment
* The object attached to the I/O operation when it was initiated.
*/
void completed(V result, A attachment); /**
* Invoked when an operation fails.
*
* @param exc
* The exception to indicate why the I/O operation failed
* @param attachment
* The object attached to the I/O operation when it was initiated.
*/
void failed(Throwable exc, A attachment);
好的,下面也稍微回顾一下NIO,以及NIO涉及的几个关键组件:
- 缓冲区 Buffer
- 通道 Channel
- 多路复用器 Selector
- Buffer : 看什么都不如看官方文档来的更准确,下面是官方Buffer javadoc内容,我们来看下:
里面讲述了,buffer抽象类 是一个数据容器,除了内容,还有一些属性,capacity、limit、position。
capacity 是容器的容量,这个值一旦被创建,就无法修改。 limit 是 不应该被读或写的第一个元素的位置。 position 是指下一个将会被读或写的位置,这个值一定小于等于limit。
另外javadoc中还提到了mark和reset, 其中mark其实就是打一个标记,把当前的position赋给mark。 那么 reset 的 描述是这样的 把当前的position 改成之前mark的位置。
ok,由上面的文档可以得出下面的顺序 0 <= mark <= position <= limit <= capacity
其实Buffer中还有一个非常重要的方法必须要说一下,那就是 flip() ,看下javadoc
这个其实就是把 当前的limit = position, position = 0, 当然如果之前有mark也会失效,设置成-1, 当你往buffer中写了数据的时候,只有执行flip()方法, 才可以正确的读取数据, doc中还指出这个方法经常和compact()方法连着用。同样,贴出javadoc:
相当于什么呢,就相当于是清理掉已经读取过得数据,比如 position = 5 , limit = 10,前5个数据经读取过了,那么将新建一个buffer,将当前position到limit的数据拷贝到一个新的Buffer中,那么新的buffer的postion = limit-postion, limit = capacity, 好了,看源码是这样的,接下来就是验证一下了:
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put("helloworld".getBytes());
System.out.println(buffer.position() + ":" + buffer.limit());
buffer.flip();
System.out.println(buffer.position() + ":" + buffer.limit());
byte[] bytes = new byte[buffer.limit() + 1];
for(int i=0; i<6; i++) {
bytes[i] = buffer.get();
}
System.out.println(new String(bytes));
System.out.println(buffer.position() + ":" + buffer.limit());
System.out.println(buffer);
buffer.compact();
System.out.println(buffer.position() + ":" + buffer.limit());
System.out.println(buffer);
测试结果如下:
10:10
0:10
hellow
6:10
java.nio.HeapByteBuffer[pos=6 lim=10 cap=10]
4:10
java.nio.HeapByteBuffer[pos=4 lim=10 cap=10]
好了,Buffer的源码看到这里也算是差不多了。
2、Channel
Channel是一个通道, 它就像自来水管一样,网络数据通过Channel读取与写入,通道与流的不同之处在于通道是双向的,流只是在一个方向上移动(一个流必须是InputStream或者OutStream的子类),而通道可以用于读、写或者二者同时进行, 属于全双工。
这里我们也来看下源码吧,就看ServerSocketChannel
提供了几个比较重要的api:
public static ServerSocketChannel open() throws IOException; // 通过该方法创建一个Channel
看下javadoc , 明确说明了 新创建的channel是没有任何绑定的,在进行accepted之前需要绑定一个地址。
public final ServerSocketChannel bind(SocketAddress local);// 绑定一个端口号
public abstract SocketChannel accept() throws IOException; // 接收新的客户端
3、Selector 多路复用器 ,简单来说呢,Selector 会不断的轮询注册在其上的Channel, 如果某个Channel上面发生了读写等事件,这个Channel就会处理就绪状态, 会被Selector轮询出来,然后拿到SelectionKey Set集合,从而获取到每一个就绪状态的Channel,进行后续的I/O操作。
由于JDK使用了epoll() 代替传统的select实现,所以没有最大句柄的1024/2048的限制, 只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。NB
channel将会通过一个SelectionKey注册到一个selector上,一个selector 通过 open方法去创建。
这一段着重指出,selectionKey集合只能通过 set 集合的 remove() 方法 或者 一个迭代器的 remove() 方法来移除。其余的方法都不可以修改 selected-key 。
好了,看到这里,有些朋友可能似懂非懂,但是看下下面的单元测试一下子就懂了。
这段代码实现了Nio的服务器端,接收到客户端消息后,然后通知所有的客户端。
private static final Map<String, SocketChannel> clientMap = new ConcurrentHashMap(); public static void main(String[] args) { try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 创建一个Channel
serverSocketChannel.configureBlocking(false); // 设置为非阻塞
serverSocketChannel.bind(new InetSocketAddress(8899)); // 绑定端口 Selector selector = Selector.open(); // 创建一个Selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 将Channel注册到Selector上,设置selectionKey 为 accept, 准备接收新的客户端连接 while (true) { // 死循环不断轮询,查看 是否有准备就绪的channel
selector.select(); // 阻塞等到就绪的channel
Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 获取到就绪的selectionKeys集合
selectionKeys.forEach(value -> {
try {
if(value.isAcceptable()) { // 接收新的客户端事件
ServerSocketChannel channel = (ServerSocketChannel)value.channel(); // 获取channel
SocketChannel clientChannel = channel.accept(); // 获取客户端的 socketChannel
clientChannel.configureBlocking(false); // 设置为非阻塞
String clientId = UUID.randomUUID().toString();
System.out.println("客户端接入" + clientId);
clientMap.put(clientId, clientChannel);
clientChannel.register(selector, SelectionKey.OP_READ); // 这里重点说下, 当接收到新的客户端后,接下来就是准备接收数据,所以这里就是注册的是Read事件
// 并且这里注册到selector上的是客户端对应的SocketChannel, 而不是ServerSocketChannel,
// 因为ServerScoketChannel只负责接收新的客户端
} else if(value.isReadable()) { // 接收到read事件
SocketChannel clientChannel = (SocketChannel)value.channel(); // 所以这里是SocketChannel
ByteBuffer buffer = ByteBuffer.allocate(1024); // 分配内存
int count = clientChannel.read(buffer); // 写channel中的数据到Buffer中
if (count > 0) {
buffer.flip(); // 写完之后,一定要执行flip。转化成读
Charset charset = Charset.forName("utf-8");
String receiveMsg = String.valueOf(charset.decode(buffer).array());
System.out.println("receiveMsg = " +receiveMsg);
Iterator<Map.Entry<String, SocketChannel>> it = clientMap.entrySet().iterator();
String sendClient = null;
while (it.hasNext()) {
Map.Entry<String, SocketChannel> next = it.next();
if(next.getValue() == clientChannel) {
sendClient = next.getKey();
break;
}
}
it = clientMap.entrySet().iterator();
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
while (it.hasNext()) {
SocketChannel socketChannel = it.next().getValue();
writeBuffer.clear();
writeBuffer.put(("sendClient:" + sendClient + "发送了消息").getBytes());
writeBuffer.flip();
socketChannel.write(writeBuffer);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
});
selectionKeys.clear(); // 每次处理完这一批selectionKeys,一定要清空掉集合。
} } catch (IOException e) {
e.printStackTrace();
} finally {
}
}
ok, 上面是我自己的一些理解,如果有问题欢迎大家指正。下一篇,我们将开始学习Netty的源码。