HeartBeatTask 类封装了心跳定时任务,需要了解的是 provider 和 consumer 都有可能发送心跳。

final class HeartBeatTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger( HeartBeatTask.class );
private ChannelProvider channelProvider;
private int heartbeat;
private int heartbeatTimeout; HeartBeatTask( ChannelProvider provider, int heartbeat, int heartbeatTimeout ) {
this.channelProvider = provider;
this.heartbeat = heartbeat;
this.heartbeatTimeout = heartbeatTimeout;
} public void run() {
try {
long now = System.currentTimeMillis();
for ( Channel channel : channelProvider.getChannels() ) {
if (channel.isClosed()) {
continue;
}
try {
Long lastRead = ( Long ) channel.getAttribute(
HeaderExchangeHandler.KEY_READ_TIMESTAMP );
Long lastWrite = ( Long ) channel.getAttribute(
HeaderExchangeHandler.KEY_WRITE_TIMESTAMP );
if ( ( lastRead != null && now - lastRead > heartbeat )
|| ( lastWrite != null && now - lastWrite > heartbeat ) ) {
Request req = new Request();
req.setVersion( "2.0.0" );
req.setTwoWay( true );
req.setEvent( Request.HEARTBEAT_EVENT );
channel.send( req );
}
if (lastRead != null && now - lastRead > heartbeatTimeout) {
//如果是 consumer 端
if (channel instanceof Client) {
((Client)channel).reconnect();
} else { // provider 端
channel.close();
}
}
} catch ( Throwable t ) {
}
}
} catch ( Throwable t ) {
logger.warn( "Unhandled exception when heartbeat, cause: " + t.getMessage(), t );
}
} interface ChannelProvider {
Collection<Channel> getChannels();
} }

对于 consumer,是在 HeaderExchangeClient 类中启动心跳定时器,而 provider,则是在 HeaderExchangeServer 中启动心跳定时器。

consumer发送请求时,更新 lastWrite 值,接收响应时,更新 lastRead 值。心跳定时器定时检查 lastRead 和 lastWrite,发送心跳、重连。

public class HeaderExchangeClient implements ExchangeClient {
private static final ScheduledThreadPoolExecutor scheduled =
new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
// 心跳定时器
private ScheduledFuture<?> heatbeatTimer;
private int heartbeat;
private int heartbeatTimeout; public HeaderExchangeClient(Client client){
if (client == null) {
throw new IllegalArgumentException("client == null");
}
this.client = client;
this.channel = new HeaderExchangeChannel(client);
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
//heartbeat = 60000
this.heartbeat = client.getUrl().getParameter( Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0 );
//heartbeatTimeout = 180000
this.heartbeatTimeout = client.getUrl().getParameter( Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3 );
if ( heartbeatTimeout < heartbeat * 2 ) {
throw new IllegalStateException( "heartbeatTimeout < heartbeatInterval * 2" );
}
startHeatbeatTimer();
} public ResponseFuture request(Object request) throws RemotingException {
return channel.request(request);
} private void startHeatbeatTimer() {
stopHeartbeatTimer();
if ( heartbeat > 0 ) {
heatbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask( new HeartBeatTask.ChannelProvider() {
public Collection<Channel> getChannels() {
return Collections.<Channel>singletonList( HeaderExchangeClient.this );
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS );
}
}
}

在 HeartbeatHandler 类中设置 lastRead 和 lastWrite 值:

public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
//省略其他代码
private void setReadTimestamp(Channel channel) {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
} private void setWriteTimestamp(Channel channel) {
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
}
}

设置 lastWrite 的调用栈:

dubbo 心跳-LMLPHP

设置 lastRead 的调用栈:

dubbo 心跳-LMLPHP

05-18 18:09