参考:http://blog.csdn.net/linuu/article/details/51371595
https://www.jianshu.com/p/a0a51fd79f62
Netty 默认一次读取只能收到 1024 个字节左右(接收缓冲区的默认初始大小),更长的报文会被拆成多次读取。那么要传输超过这个长度的报文(大文件)该怎么办?
上代码:
改之后服务端:
package server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.Charset;
import org.apache.log4j.Logger;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.eshore.ismp.hbinterface.service.BizCommonService;
import com.eshore.ismp.hbinterface.util.ConfigLoadUtil;
/**
 * Bootstraps the Netty TCP server: inbound bytes are split into frames on a
 * tab delimiter, decoded as GBK text and handed to {@link SpsServerHandler}.
 */
public class SpsServer {
    private static final Logger logger = Logger.getLogger(SpsServer.class);

    /** Listening port; replaced by the "toSpsServerPort" configuration value when {@link #run} is called. */
    private static int PORT = 10001;

    /** Thread count passed to the boss event loop group (twice the available processors). */
    protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors() * 2; // default

    /** Thread count passed to the worker event loop group. */
    protected static final int BIZTHREADSIZE = 4;

    /*
     * A NioEventLoopGroup is effectively a thread pool.
     * It starts n NioEventLoops in the background to process Channel events,
     * each NioEventLoop takes care of m Channels, and the group hands out
     * NioEventLoops from its internal array one after another.
     */
    private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);
    private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);

    /**
     * Reads the port from configuration, binds the server and waits until the bind completes.
     *
     * @param bizCommonService service passed to every {@link SpsServerHandler} created for a connection
     * @throws Exception if the configured port cannot be parsed or the bind fails
     */
    protected static void run(final BizCommonService bizCommonService) throws Exception {
        PORT = Integer.parseInt(ConfigLoadUtil.getValue("toSpsServerPort"));
        logger.info("PORT IS:" + PORT);

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new TabFramedGbkInitializer(bizCommonService));

        bootstrap.bind(PORT).sync();
        logger.info("TCP服务器已启动");
    }

    /** Requests a graceful shutdown of the worker group first, then the boss group. */
    protected static void shutdown() {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }

    /**
     * Loads the Spring context from applicationContext.xml, looks up the
     * "bizCommonService" bean and starts the server. Any failure is logged
     * and the JVM exits with status -1.
     */
    public static void main(String[] args) throws Exception {
        try {
            ClassPathXmlApplicationContext context =
                    new ClassPathXmlApplicationContext(new String[] { "applicationContext.xml" });
            context.start();
            SpsServer.run((BizCommonService) context.getBean("bizCommonService"));
        } catch (Exception e) {
            logger.error("start sps interface server error:", e);
            System.exit(-1);
        }
    }

    /** Builds the per-connection pipeline: tab framer, GBK decoder/encoder, business handler. */
    private static final class TabFramedGbkInitializer extends ChannelInitializer<SocketChannel> {
        private final BizCommonService bizCommonService;

        TabFramedGbkInitializer(BizCommonService bizCommonService) {
            this.bizCommonService = bizCommonService;
        }

        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            Charset gbk = Charset.forName("GBK");
            // A frame ends at a tab byte; the decoder is constructed with a maximum frame length of 2048.
            ByteBuf delimiter = Unpooled.copiedBuffer("\t".getBytes());
            pipeline.addLast("framer", new DelimiterBasedFrameDecoder(2048, delimiter));
            pipeline.addLast("decoder", new StringDecoder(gbk));
            pipeline.addLast("encoder", new StringEncoder(gbk));
            pipeline.addLast(new SpsServerHandler(bizCommonService));
        }
    }
}
修改后的客户端:
package fourNoBlocking;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
/**
 * Minimal blocking TCP client: sends one GBK-encoded message to a server and
 * returns the first chunk of the reply.
 *
 * @date 2016-12-14 11:56:27 AM
 * @since 1.0
 */
public class SendClient {
    /** Charset used for both the outgoing request and the incoming reply. */
    private static final String ENCODING = "GBK";
    /** Timeout in milliseconds applied when the caller passes a negative value. */
    private static final int DEFAULT_TIMEOUT_MILLIS = 3500;
    /** Size of the buffer used for the single read of the reply. */
    private static final int READ_BUFFER_SIZE = 6555;

