Java NIO 简介及高并发网络编程实现

Java NIO

  • NIO(Non-blocking I/O,非阻塞 I/O)是 Java 在 JDK 1.4 中引入的一套新的 I/O API,旨在解决传统 I/O(即 BIO,阻塞 I/O)在高并发场景下的性能和扩展性不足的问题。

NIO 的核心特点

  1. 非阻塞 I/O:支持非阻塞模式,可以让线程不必一直等待 I/O 操作完成,从而提高系统资源利用率。
  2. 基于缓冲区(Buffer):数据的读写通过缓冲区进行,而不是直接通过流(Stream)。
  3. 选择器(Selector):通过一个线程管理多个通道(Channel),极大地提升了高并发场景下的扩展性和效率。
  4. 多路复用:通过 Selector 机制可以同时监控多个通道的状态(如连接就绪、读数据就绪等)。

BIO(阻塞 I/O)与 NIO(非阻塞 I/O) 对比


NIO 的核心组件

  1. Channel(通道)

    • 类似流(Stream),但 Channel 同时支持读和写。
    • 常见的 ChannelSocketChannelServerSocketChannelDatagramChannelFileChannel
  2. Buffer(缓冲区)

    • 数据读写通过 Buffer 进行。
    • 常见的缓冲区:ByteBufferCharBufferIntBuffer 等。
  3. Selector(选择器)

    • 核心组件,用于监听多个通道的事件,如连接就绪、读就绪、写就绪等。
    • 通过多路复用机制实现一个线程管理多个通道。
  4. SelectionKey

    • 表示通道和选择器的注册关系,包含通道的事件类型(如读、写、连接等)。

Java NIO 网络高并发编程示例

场景描述

  • 服务器端:监听客户端请求,接收数据并返回信息。
  • 客户端:连接服务器,发送数据并接收响应。

服务器端代码

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

public class NioServer {
    public static void main(String[] args) {
        try {
            // 1. 创建 ServerSocketChannel 用于监听客户端连接
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.bind(new InetSocketAddress(8080));
            serverChannel.configureBlocking(false); // 设置为非阻塞模式

            // 2. 创建 Selector 并注册 ServerSocketChannel,监听 ACCEPT 事件
            Selector selector = Selector.open();
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("NIO Server started on port 8080...");

            while (true) {
                // 3. 检查是否有事件发生,阻塞等待
                if (selector.select() == 0) {
                    continue; // 如果没有事件就绪,继续循环
                }

                // 4. 获取就绪的事件集合
                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    keyIterator.remove(); // 移除当前处理的 key,避免重复处理

                    try {
                        // 5. 处理不同的事件
                        if (key.isAcceptable()) { // 客户端连接事件
                            handleAccept(key, selector);
                        } else if (key.isReadable()) { // 读取客户端数据事件
                            handleRead(key);
                        }
                    } catch (IOException e) {
                        System.err.println("Error handling client connection: " + e.getMessage());
                        key.cancel(); // 取消出错的键
                        if (key.channel() != null) {
                            key.channel().close();
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // 处理 ACCEPT 事件
    private static void handleAccept(SelectionKey key, Selector selector) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = serverChannel.accept(); // 接受客户端连接
        clientChannel.configureBlocking(false); // 设置为非阻塞模式
        clientChannel.register(selector, SelectionKey.OP_READ); // 注册 READ 事件
        System.out.println("Client connected: " + clientChannel.getRemoteAddress());
    }

    // 处理 READ 事件
    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int bytesRead;

        try {
            bytesRead = clientChannel.read(buffer); // 从通道中读取数据
        } catch (SocketException e) {
            System.err.println("Connection reset by client: " + clientChannel.getRemoteAddress());
            clientChannel.close();
            key.cancel();
            return;
        }

        if (bytesRead > 0) {
            buffer.flip(); // 切换到读取模式
            String message = new String(buffer.array(), 0, buffer.limit());
            System.out.println("Received from client: " + message);

            // 回写数据到客户端
            buffer.clear();
            buffer.put(("Echo: " + message).getBytes());
            buffer.flip();
            clientChannel.write(buffer);
        } else if (bytesRead == -1) { // 客户端断开连接
            System.out.println("Client disconnected: " + clientChannel.getRemoteAddress());
            clientChannel.close();
            key.cancel(); // 取消注册的事件
        }
    }
}


客户端代码

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class NioClient {

    public static void main(String[] args) throws IOException {
        // 1. 创建 SocketChannel 连接服务器
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);

        if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080))) {
            // 等待连接完成
            while (!socketChannel.finishConnect()) {
                System.out.println("Connecting to server...");
            }
        }

        System.out.println("Connected to the server");

        // 2. 发送数据到服务器
        String message = "Hello, NIO Server!";
        ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
        socketChannel.write(buffer);

        // 3. 接收服务器回写的数据
        buffer.clear();
        int bytesRead = socketChannel.read(buffer);
        if (bytesRead > 0) {
            buffer.flip();
            String response = new String(buffer.array(), 0, buffer.limit());
            System.out.println("Received from server: " + response);
        }

        socketChannel.close();
    }
}

运行结果

  1. 客户端输出

     Connected to the server
     Received from server: Echo: Hello, NIO S
     进程已结束,退出代码为 0
    
  2. 服务器端输出

     NIO Server started on port 8080...
     Client connected: /127.0.0.1:50104
     Received from client: Hello, NIO Server!
     Connection reset by client: /127.0.0.1:50104
    

NIO 高并发的关键点

  1. 非阻塞 I/O:通过 Selector 一个线程可以同时监听多个客户端连接,无需为每个连接创建一个线程,降低了线程开销。
  2. 多路复用Selector 提供多路复用机制,能同时监听多个事件(如连接就绪、读就绪等)。
  3. IO 操作优化:通过 ByteBuffer 进行 I/O 操作,避免了传统流的频繁数据拷贝,提高了读写效率。

NIO 的局限性和改进

  1. 局限性
    • NIO 的使用相对复杂,需要手动管理通道和缓冲区。
    • 在高并发场景下,Selector 的性能可能成为瓶颈。
  2. 改进方向
    • Netty:一个基于 NIO 的高性能网络框架,简化了 NIO 的使用,同时提供了更高的吞吐量和扩展性。
    • 异步 I/O(AIO):Java NIO 2.0(JDK 7 引入)提供了异步 I/O,进一步优化了线程资源利用。

适用场景和建议

  • 适用场景:高并发的网络应用,例如 Web 服务器、消息推送服务;I/O 密集型应用。

  • 使用建议:对于复杂的高性能网络应用,建议使用 Netty 等成熟框架,避免直接操作 NIO 的底层代码。

12-28 21:00