逻辑就是在处理handler前加入一个处理符,然后

channelReadComplete这个事件进行处理。
同时注意客服端的配置:
 public void connect(String addr, int port, final String xml, final String key,final boolean flag) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(1024*1024))//这行配置比较重要
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new EsbClientHandler(xml, key,flag));
}
});
ChannelFuture f = b.connect(addr, port);
// 等待客户端关闭连接
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}

源码如下:

 public class EsbClientHandler extends ChannelHandlerAdapter {

     private static Logger logger = Logger.getLogger(EsbClientHandler.class);

     private ByteBuf byteMsg;

     private boolean flag;

     /**
* 临时客户端数据key
*/
private String key = "key"; public EsbClientHandler(String xml, String key, boolean flag) {
this.key = key;
this.flag = flag;
byte[] req = null;
try {
req = xml.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
byteMsg = Unpooled.buffer(req.length);
byteMsg.writeBytes(req);
} @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(byteMsg);
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
flag = true;
String xml = WebUtil.getXmlStr(body);
analysisXml(xml);
} catch (Exception e) {
e.printStackTrace();
} finally {
ReferenceCountUtil.release(msg);
} } @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
if (flag) {
ctx.close();
} else {
ctx.read();
}
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 释放资源
ctx.close();
} /**
*
* 客户端解析服务端返回的xml数据,进行数据操作
*
* @param xml
*/
public void analysisXml(String xml) {
System.out.println("获取服务器接收报文:" + xml);
logger.info("开始解析xml:");
logger.info(xml);
NettyMap.setDataMap(key, xml);
}
}
05-26 07:24