选择器 Selector 是 I/O 多路复用的核心组件,它可以监控实现了 SelectableChannel 的通道的就绪情况。有了多路复用(multiplexing) I/O 模型,使得单线程的 Java 程序在极端情况下能够处理数万个连接,极大提高了程序的并发数。
1. 多路复用 I/O 模型
I/O 多路复用模型是操作系统提供给应用程序的一种进行 I/O 操作的模型。应用程序通过 select/poll 系统调用监控多个 I/O 设备,一旦某个或者多个 I/O 设备的处于就绪状态(例如:可读)则返回,应用程序随后可对就绪的设备进行操作。
大致流程如下:
1)应用程序向内核发起 select 系统调用,该调用会阻塞应用程序。
2)内核等待数据到达。数据可能由 DMA 复制到内核缓冲区,也有可能是 CPU 进行复制。
3)数据准备完毕,select 调用返回。select 返回的是一个集和,可能有多个连接都已经就绪。
4)应用程序发起 read 系统调用。
5)操作系统将数据有内核缓冲区复制到用户缓冲区。
6)read 调用返回。
I/O 多路复用模型本质上是一种阻塞 I/O,进行读操作的 read 系统调用是阻塞的,select 的时候也是阻塞的。不过 I/O 多路复用模型的优势在于阻塞时可以等待多路 I/O 就绪,然后一并处理。与多线程处理多路 I/O 相比,它是单线程的,没有线程切换的开销,单位时间内能够处理多的连接数。
2. 选择器与通道关系
在 Java 中,通道 Channel 可以表示 I/O 连接,而选择器可以监控某些 I/O 事件就绪的通道,选择通道中就绪的 I/O 事件。这里的通道必须是实现了 SelectableChannel 接口的通道,例如:SocketChannel, DatagramChannel 等;而文件通道 FileChannel 没有实现该接口,所以不支持选择器。
3. 选择键 SelectionKey
选择器 Selector 监控通道时监控的是通道中的事件,选择键 SelectionKey 就代表着 I/O 事件。程序通过调用 Selector.select() 方法来选中选择器所监控的通道中的就绪的 I/O 事件的集合,然后遍历集合,对事件作出相应的处理。
选择键 SelectionKey 可以表示 4 种事件,这 4 种事件使用 int 类型的常量来表示。
1)SelectionKey.OP_ACCEPT 表示 accept 事件就绪。例如:对于 ServerSocketChannel 来说,该事件就绪表示可以调用 accept() 方法来获得与客户端连接的通道 SocketChannel。
2)SelectionKey.OP_CONNECT 表示客户端与服务端连接成功。
3)SelectionKey.OP_READ 表示通道中已经有了可读数据,可以调用 read() 方法从通道中读取数据。
4)SelectionKey.OP_WRITE 表示写事件就绪,可以调用 write() 方法往通道中写入数据。
不同的通道所能够支持的 I/O 事件不同,例如:ServerSocketChannel 只支持 accept 事件,而 DatagramChannel 只支持 read 和 write 事件。要查看通道所支持的事件,可以查看通道的 javadoc 文档,或者调用通道的 validOps() 方法来进行判断。例如:channel.validOps() & SelectionKey.OP_READ > 0 表示 channel 支持读事件。
4. 选择器使用步骤
4.1 获取选择器
与通道和缓冲区的获取类似,选择器的获取也是通过静态工厂方法 open() 来得到的。
Selector selector = Selector.open(); // 获取一个选择器实例
4.2 获取可选择通道
能够被选择器监控的通道必须实现了 SelectableChannel 接口,并且需要将通道配置成非阻塞模式,否则后续的注册步骤会抛出 IllegalBlockingModeException。
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090)); // 打开 SocketChannel 并连接到本机 9090 端口
socketChannel.configureBlocking(false); // 配置通道为非阻塞模式
4.3 将通道注册到选择器
通道在被指定的选择器监控之前,应该先告诉选择器,并且告知监控的事件,即:将通道注册到选择器。
通道的注册通过 SelectableChannel.register(Selector selector, int ops) 来完成,ops 表示关注的事件,如果需要关注该通道的多个 I/O 事件,可以传入这些事件类型或运算之后的结果。这些事件必须是通道所支持的,否则抛出 IllegalArgumentException。
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); // 将套接字通过到注册到选择器,关注 read 和 write 事件
4.4 轮询 select 就绪事件
通过调用选择器的 Selector.select() 方法可以获取就绪事件,该方法会将就绪事件放到一个 SelectionKey 集合中,然后返回就绪的事件的个数。这个方法映射多路复用 I/O 模型中的 select 系统调用,它是一个阻塞方法。正常情况下,直到至少有一个就绪事件,或者其它线程调用了当前 Selector 对象的 wakeup() 方法,或者当前线程被中断时返回。
while (selector.select() > 0){ // 轮询,且返回时有就绪事件
Set<SelectionKey> keys = selector.selectedKeys(); // 获取就绪事件集合
.......
}
有 3 种方式可以 select 就绪事件:
1)select() 阻塞方法,有一个就绪事件,或者其它线程调用了 wakeup() 或者当前线程被中断时返回。
2)select(long timeout) 阻塞方法,有一个就绪事件,或者其它线程调用了 wakeup(),或者当前线程被中断,或者阻塞时长达到了 timeout 时返回。不抛出超时异常。
3)selectNode() 不阻塞,如果无就绪事件,则返回 0;如果有就绪事件,则将就绪事件放到一个集合,返回就绪事件的数量。
4.5 处理就绪事件
每次可以 select 出一批就绪的事件,所以需要对这些事件进行迭代。从一个 SelectionKey 对象可以得到:1)就绪事件的对应的通道;2)就绪的事件。通过这些信息,就可以很方便地进行 I/O 操作。
for(SelectionKey key : keys){
if(key.isWritable()){ // 可写事件
if("Bye".equals( (line = scanner.nextLine()) )){
socketChannel.shutdownOutput();
socketChannel.close();
break;
}
buf.put(line.getBytes());
buf.flip();
socketChannel.write(buf);
buf.compact();
}
}
keys.clear(); // 清除选择键(事件)集,避免下次循环的时候重复处理。
需要注意的是,处理完 I/O 事件之后,需要清除选择键集和,避免下一轮循环的时候对同一事件重复处理。
5. 完整示例
下面给出一个完整的实例,实例中包含 TCP 客户端 TcpClient, UDP 客户端 UdpClient 和服务端 EchoServer。服务端 EchoServer 可以同时处理 UDP 请求和 TCP 请求,用户可以在客户端控制台输入内容,按回车发送给服务端,服务端打印客户端发送过来的内容。完整代码:https://github.com/Robothy/java-experiments/tree/main/nio/Selector
5.1 服务端
public class EchoServer {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open(); // 获取选择器
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打开服务器通道
serverSocketChannel.configureBlocking(false); // 服务器通道配置为非阻塞模式
serverSocketChannel.bind(new InetSocketAddress(9090)); // 绑定 TCP 端口 9090
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 将服务器通道注册到选择器 selector 中,注册事件为 ACCEPT
DatagramChannel datagramChannel = DatagramChannel.open(); // 打开套接字通道
datagramChannel.configureBlocking(false); // 配置通道为非阻塞模式
datagramChannel.bind(new InetSocketAddress(9090)); // 绑定 UDP 端口 9090
datagramChannel.register(selector, SelectionKey.OP_READ); // 将通道注册到选择器 selector 中,注册事件为读取数据
ByteBuffer buf = ByteBuffer.allocate(1024); // 分配一个 1024 字节的堆字节缓冲区
while (selector.select() > 0){ // 轮询已经就绪的注册的通道的 I/O 事件
Set<SelectionKey> keys = selector.selectedKeys(); // 获取就绪的 I/O 事件,即选择器键集合
for (SelectionKey key : keys){ // 遍历选择键,处理就绪事件
if(key.isAcceptable()){ // 选择键的事件的是 I/O 连接事件
SocketChannel socketChannel = serverSocketChannel.accept(); // 执行 I/O 操作,获取套接字连接通道
socketChannel.configureBlocking(false); // 配置为套接字通道为非阻塞模式
socketChannel.register(selector, SelectionKey.OP_READ); // 将套接字通过到注册到选择器,关注 READ 事件
}else if(key.isReadable()){ // 选择键的事件是 READ
StringBuilder sb = new StringBuilder();
if(key.channel() instanceof DatagramChannel){ // 选择的通道为数据报通道,客户端是通过 UDP 连接过来的
sb.append("UDP Client: ");
datagramChannel.receive(buf); // 最多读取 1024 字节,数据报多出的部分自动丢弃
buf.flip();
while(buf.position() < buf.limit()) {
sb.append((char)buf.get());
}
buf.clear();
}else{ // 选择的通道为套接字通道,客户端时通过 TCP 连接过来的
sb.append("TCP Client: ");
ReadableByteChannel channel = (ReadableByteChannel) key.channel(); // 获取通道
int size;
while ( (size = channel.read(buf))>0){
buf.flip();
while (buf.position() < buf.limit()) {
sb.append((char)buf.get());
}
buf.clear();
}
if (size == -1) {
sb.append("Exit");
channel.close();
}
}
System.out.println(sb);
}
}
keys.clear(); // 将选择键清空,防止下次循环时被重复处理
}
}
}
5.2 TCP 客户端
public class TcpClient {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090));
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_WRITE);
Scanner scanner = new Scanner(System.in);
String line;
ByteBuffer buf = ByteBuffer.allocate(1024);
while (selector.select() > 0){
Set<SelectionKey> keys = selector.selectedKeys();
for(SelectionKey key : keys){
if(key.isWritable()){
if("Bye".equals( (line = scanner.nextLine()) )){
socketChannel.shutdownOutput();
socketChannel.close();
break;
}
buf.put(line.getBytes());
buf.flip();
socketChannel.write(buf);
buf.compact();
}
}
keys.clear();
if(!socketChannel.isOpen()) break;
}
}
}
5.3 UDP 客户端
public class UdpClient {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open(); // 获取选择器
DatagramChannel datagramChannel = DatagramChannel.open(); // 打开一个数据报通道
datagramChannel.configureBlocking(false); // 配置通道为非阻塞模式
datagramChannel.register(selector, SelectionKey.OP_WRITE); // 将通道的写事件注册到选择器
ByteBuffer buff = ByteBuffer.allocate(1024); // 分配字节缓冲区
Scanner scanner = new Scanner(System.in); // 创建扫描器,扫描控制台输入流
InetSocketAddress server = new InetSocketAddress("localhost", 9090);
while (selector.select() > 0){ // 有就绪事件
Set<SelectionKey> keys = selector.selectedKeys(); // 获取选择键,即就绪的事件
for(SelectionKey key : keys){ // 遍历选择键
if(key.isWritable()){ // 如果当前选择键是读就绪
String line;
if("Bye".equals( line = scanner.nextLine() )) { // 从控制台获取 1 行输入,并检查输入的是不是 Bye
System.exit(0); // 正常退出
}
buff.put(line.getBytes()); // 放入缓冲区
buff.flip(); // 将缓冲区置为读状态
datagramChannel.send(buff, server); // 往 I/O 写数据
buff.compact(); // 压缩缓冲区,保留没发送完的数据
}
}
keys.clear();
}
}
}
6. 小结
Selector 作为多路复用 I/O 模型的核心组件,能够同时监控多路 I/O 通道。选择器在 select 就绪事件地时候会阻塞,在处理 I/O 事件的时候也会阻塞,它的优势在于在阻塞的时候可以等待多路 I/O 就绪,是一种异步阻塞 I/O 模型。与多线程处理多路 I/O 相比,多路复用模型只需要单个线程即可处理万级连接,没有线程切换的开销。