前面我们讲过protobuf的使用,主流的编解码框架其实还有很多种:

①JBoss的Marshalling包

②google的Protobuf

③基于Protobuf的Kyro

④Apache的Thrift

JBoss Marshalling是一个Java对象的序列化API包,修正了JDK自带的序列化包的很多问题,但又保持跟java.io.Serializable接口的兼容;同时增加了一些可调的参数和附加的特性,并且这些参数和特性可通过工厂类进行配置。

相比于传统的Java序列化机制,它的优点如下:

1) 可插拔的类解析器,提供更加便捷的类加载定制策略,通过一个接口即可实现定制;

2) 可插拔的对象替换技术,不需要通过继承的方式;

3) 可插拔的预定义类缓存表,可以减小序列化的字节数组长度,提升常用类型的对象序列化性能;

4) 无须实现java.io.Serializable接口,即可实现Java序列化;

5) 通过缓存技术提升对象的序列化性能。

相比于protobuf和thrift的两种编解码框架,JBoss Marshalling更多是在JBoss内部使用,应用范围有限。


Protobuf全称Google Protocol Buffers,它由谷歌开源而来,在谷歌内部久经考验。它将数据结构以.proto文件进行描述,通过代码生成工具可以生成对应数据结构的POJO对象和Protobuf相关的方法和属性。

它的特点如下:

1) 结构化数据存储格式(XML,JSON等);

2) 高效的编解码性能;

3) 语言无关、平台无关、扩展性好;

4) 官方支持Java、C++和Python三种语言。

首先我们来看下为什么不使用XML,尽管XML的可读性和可扩展性非常好,也非常适合描述数据结构,但是XML解析的时间开销和XML为了可读性而牺牲的空间开销都非常大,因此不适合做高性能的通信协议。Protobuf使用二进制编码,在空间和性能上具有更大的优势。

Protobuf另一个比较吸引人的地方就是它的数据描述文件和代码生成机制,利用数据描述文件对数据结构进行说明的优点如下:

1) 文本化的数据结构描述语言,可以实现语言和平台无关,特别适合异构系统间的集成;

2) 通过标识字段的顺序,可以实现协议的前向兼容;

3) 自动代码生成,不需要手工编写同样数据结构的C++和Java版本;

4) 方便后续的管理和维护。相比于代码,结构化的文档更容易管理和维护。

Thrift源于Facebook,在2007年Facebook将Thrift作为一个开源项目提交给Apache基金会。对于当时的Facebook来说,创造Thrift是为了解决Facebook各系统间大数据量的传输通信以及系统之间语言环境不同需要跨平台的特性,因此Thrift可以支持多种程序语言,如C++、C#、Cocoa、Erlang、Haskell、Java、Ocami、Perl、PHP、Python、Ruby和Smalltalk。

在多种不同的语言之间通信,Thrift可以作为高性能的通信中间件使用,它支持数据(对象)序列化和多种类型的RPC服务。Thrift适用于静态的数据交换,需要先确定好它的数据结构,当数据结构发生变化时,必须重新编辑IDL文件,生成代码和编译,这一点跟其他IDL工具相比可以视为是Thrift的弱项。Thrift适用于搭建大型数据交换及存储的通用工具,对于大型系统中的内部数据传输,相对于JSON和XML在性能和传输大小上都有明显的优势。

Thrift主要由5部分组成:

1) 语言系统以及IDL编译器:负责由用户给定的IDL文件生成相应语言的接口代码;

2) TProtocol:RPC的协议层,可以选择多种不同的对象序列化方式,如JSON和Binary;

3) TTransport:RPC的传输层,同样可以选择不同的传输层实现,如socket、NIO、MemoryBuffer等;

4) TProcessor:作为协议层和用户提供的服务实现之间的纽带,负责调用服务实现的接口;

5) TServer:聚合TProtocol、TTransport和TProcessor等对象。

我们重点关注的是编解码框架,与之对应的就是TProtocol。由于Thrift的RPC服务调用和编解码框架绑定在一起,所以,通常我们使用Thrift的时候会采取RPC框架的方式。但是,它的TProtocol编解码框架还是可以以类库的方式独立使用的。

与Protobuf比较类似的是,Thrift通过IDL描述接口和数据结构定义,它支持8种Java基本类型、Map、Set和List,支持可选和必选定义,功能非常强大。因为可以定义数据结构中字段的顺序,所以它也可以支持协议的前向兼容。

Thrift支持三种比较典型的编解码方式:

1) 通用的二进制编解码;

2) 压缩二进制编解码;

3) 优化的可选字段压缩编解码。

