处理消息边界
为什么要处理边界
因为会存在半包和粘包的问题
1.客户端和服务端约定一个固定长度
-
优点:简单
-
缺点:可能造成浪费
2.客户端与服务端约定一个固定分割符
*缺点 效率低
3.先发送长度,再发送数据
TLV格式: type类型,length长度,Value数据,类型和长度已知的情况下,就可以方便获取消息大小
http1.1是TLV格式
http2.0是LTV格式
4.自动扩容解决消息边界问题
第一次read事件未能读完全部的输入,那么会产生第二个读事件,那么在第一次读的时候进行扩容,
并复制之前的内容至新的buffer中,
在第二个读事件触发以后使用扩容后的buffer,读取剩余的数据
buffer应当和各自的channel绑定,如何绑定,需要用到附件attachment,
attachment需要在注册时放到selectionKey中。
// 绑定附件
SelectionKey scKey = channel.register(selector,0,byteBuffer);
// 获取附件
scKey.attachment();
// 指定新的附件(覆盖附件)
scKey.attach(bytebuffer);
示例代码:
package com.ysf;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
public class BorderSelectorServer {
/**
* 读取到 '\n'时打印
* @param byteBuffer 缓冲字节
*/
public static void handle(ByteBuffer byteBuffer) {
byteBuffer.flip();
for (int i = 0; i < byteBuffer.limit(); i++) {
if (byteBuffer.get(i) == '\n') {
int length = i + 1 - byteBuffer.position();
ByteBuffer allocate = ByteBuffer.allocate(length);
for (int j=0;j<length;j++){
allocate.put(byteBuffer.get());
}
allocate.flip();
System.out.println(Charset.defaultCharset().decode(allocate));
}
}
byteBuffer.compact();
}
public static void main(String[] args) throws IOException {
// 声明一个选择器
Selector selector = Selector.open();
// 声明一个server
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(11027));
ssc.configureBlocking(false);
// 注册这个server到selector
SelectionKey sscKey = ssc.register(selector, 0, null);
// 添加sscKey关心的事件,因为是serverChannel,所以应当关心accept事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverChannel.accept();
socketChannel.configureBlocking(false);
// 声明一个buffer缓冲区和socketChannel绑定
ByteBuffer byteBuffer = ByteBuffer.allocate(16);
SelectionKey scKey = socketChannel.register(selector, 0, byteBuffer);
scKey.interestOps(SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 当客户端异常断开链接是需要处理IOException
try {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
int read = channel.read(byteBuffer);
if (read == -1) {
// 客户端close()了
key.cancel();
}else{
// 调用处理逻辑
handle(byteBuffer);
if (byteBuffer.position() == byteBuffer.limit()){
// buffer满了,需要扩容
ByteBuffer bufferEx = ByteBuffer.allocate(byteBuffer.capacity() * 2);
byteBuffer.flip();
bufferEx.put(byteBuffer);
key.attach(bufferEx);
}
}
}catch (IOException e){
// e.printStackTrace();
key.cancel();
}
}
}
}
}
}