管道流:
Java NIO 管道是2个线程之间的单向数据连接。Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。
package base.nio.threaddemo; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe; /**
* @program: Lear-Java
* @description:
* @author: Mr.Dai
* @create: 2018-10-05 20:43
**/
public class ThreadSend { private Pipe pipe; private void init() throws Exception {
this.pipe = Pipe.open();
} class SendInner1 extends Thread { @Override
public void run() {
// 单向流 发送数据
try {
Pipe.SinkChannel sink = pipe.sink();
sink.configureBlocking(false); while (true) {
if (sink.isOpen()) {
sink.write(ByteBuffer.wrap("abcd".getBytes()));
}
Thread.sleep(1000);
}
} catch (InterruptedException | IOException e) {
e.printStackTrace();
}
}
} class ReverInner extends Thread {
@Override
public void run() {
try {
// 单向流 拿到数据
Pipe.SourceChannel source = pipe.source(); source.configureBlocking(false); while (true) {
if (source.isOpen()) {
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.clear();
source.read(buffer);
// 这里必须去掉 trim
if(new String(buffer.array()).trim().equals("")){
continue;
}
System.out.println(new String(buffer.array()).trim());
}
Thread.sleep(1000);
}
} catch (InterruptedException | IOException e) {
e.printStackTrace();
}
}
} public static void main(String[] args) throws Exception {
ThreadSend send = new ThreadSend(); send.init(); SendInner1 sendI = send.new SendInner1(); ReverInner revI = send.new ReverInner(); sendI.start();
revI.start();
} }
套接字通道流
非阻塞模式
ServerSocketChannel可以设置成非阻塞模式。在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。 因此,需要检查返回的SocketChannel是否是null。如:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false); 6 while(true){
SocketChannel socketChannel =
serverSocketChannel.accept(); if(socketChannel != null){
//do something with socketChannel...
}
}
server:
package base.nio.chatdemo; import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set; /**
* @program: Lear-Java
* @description: Nio 聊天服务端
* @author: Mr.Dai
* @create: 2018-10-05 16:31
**/
public class ChatServer { /**
* 通道管理器
*/
private Selector selector; private void initServer(int port) throws Exception{ ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel .socket().bind(new InetSocketAddress(port));
// 配置非阻塞
serverChannel .configureBlocking(false); this.selector=Selector.open(); /**
* 将通道管理器和该通道绑定,并为该通道注册selectionKey.OP_ACCEPT事件
* 注册该事件后,当事件到达的时候,selector.select()会返回,
* 如果事件没有到达selector.select()会一直阻塞
* selector.selectNow() 立即返回 无论是否准备好 可能返回0
*/
serverChannel .register(this.selector, SelectionKey.OP_ACCEPT); } /**
* 采用轮训的方式监听selector上是否有需要处理的事件,如果有,进行处理
*/
public void listen() throws Exception {
System.out.println("start------------------->");
while (true){
// 在没有注册事件来到时 将会一直阻塞
selector.select();
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> iterator = set.iterator(); while (iterator.hasNext()){
SelectionKey key = iterator.next();
// 移除当前阻塞队列
iterator.remove();
if(key.isAcceptable()){
ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel = server.accept();
channel.configureBlocking(false);
// 服务端发送数据
channel.write(ByteBuffer.wrap(new String("hello client").getBytes()));
// 在客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限
channel.register(this.selector,SelectionKey.OP_READ); }else if(key.isReadable()){
SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(10);
channel.read(buffer); String msg = new String(buffer.array()).trim(); System.out.println("客户端发送过来的讯息:"+msg);
// 在读取后 将柱塞队列数据 改变监听为Accept
ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
channel.write(outBuffer);
}
}
} } public static void main(String[] args) throws Exception{
ChatServer server = new ChatServer();
server.initServer(8989);
server.listen();
} }
clien:
package base.nio.chatdemo; import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator; /**
* @program: Lear-Java
* @description: nio 聊天客户端
* @author: Mr.Dai
* @create: 2018-10-05 16:31
**/
public class ChatClient { /**
* 提供柱阻塞队列 管理器
*/
private Selector selector; private void ininCliect(String ip,int port) throws Exception{ SocketChannel channel = SocketChannel.open(); channel .connect(new InetSocketAddress(ip,port)); this.selector=Selector.open(); channel .configureBlocking(false); channel .register(this.selector, SelectionKey.OP_CONNECT); } public void listen() throws Exception { while (true){ selector.select(); Iterator<SelectionKey> ite = selector.selectedKeys().iterator(); while (ite.hasNext()){
SelectionKey key = ite .next();
ite .remove();
if(key.isConnectable()){
SocketChannel channel = (SocketChannel) key.channel();
// 是否准备好连接
if(channel.isConnectionPending()){
channel.finishConnect();
}
channel.configureBlocking(false);
// 向server 发送数据
channel.write(ByteBuffer.wrap("向server 发送数据".getBytes())); channel.register(selector,SelectionKey.OP_READ); }else if(key.isReadable()){
m1(key);
}
}
}
} private void m1(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(10);
channel.read(buffer);
System.out.println("服务端的消息为:"+new String(buffer.array())); ByteBuffer outBuffer = ByteBuffer.wrap(new String("aaa").getBytes());
channel.write(outBuffer);
} public static void main(String[] args) throws Exception {
ChatClient client = new ChatClient(); client.ininCliect("127.0.0.1",8989);
client.listen();
} }