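  When using Netty we often set ChannelOption.SO_KEEPALIVE to true. Only then does the operating system (Linux here) apply TCP keepalive to the connection. On Linux the behavior is controlled by three kernel parameters, tcp_keepalive_time, tcp_keepalive_intvl, and tcp_keepalive_probes, which keep their default values unless they are configured.

  tcp_keepalive_time: how long a TCP connection may stay idle, with no data exchanged, before the kernel sends the first keepalive probe. The default is 7200 seconds, so an idle connection is probed once every 2 hours.

  tcp_keepalive_intvl: how long to wait before sending the next probe when a probe has not been acknowledged by the peer. The default is 75 seconds.

  tcp_keepalive_probes: how many probes are sent without an acknowledgment before giving up. The default is 9.

  To recap: after the connection has been idle for tcp_keepalive_time, a keepalive probe is sent. If the peer does not acknowledge it, probes are repeated every tcp_keepalive_intvl seconds, up to tcp_keepalive_probes times in total.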
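  As a worked example of the recap above, the sketch below adds up how long it takes, approximately, before a silent peer is given up on with the default values. The class and method names are made up for illustration, and the formula is the usual back-of-the-envelope estimate rather than an exact kernel guarantee.

```java
import java.time.Duration;

final class KeepaliveTimeline {

    /**
     * Rough worst-case detection time: the idle period before the first probe,
     * plus one interval for each unacknowledged probe.
     */
    static Duration detectionTime(int keepaliveTimeSec, int keepaliveIntvlSec, int keepaliveProbes) {
        long seconds = keepaliveTimeSec + (long) keepaliveIntvlSec * keepaliveProbes;
        return Duration.ofSeconds(seconds);
    }

    public static void main(String[] args) {
        // Defaults quoted above: 7200 s idle, 75 s between probes, 9 probes.
        Duration d = detectionTime(7200, 75, 9);
        System.out.println(d.getSeconds() + " seconds"); // prints "7875 seconds"
    }
}
```

  With the defaults this is a little over 2 hours and 11 minutes, which is why an application-level heartbeat with a much shorter period is often preferred.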

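  These are Linux system-level settings. To use this feature you need to set SO_KEEPALIVE to true and also tune the parameters above; SO_KEEPALIVE is false by default. Because the behavior depends on these system settings, Netty provides its own mechanism for implementing heartbeats.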

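  Netty implements heartbeats with IdleStateHandler. It tracks three timeouts: reader idle time, writer idle time, and all idle time (no read and no write). As soon as any one of them elapses, an IdleStateEvent is fired and delivered to the userEventTriggered method.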

public IdleStateHandler(
            int readerIdleTimeSeconds,
            int writerIdleTimeSeconds,
            int allIdleTimeSeconds)
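
  To make the three timeouts concrete, here is a small JDK-only model of the bookkeeping. It is not Netty's implementation: IdleTracker and its methods are hypothetical names, and time is passed in explicitly so the behavior is easy to follow. As with IdleStateHandler, a timeout of 0 disables that check.

```java
import java.util.EnumSet;

final class IdleTracker {

    enum IdleState { READER_IDLE, WRITER_IDLE, ALL_IDLE }

    private final long readerIdleMillis;
    private final long writerIdleMillis;
    private final long allIdleMillis;
    private long lastReadMillis;
    private long lastWriteMillis;

    IdleTracker(int readerIdleSeconds, int writerIdleSeconds, int allIdleSeconds, long nowMillis) {
        this.readerIdleMillis = readerIdleSeconds * 1000L;
        this.writerIdleMillis = writerIdleSeconds * 1000L;
        this.allIdleMillis = allIdleSeconds * 1000L;
        this.lastReadMillis = nowMillis;
        this.lastWriteMillis = nowMillis;
    }

    void onRead(long nowMillis)  { lastReadMillis = nowMillis; }

    void onWrite(long nowMillis) { lastWriteMillis = nowMillis; }

    /** Returns every idle state whose timeout has elapsed at nowMillis. */
    EnumSet<IdleState> check(long nowMillis) {
        EnumSet<IdleState> fired = EnumSet.noneOf(IdleState.class);
        if (readerIdleMillis > 0 && nowMillis - lastReadMillis >= readerIdleMillis) {
            fired.add(IdleState.READER_IDLE);
        }
        if (writerIdleMillis > 0 && nowMillis - lastWriteMillis >= writerIdleMillis) {
            fired.add(IdleState.WRITER_IDLE);
        }
        // "All idle" means neither a read nor a write happened within the window.
        long lastIoMillis = Math.max(lastReadMillis, lastWriteMillis);
        if (allIdleMillis > 0 && nowMillis - lastIoMillis >= allIdleMillis) {
            fired.add(IdleState.ALL_IDLE);
        }
        return fired;
    }
}
```

  For instance, a tracker created with (10, 0, 5) never reports WRITER_IDLE, reports ALL_IDLE 5 seconds after the last read or write, and reports READER_IDLE 10 seconds after the last read.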

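  Let's define the message format. Length takes 2 bytes and Type takes 1 byte. In the code that follows, Length holds the size of the whole frame, including the Length field itself, so for the content "HELLO" it is 2 + 1 + 5 = 8. Type 3 is CUSTOM_MSG.

  +------------------+---------------+----------------+
  | Length (2 bytes) | Type (1 byte) | Actual Content |
  |      0x0008      |     0x03      |    "HELLO"     |
  +------------------+---------------+----------------+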
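
  The handlers in this post write Length as 3 + content length, so the field counts the whole frame (2 bytes of Length, 1 byte of Type, then the content). The sketch below reproduces that layout with java.nio.ByteBuffer so it can be run without Netty. FrameCodec and its methods are illustrative names, and UTF-8 is an assumption, since the original code uses the platform default charset.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class FrameCodec {

    static final byte PING = 1;
    static final byte PONG = 2;
    static final byte CUSTOM_MSG = 3;
    static final int HEADER_BYTES = 3; // 2-byte Length + 1-byte Type

    /** Builds one frame: Length (whole frame size), Type, then the content bytes. */
    static byte[] encode(byte type, String content) {
        byte[] payload = content.getBytes(StandardCharsets.UTF_8);
        int total = HEADER_BYTES + payload.length;
        ByteBuffer buf = ByteBuffer.allocate(total); // big-endian by default
        buf.putShort((short) total);
        buf.put(type);
        buf.put(payload);
        return buf.array();
    }

    /** Reads the unsigned 2-byte Length field at offset 0. */
    static int lengthOf(byte[] frame) {
        return ByteBuffer.wrap(frame).getShort(0) & 0xFFFF;
    }

    /** The Type byte sits right after the Length field, at index 2. */
    static byte typeOf(byte[] frame) {
        return frame[2];
    }

    static String contentOf(byte[] frame) {
        return new String(frame, HEADER_BYTES, frame.length - HEADER_BYTES, StandardCharsets.UTF_8);
    }
}
```

  Encoding CUSTOM_MSG with "HELLO" gives an 8-byte frame whose Length field is 8, while a PING or PONG with no content is exactly 3 bytes.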

  Define a common inbound handler that provides the channelRead, sendPing, sendPong, and userEventTriggered methods.

package com.hqs.heartbeat.common;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author huangqingshi
 * @Date 2019-05-11
 */
public abstract class CustomeHeartbeatHandler extends SimpleChannelInboundHandler<ByteBuf> {

    public static final byte PING = 1;
    public static final byte PONG = 2;
    public static final byte CUSTOM_MSG = 3;

    protected String name;
    private AtomicInteger heartbeatCount = new AtomicInteger(0);

    public CustomeHeartbeatHandler(String name) {
        this.name = name;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
        // Indexes 0-1 hold the 2-byte Length field; index 2 is the Type byte.
        byte type = byteBuf.getByte(2);
        if(type == PING) {
            // Answer every ping with a pong.
            sendPong(channelHandlerContext);
        } else if(type == PONG) {
            System.out.println("get pong msg from " + channelHandlerContext
                    .channel().remoteAddress());
        } else {
            // Anything else is application data for the subclass.
            handleData(channelHandlerContext, byteBuf);
        }
    }

    protected abstract void handleData(ChannelHandlerContext channelHandlerContext,
                                       ByteBuf byteBuf);


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channel read : " + msg);
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println(byteBuf.getByte(2));
        super.channelRead(ctx, msg);
    }

    protected void sendPong(ChannelHandlerContext channelHandlerContext) {
        ByteBuf buf = channelHandlerContext.alloc().buffer(3);
        buf.writeShort(3);
        buf.writeByte(PONG);
        channelHandlerContext.writeAndFlush(buf);
        heartbeatCount.incrementAndGet();
        System.out.println("send pong message to " + channelHandlerContext.channel().remoteAddress());
    }

    protected void sendPing(ChannelHandlerContext channelHandlerContext) {
        ByteBuf buf = channelHandlerContext.alloc().buffer(3);
        buf.writeShort(3);
        buf.writeByte(PING);
        channelHandlerContext.writeAndFlush(buf);
        heartbeatCount.incrementAndGet();
        System.out.println("send ping message to " + channelHandlerContext.channel().remoteAddress());
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case ALL_IDLE:
                    handlALLIdle(ctx);
                    break;
                case READER_IDLE:
                    handlReadIdle(ctx);
                    break;
                case WRITER_IDLE:
                    handlWriteIdle(ctx);
                    break;
                default:
                    break;
            }
        }
    }

    protected void handlReadIdle(ChannelHandlerContext channelHandlerContext) {
        System.out.println("READ_IDLE---");
    }

    protected void handlWriteIdle(ChannelHandlerContext channelHandlerContext) {
        System.out.println("WRITE_IDLE---");
    }

    protected void handlALLIdle(ChannelHandlerContext channelHandlerContext) {
        System.out.println("ALL_IDLE---");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel:" + ctx.channel().remoteAddress() + " is active");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel:" + ctx.channel().remoteAddress() + " is inactive");
    }
}

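  Define the Server. The read idle timeout is 10 seconds. The byte stream is split into frames by its length field with LengthFieldBasedFrameDecoder(1024, 0, 2, -2, 0), where the maximum frame length is 1 KB. One boss thread accepts connections and four worker threads handle them. The port is 9999.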

package com.hqs.heartbeat.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * @author huangqingshi
 * @Date 2019-05-11
 */
public class Server {

    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup(4);

        try {
            ServerBootstrap bootstrapServer = new ServerBootstrap();
            bootstrapServer.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline channelPipeline = ch.pipeline();
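                    // Fire a READER_IDLE event when nothing has been read for 10 seconds.
                    channelPipeline.addLast(new IdleStateHandler(10, 0, 0));
                    // The 2-byte Length at offset 0 counts the whole frame, hence the -2 adjustment.
                    channelPipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, -2, 0));
                    channelPipeline.addLast(new ServerHandler());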
                }
            });
            Channel channel = bootstrapServer.bind(9999).sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
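
  The arguments (1024, 0, 2, -2, 0) used in the pipeline above are, in order, maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, and initialBytesToStrip. The decoder works out the size of each frame as the value of the length field plus the adjustment plus the end offset of the length field. The arithmetic can be checked with a tiny sketch; FrameLengthMath is a hypothetical helper, not part of Netty.

```java
final class FrameLengthMath {

    /**
     * Total frame size as derived from the length field:
     * value + lengthAdjustment + (lengthFieldOffset + lengthFieldLength).
     */
    static int frameLength(int lengthFieldValue, int lengthFieldOffset,
                           int lengthFieldLength, int lengthAdjustment) {
        int lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
        return lengthFieldValue + lengthAdjustment + lengthFieldEndOffset;
    }

    public static void main(String[] args) {
        // "HELLO" message: Length = 8 already counts the 2-byte field, so -2 cancels it out.
        System.out.println(frameLength(8, 0, 2, -2)); // prints 8
        // Ping/pong frame: Length = 3, nothing but the 3-byte header.
        System.out.println(frameLength(3, 0, 2, -2)); // prints 3
    }
}
```

  Because initialBytesToStrip is 0, the handler receives the whole frame including the header, which is why the handlers read the Type with getByte(2) and skip 3 bytes before the content.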

  The Server's handler:

package com.hqs.heartbeat.server;

import com.hqs.heartbeat.common.CustomeHeartbeatHandler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;

/**
 * @author huangqingshi
 * @Date 2019-05-11
 */
public class ServerHandler extends CustomeHeartbeatHandler {

    public ServerHandler() {
        super("server");
    }

    @Override
    protected void handleData(ChannelHandlerContext channelHandlerContext,
                              ByteBuf byteBuf) {
        byte[] data = new byte[byteBuf.readableBytes() - 3];
        // Copy the whole frame first so it can be echoed back unchanged.
        ByteBuf responseBuf = Unpooled.copiedBuffer(byteBuf);
        // Skip the 2-byte Length and the 1-byte Type, then read the content.
        byteBuf.skipBytes(3);
        byteBuf.readBytes(data);
        String content = new String(data);
        System.out.println(name + " get content : " + content);
        channelHandlerContext.writeAndFlush(responseBuf);
    }

    @Override
    protected void handlReadIdle(ChannelHandlerContext channelHandlerContext) {
        super.handlReadIdle(channelHandlerContext);
        System.out.println(" client " + channelHandlerContext.channel().remoteAddress() + " reader timeout close it --");
        channelHandlerContext.close();
    }
}

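  Define the Client class. The all-idle timeout is 5 seconds: if nothing is read or written for 5 seconds, the client sends a ping. If the connection is lost and the channel becomes inactive, the client reconnects; a failed connection attempt is retried after 10 seconds.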

package com.hqs.heartbeat.client;

import com.hqs.heartbeat.common.CustomeHeartbeatHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author huangqingshi
 * @Date 2019-05-11
 */
public class Client {


    private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
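    // Assigned on the event loop thread and read by the main thread in sendData(), hence volatile.
    private volatile Channel channel;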
    private Bootstrap bootstrap;

    public static void main(String[] args) throws InterruptedException {
        Client client = new Client();
        client.start();
        client.sendData();
    }

    public void start() {

        try {
            bootstrap = new Bootstrap();

            bootstrap.group(workGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline channelPipeline = ch.pipeline()
                                    .addLast(new IdleStateHandler(0,0,5))
                                    .addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, -2, 0))
                                    .addLast(new ClientHandler(Client.this));
                        }
                    });
            doConnect();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void sendData() throws InterruptedException {
        Random random = new Random(System.currentTimeMillis());
        for(int i = 0; i < 10000; i++) {
            if(channel != null && channel.isActive()) {
                String content = "client msg " + i;
                ByteBuf byteBuf = channel.alloc().buffer(3 + content.getBytes().length);
                byteBuf.writeShort(3 + content.getBytes().length);
                byteBuf.writeByte(CustomeHeartbeatHandler.CUSTOM_MSG);
                byteBuf.writeBytes(content.getBytes());
                channel.writeAndFlush(byteBuf);
            }

            Thread.sleep(random.nextInt(20000));

        }

    }

    public void doConnect() {
        if(channel != null && channel.isActive()) {
            return;
        }

        ChannelFuture future = bootstrap
                .connect("127.0.0.1", 9999);
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if(future.isSuccess()) {
                    channel = future.channel();
                    System.out.println("connect to server successfully");
                } else {
                    System.out.println("Failed to connect to server, try after 10s");

                    future.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            doConnect();
                        }
                    }, 10, TimeUnit.SECONDS);
                }
            }
        });
    }

}
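
  The retry in doConnect above follows a simple pattern: try once, and on failure schedule the same attempt again after a fixed delay. The sketch below shows that pattern with JDK classes only, so it can be run without a server. RetryingConnector, its methods, and the BooleanSupplier standing in for the connection attempt are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class RetryingConnector {

    // One daemon thread, so a pending retry never keeps the JVM alive.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "reconnect");
                t.setDaemon(true);
                return t;
            });

    /** Completes with the number of attempts made once tryConnect returns true. */
    static CompletableFuture<Integer> connect(BooleanSupplier tryConnect, long delay, TimeUnit unit) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        attempt(tryConnect, delay, unit, 1, result);
        return result;
    }

    private static void attempt(BooleanSupplier tryConnect, long delay, TimeUnit unit,
                                int attemptNo, CompletableFuture<Integer> result) {
        try {
            if (tryConnect.getAsBoolean()) {
                result.complete(attemptNo);
            } else {
                // Failed: schedule the next attempt after the fixed delay.
                SCHEDULER.schedule(
                        () -> attempt(tryConnect, delay, unit, attemptNo + 1, result), delay, unit);
            }
        } catch (RuntimeException e) {
            result.completeExceptionally(e);
        }
    }
}
```

  A fixed delay keeps the example close to the 10-second retry in the Client; a real deployment might prefer a growing delay so that many clients do not all reconnect at the same moment.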

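  Define the ClientHandler. When reading a message it skips the first three bytes (2 for Length plus 1 for Type) and then extracts the content. When the connection drops, it reconnects.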

package com.hqs.heartbeat.client;

import com.hqs.heartbeat.common.CustomeHeartbeatHandler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;

/**
 * @author huangqingshi
 * @Date 2019-05-11
 */
public class ClientHandler extends CustomeHeartbeatHandler {

    private Client client;

    public ClientHandler(Client client) {
        super("client");
        this.client = client;
    }

    @Override
    protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        byte[] data = new byte[byteBuf.readableBytes() - 3];
        byteBuf.skipBytes(3);
        byteBuf.readBytes(data);
        String content = new String(data);
        System.out.println(name + " get content:" + content);
    }

    @Override
    protected void handlALLIdle(ChannelHandlerContext channelHandlerContext) {
        super.handlALLIdle(channelHandlerContext);
        sendPing(channelHandlerContext);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        client.doConnect();
    }
}

  That covers the overall Netty heartbeat mechanism. I hope it helps.

  GitHub: https://github.com/stonehqs/heartbeat

  

05-19 18:26