学习netty之后,可能都有一个疑问,就是如何选择一个编码、解码器,在netty中的编解码可是和json这种编解码是不一样的,netty的编解码器主要是解决TCP粘包、拆包的问题。netty中有许多自带的编解码器,我推荐使用websocket编解码器。

  选用websocket的好处就是它是一个持久化链接,能给让客户端与服务端保持长时间连接;其次websocket中支持许多数据格式,如果文本、二进制、ping信息等等;最后websocket本身也是基于HTTP协议的,所以websocket也能够支持文件的上传,所以选用websocket编解码,是个不错的选择。

一、服务端模块

   1.引入maven与启动类

    本文是基于spring boot 2.0,netty4.0开发的,这里只展示netty的包

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.25.Final</version>
        </dependency>

    因为是服务端,所以需要在spring boot自带的启动类中同时启动netty服务

/**
 * 声明CommandLineRunner接口,实现run方法,就能给启动项目同时启动netty服务
 */
@SpringBootApplication
public class ThemApplication implements CommandLineRunner {

    /**
     * netty服务
     */
    @Autowired
    ServerByNetty serverByNetty;

    public static void main(String[] args) {
        SpringApplication.run(ThemApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        serverByNetty.startServer();
    }
}

  2.netty服务端类

/**
 * 基于websocket的服务端代码
 */
@Configuration
public class ServerByNetty {

    /**
     * 服务端启动类
     * @throws Exception
     */
    public void startServer() throws Exception {
        // netty基本操作,两个线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup wokerGroup = new NioEventLoopGroup();
        try{
            //netty的启动类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,wokerGroup).channel(NioServerSocketChannel.class)
                    //记录日常的handler,netty自带的
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .option(ChannelOption.SO_KEEPALIVE,true)
                    .option(ChannelOption.SO_BACKLOG,1024*1024*10)
                    //设置handler
                    .childHandler(new ChannelInitializer< SocketChannel >(){
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            //websocket协议本身是基于Http协议的,所以需要Http解码器
                            pipeline.addLast("http-codec",new HttpServerCodec());
                            //以块的方式来写的处理器
                            pipeline.addLast("http-chunked",new ChunkedWriteHandler());
                            //netty是基于分段请求的,HttpObjectAggregator的作用是将请求分段再聚合,参数是聚合字节的最大长度
                            pipeline.addLast("aggregator",new HttpObjectAggregator(1024*1024*1024));

                            //这个是websocket的handler,是netty提供的,也可以自定义,建议就用默认的
                            pipeline.addLast(new WebSocketServerProtocolHandler("/hello",null,true,65535));
                            //自定义的handler,处理服务端传来的消息
                            pipeline.addLast(new WebSocketHandle());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8899)).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            wokerGroup.shutdownGracefully();
        }

    }
}

  在服务端类中需要注意的是使用的handler,一共有5个,前面三个都是HTTP的编解码器,WebSocketServerProtocolHandler则是websocket的handler,这个的作用主要是用来解决HTTP握手等问题。虽然可以自己实现,但是推荐采用这个默认的handler,它能够解决很多未知的问题。

  3.自定义的handler, WebSocketHandle类

/**
 * 自定义的handler类
 */
@Configuration
public class WebSocketHandle extends SimpleChannelInboundHandler<Object> {

    /**
     * Handler活跃状态,表示连接成功
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("与客户端连接成功");
    }

    /**
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        //文本消息
        if (msg instanceof TextWebSocketFrame) {
            System.out.println("服务端收到的消息:" + ((TextWebSocketFrame) msg).text());
            //给客户端发送的消息
            String putMessage="hello 客户端";
            ctx.channel().writeAndFlush(new TextWebSocketFrame(putMessage));
            System.out.println("给客户端发送的消息:"+putMessage);
        }
        //二进制消息
        if (msg instanceof BinaryWebSocketFrame) {
            System.out.println("收到二进制消息:" + ((BinaryWebSocketFrame) msg).content().readableBytes());
            BinaryWebSocketFrame binaryWebSocketFrame = new BinaryWebSocketFrame(Unpooled.buffer().writeBytes("hello".getBytes()));
            //给客户端发送的消息
            ctx.channel().writeAndFlush(binaryWebSocketFrame);
        }
        //ping消息
        if (msg instanceof PongWebSocketFrame) {
            System.out.println("客户端ping成功");
        }
        //关闭消息
        if (msg instanceof CloseWebSocketFrame) {
            System.out.println("客户端关闭,通道关闭");
            Channel channel = ctx.channel();
            channel.close();
        }
    }

    /**
     * 未注册状态
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("等待连接");
    }

    /**
     * 非活跃状态,没有连接远程主机的时候。
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端关闭");
    }

    /**
     * 异常处理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("连接异常:"+cause.getMessage());
        ctx.close();
    }
}

  因为我们采用了websocket自带的handler,所以不需要我自己再去解决HTTP握手的问题,我们只需要对客户端发送过来的数据进行转换和业务处理。

  至此,服务端的代码就已经完成了。

二、客户端模块

  客户端模块中,就可以有多种实现了。可以采用JS实现网页版的聊天工具,也可以在安卓端实现客户端。本来这里使用的java后实现的一个模式。

  1.客户端类

/**
 * 基于websocket的netty客户端
 *
 */
public class ClientByNetty {