由于支持二进制压缩编解码,Thrift的编解码性能表现也相当优异,远远超过Java序列化和RMI等。

这一节我们来讲解JBoss的Marshalling的使用。

和protobuf的使用不同,netty默认支持protobuf,所以为他预设了一个编解码器:ProtobufVarint32LengthFieldPrepender,ProtobufVarint32FrameDecoder。那如果采用jboss-marshalling进行编解码,则没有这么好的运气我们需要自己优先创建一个编解码的工厂类,供信息通讯时候对信息的编解码。

pom文件如下,需要新增两个jar包:jboss-marshalling,jboss-marshalling-serial。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.edu.hust.netty</groupId>
<artifactId>netty</artifactId>
<packaging>war</packaging>
<version>1.0-SNAPSHOT</version>
<name>netty Maven Webapp</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.5.Final</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-river</artifactId>
<version>1.4.10.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>1.4.11.Final</version>
</dependency>
</dependencies>
<build>
<finalName>netty</finalName>
</build>
</project>

我们先来写一个工厂类,手动创建编解码器:

import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration; /**
* Marshalling工厂
*/
public final class MarshallingCodeCFactory { /**
* 创建Jboss Marshalling解码器MarshallingDecoder
* @return MarshallingDecoder
*/
public static MarshallingDecoder buildMarshallingDecoder() {
//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
//创建了MarshallingConfiguration对象,配置了版本号为5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
//根据marshallerFactory和configuration创建provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
return decoder;
} /**
* 创建Jboss Marshalling编码器MarshallingEncoder
* @return MarshallingEncoder
*/
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}

下面是服务端:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; /**
* Created by Administrator on 2017/3/11.
*/
public class HelloWordServer {
private int port; public HelloWordServer(int port) {
this.port = port;
} public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerChannelInitializer()); try {
ChannelFuture future = server.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
} public static void main(String[] args) {
HelloWordServer server = new HelloWordServer(7788);
server.start();
}
}

服务端Initializer:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder; /**
* Created by Administrator on 2017/3/11.
*/
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
pipeline.addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); // 自己的逻辑Handler
pipeline.addLast("handler", new HelloWordServerHandler());
}
}

注意我们在这里加入了刚才我们写的编解码器哈,顺序没有关系。

服务端handler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; /**
* Created by Administrator on 2017/3/11.
*/
public class HelloWordServerHandler extends ChannelInboundHandlerAdapter { @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof String){
System.out.println(msg.toString());
}else{
ctx.writeAndFlush("received your msg");
Msg m = (Msg)msg;
System.out.println("client: "+m.getBody());
m.setBody("人生苦短,快用python");
ctx.writeAndFlush(m);
}
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}
}

接下来是客户端:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel; import java.io.BufferedReader;
import java.io.InputStreamReader; /**
* Created by Administrator on 2017/3/11.
*/
public class HelloWorldClient {
private int port;
private String address; public HelloWorldClient(int port,String address) {
this.port = port;
this.address = address;
} public void start(){
EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ClientChannelInitializer()); try {
ChannelFuture future = bootstrap.connect(address,port).sync();
future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
} } public static void main(String[] args) {
HelloWorldClient client = new HelloWorldClient(7788,"127.0.0.1");
client.start();
}
}

客户端Initializer:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder; /**
* Created by Administrator on 2017/3/11.
*/
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
pipeline.addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); // 客户端的逻辑
pipeline.addLast("handler", new HelloWorldClientHandler());
}
}

同样这里也加入编解码器。

客户端handler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; /**
* Created by Administrator on 2017/3/11.
*/
public class HelloWorldClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof String){
System.out.println(msg);
}else{
Msg m = (Msg)msg;
System.out.println("client: "+m.getBody());
}
} @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Msg msg = new Msg();
msg.setHeader((byte)0xa);
msg.setLength(34);
msg.setBody("放纵自己,你好兄弟"); ctx.writeAndFlush(msg);
} @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client is close");
}
}

我们注意上面有一个Msg对象,这个就是我们自己定义的一个对象,用于网络传输用的:

import java.io.Serializable;

/**
* 自定义一个对象
*/
public class Msg implements Serializable {
private byte header;
private String body;
private long length;
private byte type; public byte getHeader() {
return header;
} public void setHeader(byte header) {
this.header = header;
} public String getBody() {
return body;
} public void setBody(String body) {
this.body = body;
} public long getLength() {
return length;
} public void setLength(long length) {
this.length = length;
} public byte getType() {
return type;
} public void setType(byte type) {
this.type = type;
}
}

下面我们运行客户端和服务端,可以看到消息已经发出去了:

04-28 19:49