    /**
     * Sends {@code sendStr} to {@code ip:port} and returns whatever the first
     * read of the reply delivers, decoded as GBK.
     *
     * <p>Only one read is performed, so a reply larger than the buffer or
     * split across several TCP segments is returned partially.
     *
     * @param ip      server host name or address
     * @param port    server port
     * @param sendStr message to send; the caller appends any frame delimiter the server expects
     * @param timeout connect and read timeout in milliseconds; a negative value selects 3500,
     *                and 0 means "no timeout" as defined by {@link Socket}
     * @return the reply text, {@code ""} if the server closed the connection without replying,
     *         {@code "str is null"} for a null or empty message, or a string starting with
     *         {@code "error send message"} when any exception occurs
     */
    public static String send(String ip, int port, String sendStr, int timeout) {
        // Validate first: the message used to be dereferenced before this check.
        if (sendStr == null || "".equals(sendStr)) {
            return "str is null";
        }
        System.out.println(sendStr.length());

        Socket client = null;
        try {
            int effectiveTimeout = timeout >= 0 ? timeout : DEFAULT_TIMEOUT_MILLIS;
            client = new Socket();
            // Bound the connect as well as the read, so an unreachable host cannot block indefinitely.
            client.connect(new InetSocketAddress(ip, port), effectiveTimeout);
            client.setSoTimeout(effectiveTimeout);

            OutputStream out = client.getOutputStream();
            InputStream in = client.getInputStream();

            out.write(sendStr.getBytes(ENCODING));
            out.flush();

            String res = "";
            byte[] buffer = new byte[READ_BUFFER_SIZE];
            int count = in.read(buffer);
            if (count != -1) {
                // Decode with the same charset the request was encoded in, not the platform default.
                res = new String(buffer, 0, count, ENCODING);
                System.out.println(res);
            }
            return res;
        } catch (Exception e) {
            return new StringBuilder()
                    .append("error send message").append(e.getMessage())
                    .append("&errorID=").append(System.currentTimeMillis())
                    .toString();
        } finally {
            closeQuietly(client);
        }
    }

    /** Closes the socket (and with it both of its streams), ignoring a failure to close. */
    private static void closeQuietly(Socket socket) {
        if (socket == null) {
            return;
        }
        try {
            socket.close();
        } catch (IOException ignored) {
            // Best effort: the result (or error string) has already been determined.
        }
    }

    /** Demo: sends a sample tab-terminated message to a server on 127.0.0.1:10001 and prints the reply. */
    public static void main(String[] args) {
        String msg = "FFFF76623634010100102700170103IBSS017555 000000021800100023402287248808*766236340100200001178400003001785000030217860000302110000004075510100020SZ2000000054121442461020001241324186148310300593PM_DJDHHM||83456517||001#$PM_HYLX||0||001#$BA_MSMAN||海豚||001#$PM_DJQYYB||518000||001#$PM_DJQYMC||深圳市福田区人力资源服务中心||001#$PM_BHHM||83456517||001#$PM_DJQYDZ||福田区福强路深圳文化创意园世纪工艺品文化广场309栋B座1-3层||001#$PM_SFZDXY||XY02||001#$PM_DJKHXX||||001#$BA_MSDEPTNAME||12||001#$PM_DLS||DSL6||001#$PM_YWSLLB||SLLB01||001#$PM_SLDYSLSH||0||001#$PM_JFQ||01||001#$PM_DJHMGS||1||001#$PM_SRFJ||2||001#$PM_JFJG||1||001#$PM_YZ||30||001#$PM_DXFSSL||100||001#$PB_BILLINGTYPE||000000||005#$PB_USERTYPE||100002||005#$PB_USERCHAR||JFSX01||005#$BEGIN_DATE||20170607||005#$END_DATE||||005#$10400014DXMP214688722910700016号百信息服务中心10800010122810070411400006徐冬生115000088291816511600110114+企业名片行业版包月套餐,114+短信名片包月套餐_定价计划,114+企业名片行业版包月套餐赠送3个月套餐外等额话费优惠11700017755KH000293285120\t";
        String x = SendClient.send("127.0.0.1", 10001, msg, 3500);
        System.out.println("return string:" + x);
    }
}
处理类:
package server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.eshore.ismp.hbinterface.service.BizCommonService;
/**
 * Inbound handler that passes each received message to {@link BizCommonService}
 * and writes the response string it builds back to the channel.
 */
public class SpsServerHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger logger = LoggerFactory.getLogger(SpsServerHandler.class);

    private BizCommonService bizCommonService;

    /** Creates a handler with no service set. */
    public SpsServerHandler() {
    }

    /**
     * @param bizCommonService service used to process messages and build responses
     */
    public SpsServerHandler(BizCommonService bizCommonService) {
        this.bizCommonService = bizCommonService;
    }

    /**
     * Logs the message, submits its text via {@code sendOperToCacheAysn}, builds the
     * response with {@code createResponseStr} and writes it to the channel.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        logger.info("SERVER接收到消息 msg:{}", msg);

        final long startedAt = System.currentTimeMillis();
        final String request = String.valueOf(msg);
        final boolean accepted = bizCommonService.sendOperToCacheAysn(request);

        // Step 3: build the response message.
        final String response = bizCommonService.createResponseStr(request, accepted);
        final long elapsedMillis = System.currentTimeMillis() - startedAt;

        logger.debug("SpsServer request:{} res:{} time cost:{}ms", request, response, elapsedMillis);
        ctx.channel().writeAndFlush(response);
    }

    /** Logs the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.warn("Unexpected exception from downstream.", cause);
        ctx.close();
    }

    /** Logs the channel's hash code when the connection goes inactive, then delegates to the superclass. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info("client closed:{}", ctx.channel().hashCode());
        super.channelInactive(ctx);
    }
}
输出:
length:1027
服务端增加了基于分隔符的拆包解码器(以 \t 作为一帧的结束符,单帧最大 2048 字节):
ByteBuf delimiter = Unpooled.copiedBuffer("\t".getBytes());
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(2048,delimiter));
客户端报文末尾增加了分隔符:
\t