官方api:http://netty.io/4.1/api/io/netty/handler/codec/LengthFieldBasedFrameDecoder.html
package com.eshore.ismp.hbinterface.sps; import java.nio.charset.Charset; import org.apache.log4j.Logger;
import org.springframework.context.support.ClassPathXmlApplicationContext; import com.eshore.ismp.hbinterface.service.BizCommonService; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder; public class SpsServer4 {
private static final Logger logger = Logger.getLogger(SpsServer4.class);
private static int PORT = 10002;
/**用于分配处理业务线程的线程组个数 */
protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2; //默认
/** 业务出现线程大小*/
protected static final int BIZTHREADSIZE = 4;
/*
* NioEventLoopGroup实际上就是个线程池,
* NioEventLoopGroup在后台启动了n个NioEventLoop来处理Channel事件,
* 每一个NioEventLoop负责处理m个Channel,
* NioEventLoopGroup从NioEventLoop数组里挨个取出NioEventLoop来处理Channel
*/
private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);
private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE); protected static void run(final BizCommonService bizCommonService) throws Exception {
//String PORTs=ConfigLoadUtil.getValue("toSpsServerPort");
//PORT=Integer.parseInt(PORTs);
//logger.info("PORT IS:"+PORT);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
/* pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); */
pipeline.addLast(new MessageDecoder(Integer.MAX_VALUE,18,4,-22,0));
pipeline.addLast("decoder", new StringDecoder(Charset.forName("GBK")));
pipeline.addLast("encoder", new StringEncoder(Charset.forName("GBK")));
pipeline.addLast(new SpsServerHandler(bizCommonService));
}
}); b.bind(PORT).sync();
logger.info("TCP服务器已启动");
} protected static void shutdown() {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
} public static void main(String[] args) throws Exception {
try{
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
new String[] { "applicationContext.xml" });
context.start();
BizCommonService bizCommonService = (BizCommonService) context.getBean("bizCommonService");
SpsServer4.run(bizCommonService);
}catch(Exception e){
logger.error("start sps interface server error:",e);
System.exit(-1);
}
}
}
MessageDecoder:
package com.eshore.ismp.hbinterface.sps; import java.io.UnsupportedEncodingException;
import java.nio.ByteOrder; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder; public class MessageDecoder extends LengthFieldBasedFrameDecoder{
private final Logger logger = LoggerFactory.getLogger(this.getClass());
public MessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment,
int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
} @Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception{
return super.decode(ctx, in);
}
@Override
protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order){
long frameLength;
switch (length) {
case 1:
frameLength = buf.getUnsignedByte(offset);
break;
case 2:
frameLength = buf.getUnsignedShort(offset);
break;
case 3:
frameLength = buf.getUnsignedMedium(offset);
break;
case 4:
//frameLength = buf.getUnsignedInt(offset);
frameLength=buf.readableBytes();
logger.info("==="+frameLength);
byte[] cc=new byte[buf.readableBytes()];
ByteBuf tmp=buf.copy();
tmp.readBytes(cc);
try {
String ss=new String(cc,"GBK");
frameLength=Integer.parseInt(ss.substring(offset,offset+length));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
break;
case 8:
frameLength = buf.getLong(offset);
break;
default:
throw new DecoderException(
"unsupported lengthFieldLength: "
+ length + " (expected: 1, 2, 3, 4, or 8)");
}
return frameLength;
} }
其中,报文长度为6位:17-22
参考 https://blog.csdn.net/bestone0213/article/details/47108419