客户端代码:

/**
*
*/
package com.bobohe.nio; import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset; /**
* 实现TCP/IP+NIO 方式的系统间通讯的代码,客户端:
*
* @author solo
*/
public class NioClient { /**
* create solo
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
String host = "127.0.0.1";
int port = 9527; Selector selector = Selector.open(); SocketChannel clientChannel = SocketChannel.open();
clientChannel.configureBlocking(false);
clientChannel.connect(new InetSocketAddress(host, port)); clientChannel.register(selector, SelectionKey.OP_CONNECT); BufferedReader systemIn = new BufferedReader(new InputStreamReader(System.in));
while (true) {
if (clientChannel.isConnected()) {
String command = systemIn.readLine();
clientChannel.write(Charset.forName("UTF-8").encode(command)); if (command == null || "quit".equalsIgnoreCase(command)) {
System.out.println("Client quit!"); systemIn.close();
clientChannel.close();
selector.close();
System.exit(0);
} } //最长阻塞10s
int nKeys = selector.select(10 * 1000);
if (nKeys > 0) {
for (SelectionKey selectionKey : selector.selectedKeys()) {
if (selectionKey.isConnectable()) {
SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
socketChannel.finishConnect();
} else if (selectionKey.isReadable()) {
SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
StringBuilder sb = new StringBuilder(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
try {
int readBytes = 0;
int ret = 0;
while ((ret = socketChannel.read(byteBuffer)) > 0) {
readBytes += ret;
byteBuffer.flip(); sb.append(Charset.forName("UTF-8").decode(byteBuffer).toString()); byteBuffer.clear();
} if (readBytes == 0) {
/*
* handle Exception
*/
System.err.println("handle opposite close Exception");
socketChannel.close();
}
} catch (IOException e) {
/*
* handle Exception
*/
System.err.println("handle read Exception");
socketChannel.close();
} finally {
byteBuffer.clear();
} String message = sb.toString();
System.out.println(message);
}
}
selector.selectedKeys().clear(); } else {
/*
* handle Exception
* 三秒没有响应则打印错误信息
*/
System.err.println("handle select timeout Exception");
clientChannel.close();
} }
} }

服务端代码:

/**
*
*/ package com.bobohe.nio; import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset; /**
* 实现TCP/IP+NIO 方式的系统间通讯的代码,服务器端: SocketChannel和ServerSocketChannel两个关键的类,网络IO
* 的操作则改为通过ByteBuffer来实现。
*
* @author solo
*/
public class NioServer { /**
* create by solo
*
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
int port = 9527;
System.out.println("Server listen on port: " + port); Selector selector = Selector.open(); ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverChannel.socket();
serverSocket.bind(new InetSocketAddress(port));
serverChannel.configureBlocking(false); serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int nKeys = selector.select();
if (nKeys > 0) {
for (SelectionKey selectionKey : selector.selectedKeys()) {
if (selectionKey.isAcceptable()) {
ServerSocketChannel tempServerChannel = (ServerSocketChannel) selectionKey
.channel();
SocketChannel socketChannel = tempServerChannel
.accept();
if (socketChannel == null) {
continue;
} socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
// try { Thread.sleep(5000); } catch
// (InterruptedException e) { e.printStackTrace(); }
SocketChannel socketChannel = (SocketChannel) selectionKey
.channel(); StringBuilder sb = new StringBuilder();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
try {
int readBytes = 0;
int ret = 0;
while ((ret = socketChannel.read(byteBuffer)) > 0) {
readBytes += ret;
byteBuffer.flip(); sb.append(Charset.forName("UTF-8")
.decode(byteBuffer).toString()); byteBuffer.clear();
} if (readBytes == 0) {
/*
* handle Exception
*/
System.err
.println("handle opposite close Exception");
socketChannel.close();
} String message = sb.toString();
System.out.println("client: "
+ message);
if ("quit".equalsIgnoreCase(message.toString()
.trim())) {
System.out.println("Client has been quit!"); socketChannel.close();
} else if ("serverquit".equalsIgnoreCase(message
.trim())) {
System.out.println("Server has been shutdown!"); socketChannel.close();
serverChannel.close();
selector.close();
System.exit(0);
} else {
socketChannel.write(Charset.forName("UTF-8").encode("Server Handler Done!"));
}
} catch (IOException e) {
/*
* handle Exception
*/
System.err.println("handle read Exception");
socketChannel.close();
} finally {
byteBuffer.clear();
} }
}
selector.selectedKeys().clear(); }
}
} }
05-11 14:58