    public static void main(String[] args) throws Exception {
        //netty基本操作,线程组
        EventLoopGroup group = new NioEventLoopGroup();
        //netty基本操作,启动类
        Bootstrap boot = new Bootstrap();
        boot.option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .group(group)
                .handler(new LoggingHandler(LogLevel.INFO))
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast("http-codec",new HttpClientCodec());
                        pipeline.addLast("aggregator",new HttpObjectAggregator(1024*1024*10));
                        pipeline.addLast("hookedHandler", new WebSocketClientHandler());
                    }
                });
        //websocke连接的地址,/hello是因为在服务端的websockethandler设置的
        URI websocketURI = new URI("ws://localhost:8899/hello");
        HttpHeaders httpHeaders = new DefaultHttpHeaders();
        //进行握手
        WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI, WebSocketVersion.V13, (String) null, true, httpHeaders);
       //客户端与服务端连接的通道,final修饰表示只会有一个
        final Channel channel = boot.connect(websocketURI.getHost(), websocketURI.getPort()).sync().channel();
        WebSocketClientHandler handler = (WebSocketClientHandler) channel.pipeline().get("hookedHandler");
        handler.setHandshaker(handshaker);
        handshaker.handshake(channel);
        //阻塞等待是否握手成功
        handler.handshakeFuture().sync();
        System.out.println("握手成功");
        //给服务端发送的内容,如果客户端与服务端连接成功后,可以多次掉用这个方法发送消息
        sengMessage(channel);
    }

    public static void sengMessage(Channel channel){
        //发送的内容,是一个文本格式的内容
        String putMessage="你好,我是客户端";
        TextWebSocketFrame frame = new TextWebSocketFrame(putMessage);
        channel.writeAndFlush(frame).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    System.out.println("消息发送成功,发送的消息是:"+putMessage);
                } else {
                    System.out.println("消息发送失败 " + channelFuture.cause().getMessage());
                }
            }
        });
    }

}

  客户端代码中需要注意很多地方:

  (1) 客户端的http编解码器是HttpClientCodec与服务端是不一样的,服务端是HttpServerCodec

  (2) URI中的地址用的websocket的协议 ws:,而/hello则是服务端设置的通道地址,类似于HTTP的接口地址

  (3) 当客户端与服务端连接成功后,就可以通过调用sengMessage方法给服务端发送消息,只要这个连接没有断开就能够一直发

  (4) 调用发送消息的方法,一定要等待握手成功后发送

  2.客户端的handler,WebSocketClientHandler

public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
    //握手的状态信息
    WebSocketClientHandshaker handshaker;
    //netty自带的异步处理
    ChannelPromise handshakeFuture;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("当前握手的状态"+this.handshaker.isHandshakeComplete());
        Channel ch = ctx.channel();
        FullHttpResponse response;
        //进行握手操作
        if (!this.handshaker.isHandshakeComplete()) {
            try {
                response = (FullHttpResponse)msg;
                //握手协议返回,设置结束握手
                this.handshaker.finishHandshake(ch, response);
                //设置成功
                this.handshakeFuture.setSuccess();
                System.out.println("服务端的消息"+response.headers());
            } catch (WebSocketHandshakeException var7) {
                FullHttpResponse res = (FullHttpResponse)msg;
                String errorMsg = String.format("握手失败,status:%s,reason:%s", res.status(), res.content().toString(CharsetUtil.UTF_8));
                this.handshakeFuture.setFailure(new Exception(errorMsg));
            }
        } else if (msg instanceof FullHttpResponse) {
            response = (FullHttpResponse)msg;
            throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        } else {
            //接收服务端的消息
            WebSocketFrame frame = (WebSocketFrame)msg;
            //文本信息
            if (frame instanceof TextWebSocketFrame) {
                TextWebSocketFrame textFrame = (TextWebSocketFrame)frame;
                System.out.println("客户端接收的消息是:"+textFrame.text());
            }
            //二进制信息
            if (frame instanceof BinaryWebSocketFrame) {
                BinaryWebSocketFrame binFrame = (BinaryWebSocketFrame)frame;
                System.out.println("BinaryWebSocketFrame");
            }
            //ping信息
            if (frame instanceof PongWebSocketFrame) {
                System.out.println("WebSocket Client received pong");
            }
            //关闭消息
            if (frame instanceof CloseWebSocketFrame) {
                System.out.println("receive close frame");
                ch.close();
            }

        }
    }

    /**
     * Handler活跃状态,表示连接成功
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("与服务端连接成功");
    }

    /**
     * 非活跃状态,没有连接远程主机的时候。
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("主机关闭");
    }

    /**
     * 异常处理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("连接异常:"+cause.getMessage());
        ctx.close();
    }

    public void handlerAdded(ChannelHandlerContext ctx) {
        this.handshakeFuture = ctx.newPromise();
    }

    public WebSocketClientHandshaker getHandshaker() {
        return handshaker;
    }

    public void setHandshaker(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    public ChannelPromise getHandshakeFuture() {
        return handshakeFuture;
    }

    public void setHandshakeFuture(ChannelPromise handshakeFuture) {
        this.handshakeFuture = handshakeFuture;
    }

    public ChannelFuture handshakeFuture() {
        return this.handshakeFuture;
    }
}

  在handler中,我们可以验证握手是否成功,就使用handshaker.isHandshakeComplete()的方法,如果false就表示握手失败。如果是握手失败客户端就无法接收服务端的消息,所以如果当你要验证消息是否成功到达客户端的时候,可以采用这个方法。

  运行结果,先运行服务端,再运行客户端:

  服务端界面

  客户端界面:

  

 

 

12-16 03:26