概述
在《简易RPC框架:需求与设计》这篇文章中已经给出了协议的具体细节,协议类型为二进制协议,如下:
------------------------------------------------------------------------
| magic (2bytes) | version (1byte) | type (1byte) | reserved (7bits) |
------------------------------------------------------------------------
| status (1byte) | id (8bytes) | body length (4bytes) |
------------------------------------------------------------------------
| |
| body ($body_length bytes) |
| |
------------------------------------------------------------------------
协议的解码我们称为 decode,编码我们成为 encode,下文我们将直接使用 decode 和 encode 术语。
decode 的本质就是讲接收到的一串二进制报文,转化为具体的消息对象,在 Java 中,就是将这串二进制报文所包含的信息,用某种类型的对象存储起来。
encode 则是将存储了信息的对象,转化为具有相同含义的一串二进制报文,然后网络收发模块再将报文发出去。
无论是 rpc 客户端还是服务端,都需要有一个 decode 和 encode 的逻辑。
消息类型
rpc 客户端与服务端之间的通信,需要通过发送不同类型的消息来实现,例如:client 向 server 端发送的消息,可能是请求消息,可能是心跳消息,可能是认证消息,而 server 向 client 发送的消息,一般就是响应消息。
利用 Java 中的枚举类型,可以将消息类型进行如下定义:
/**
* 消息类型
*
* @author beanlam
* @version 1.0
*/
public enum MessageType {
REQUEST((byte) 0x01), HEARTBEAT((byte) 0x02), CHECKIN((byte) 0x03), RESPONSE(
(byte) 0x04), UNKNOWN((byte) 0xFF);
private byte code;
MessageType(byte code) {
this.code = code;
}
public static MessageType valueOf(byte code) {
for (MessageType instance : values()) {
if (instance.code == code) {
return instance;
}
}
return UNKNOWN;
}
public byte getCode() {
return code;
}
}
在这个类中设计了 valueOf 方法,方便进行具体的 byte 字节与具体的消息枚举类型之间的映射和转换。
调用状态设计
client 主动发起的一次 rpc 调用,要么成功,要么失败,server 端有责任告知 client 此次调用的结果,client 也有责任去感知调用失败的原因,因为不一定是 server 端造成的失败,可能是因为 client 端在对消息进行预处理的时候,例如序列化,就已经出错了,这种错误也应该作为一次调用的调用结果返回给 client 调用者。因此引入一个调用状态,与消息类型一样,它也借助了 Java 语言里的枚举类型来实现,并实现了方便的 valueOf 方法:
/**
* 调用状态
*
* @author beanlam
* @version 1.0
*/
public enum InvocationStatus {
OK((byte) 0x01), CLIENT_TIMEOUT((byte) 0x02), SERVER_TIMEOUT(
(byte) 0x03), BAD_REQUEST((byte) 0x04), BAD_RESPONSE(
(byte) 0x05), SERVICE_NOT_FOUND((byte) 0x06), SERVER_SERIALIZATION_ERROR(
(byte) 0x07), CLIENT_SERIALIZATION_ERROR((byte) 0x08), CLIENT_CANCELED(
(byte) 0x09), SERVER_BUSY((byte) 0x0A), CLIENT_BUSY(
(byte) 0x0B), SERIALIZATION_ERROR((byte) 0x0C), INTERNAL_ERROR(
(byte) 0x0D), SERVER_METHOD_INVOKE_ERROR((byte) 0x0E), UNKNOWN((byte) 0xFF);
private byte code;
InvocationStatus(byte code) {
this.code = code;
}
public static InvocationStatus valueOf(byte code) {
for (InvocationStatus instance : values()) {
if (code == instance.code) {
return instance;
}
}
return UNKNOWN;
}
public byte getCode() {
return code;
}
}
消息实体设计
我们将 client 往 server 端发送的统一称为 rpc 请求消息,一个请求对应着一个响应,因此在 client 和 server 端间流动的信息大体上其实就只有两种,即要么是请求,要么是响应。我们将会定义两个类,分别是 RpcRequest 和 RpcResponse 来代表请求消息和响应消息。
另外由于无论是请求消息还是响应消息,它们都有一些共同的属性,例如说“调用上下文ID”,或者消息类型。因此会再定义一个 RpcMessage 类,作为父类。
RpcMessage
/**
* rpc消息
*
* @author beanlam
* @version 1.0
*/
public class RpcMessage {
private MessageType type;
private long contextId;
private Object data;
public long getContextId() {
return this.contextId;
}
public void setContextId(long id) {
this.contextId = id;
}
public Object getData() {
return this.data;
}
public void setData(Object data) {
this.data = data;
}
public void setType(byte code) {
this.type = MessageType.valueOf(code);
}
public MessageType getType() {
return this.type;
}
public void setType(MessageType type) {
this.type = type;
}
@Override
public String toString() {
return "[messageType=" + type.name() + ", contextId=" + contextId + ", data="
+ data + "]";
}
}
RpcRequest
import java.util.concurrent.atomic.AtomicLong;
/**
* rpc请求消息
*
* @author beanlam
* @version 1.0
*/
public class RpcRequest extends RpcMessage {
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
public RpcRequest() {
this(ID_GENERATOR.incrementAndGet());
}
public RpcRequest(long contextId) {
setContextId(contextId);
setType(MessageType.REQUEST);
}
}
RpcResponse
/**
*
* rpc响应消息
*
* @author beanlam
* @version 1.0
*/
public class RpcResponse extends RpcMessage {
private InvocationStatus status = InvocationStatus.OK;
public RpcResponse(long contextId) {
setContextId(contextId);
setType(MessageType.RESPONSE);
}
public InvocationStatus getStatus() {
return this.status;
}
public void setStatus(InvocationStatus status) {
this.status = status;
}
@Override
public String toString() {
return "RpcResponse[contextId=" + getContextId() + ", status=" + status.name() + "]";
}
}
netty 编解码介绍
netty 是一个 NIO 框架,应该这么说,netty 是一个有良好设计思想的 NIO 框架。一个 NIO 框架必备的要素就是 reactor 线程模型,目前有一些比较优秀而且开源的小型 NIO 框架,例如分库分表中间件 mycat 实现的一个简易 NIO 框架,可以在这里看到。
netty 的主要特点有:微内核设计、责任链模式的业务逻辑处理、内存和资源泄露的检测等。其中编解码在 netty 中,都被设计成责任链上的一个一个 Handler。
decode 对于 netty 来说,它提供了 ByteToMessageDecoder,它也提供了 MessageToByteEncoder。
借助 netty 来实现协议编解码,实际上就是去在这两个handler里面实现编解码的逻辑。
decode
在实现 decode 逻辑时需要注意的一个问题是,由于二进制报文是在网络上发送的,因此一个完整的报文可能经过多个分组来发送的,什么意思呢,就是当有报文进来后,要确认报文是否完整,decode逻辑代码不能假设收到的报文就是一个完整报文,一般称这为“TCP半包问题”。同样,报文是连着报文发送的,意味着decode代码逻辑还要负责在一长串二进制序列中,分割出一个一个独立的报文,这称之为“TCP粘包问题”。
netty 本身有提供一些方便的 decoder handler 来处理 TCP 半包和粘包的问题。不过一般情况下我们不会直接去用它,因为我们的协议比较简单,自己在代码里处理一下就可以了。
完整的 decode 代码逻辑如下所示:
import cn.com.agree.ats.rpc.message.*;
import cn.com.agree.ats.util.logfacade.AbstractPuppetLoggerFactory;
import cn.com.agree.ats.util.logfacade.IPuppetLogger;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* 协议解码器
*
* @author beanlam
* @version 1.0
*/
public class ProtocolDecoder extends ByteToMessageDecoder {
private static final IPuppetLogger logger = AbstractPuppetLoggerFactory
.getInstance(ProtocolDecoder.class);
private boolean magicChecked = false;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> list)
throws Exception {
if (!magicChecked) {
if (in.readableBytes() < ProtocolMetaData.MAGIC_LENGTH_IN_BYTES) {
return;
}
magicChecked = true;
if (!(in.getShort(in.readerIndex()) == ProtocolMetaData.MAGIC)) {
logger.warn(
"illegal data received without correct magic number, channel will be close");
ctx.close();
magicChecked = false; //this line of code makes no any sense, but it's good for a warning
return;
}
}
if (in.readableBytes() < ProtocolMetaData.HEADER_LENGTH_IN_BYTES) {
return;
}
int bodyLength = in
.getInt(in.readerIndex() + ProtocolMetaData.BODY_LENGTH_OFFSET);
if (in.readableBytes() < bodyLength + ProtocolMetaData.HEADER_LENGTH_IN_BYTES) {
return;
}
magicChecked = false;// so far the whole packet was received
in.readShort(); // skip the magic
in.readByte(); // dont care about the protocol version so far
byte type = in.readByte();
byte status = in.readByte();
long contextId = in.readLong();
byte[] body = new byte[in.readInt()];
in.readBytes(body);
RpcMessage message = null;
MessageType messageType = MessageType.valueOf(type);
if (messageType == MessageType.RESPONSE) {
message = new RpcResponse(contextId);
((RpcResponse) message).setStatus(InvocationStatus.valueOf(status));
} else {
message = new RpcRequest(contextId);
}
message.setType(messageType);
message.setData(body);
list.add(message);
}
}
可以看到,我们解决半包问题的时候,是判断有没有收到我们期望收到的报文,如果没有,直接在 decode 方法里面 return,等有更多的报文被收到的时候,netty 会自动帮我们调起 decode 方法。而我们解决粘包问题的思路也很清晰,那就是一次只处理一个报文,不去动后面的报文内容。
还需要注意的是,在 netty 中,对于 ByteBuf 的 get 是不会消费掉报文的,而 read 是会消费掉报文的。当不确定报文是否收完整的时候,我们都是用 get开头的方法去试探性地验证报文是否接收完全,当确定报文接收完全后,我们才用 read 开头的方法去消费这段报文。
encode
直接贴代码,参考前文提到的协议格式阅读以下代码:
/**
*
* 协议编码器
*
* @author beanlam
* @version 1.0
*/
public class ProtocolEncoder extends MessageToByteEncoder<RpcMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out)
throws Exception {
byte status;
byte[] data = (byte[]) rpcMessage.getData();
if (rpcMessage instanceof RpcRequest) {
RpcRequest request = (RpcRequest) rpcMessage;
status = InvocationStatus.OK.getCode();
} else {
RpcResponse response = (RpcResponse) rpcMessage;
status = response.getStatus().getCode();
}
out.writeShort(ProtocolMetaData.MAGIC);
out.writeByte(ProtocolMetaData.VERSION);
out.writeByte(rpcMessage.getType().getCode());
out.writeByte(status);
out.writeLong(rpcMessage.getContextId());
out.writeInt(data.length);
out.writeBytes(data);
}
}