实战Netty集群

扫码查看

疯狂创客圈 Java 分布式聊天室【 亿级流量】实战系列之 -25【 博客园 总入口


1.写在前面

1.1 实战Netty集群的理由

Java基础练习中,一个重要的实战练习是: java的聊天程序。基本上,每一个java工程师,都有写过自己的聊天程序。

实现一个Java的分布式的聊天程序的分布式练习,同样非常重要的是。有以下几个方面的最重要作用:

1 体验高并发的程序的开发:从研究承载千、万QPS级的流量,拓展能够承载百万级、千万级、亿万级流量

2 有分布式、高并发的实战经验,面试谈薪水的时候,能提升不少

3 Netty集群的分布式原理,和大数据的分布式原理,elasticsearch 的分布式原理,和redis集群的分布式原理,和mongodb的分布式原理,很大程度上,都是想通。 Netty集群作为一个实战开发, 是一个非常好的分布式基础练习

4 更多的理由,请参考机械工业出版社的书籍 《Netty Zookeeper Redis 高并发实战》

1.2 Netty 集群 实战源码

2 Netty 集群中,服务节点的注册和发现

2.1 服务节点的注册和发现

zookeeper作为注册中心,每一个netty服务启动的时候,把节点的信息比如ip地址+端口号注册到zookeeper上。

具体的原理,请参见书籍《Netty Zookeeper Redis 高并发实战》

2.2 节点的POJO

package com.crazymakercircle.entity;

import lombok.Data;

import java.io.Serializable;
import java.util.Objects; /**
* IM节点的POJO类
* create by 尼恩 @ 疯狂创客圈
**/
@Data
public class ImNode implements Comparable<ImNode>, Serializable { private static final long serialVersionUID = -499010884211304846L; //worker 的Id,zookeeper负责生成
private long id; //Netty 服务 的连接数
private Integer balance = 0; //Netty 服务 IP
private String host; //Netty 服务 端口
private Integer port; public ImNode() {
} public ImNode(String host, Integer port) {
this.host = host;
this.port = port;
} @Override
public String toString() {
return "ImNode{" +
"id='" + id + '\'' +
"host='" + host + '\'' +
", port='" + port + '\'' +
",balance=" + balance +
'}';
} @Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ImNode node = (ImNode) o;
// return id == node.id &&
return Objects.equals(host, node.host) &&
Objects.equals(port, node.port);
} @Override
public int hashCode() {
return Objects.hash(id, host, port);
} /**
* 升序排列
*/
public int compareTo(ImNode o) {
int weight1 = this.balance;
int weight2 = o.balance;
if (weight1 > weight2) {
return 1;
} else if (weight1 < weight2) {
return -1;
}
return 0;
} public void incrementBalance() {
balance++;
} public void decrementBalance() {
balance--;
}
}

2.3 服务的发现

利用zk有一个监听机制,就是针对某个节点进行监听,一点这个节点发生了变化就会收到zk的通知。我们就是利用zk的这个watch来进行服务的上线和下线的通知,也就是我们的服务发现功能。

package com.crazymakercircle.imServer.distributed;

import com.crazymakercircle.constants.ServerConstants;
import com.crazymakercircle.entity.ImNode;
import com.crazymakercircle.im.common.bean.msg.ProtoMsg;
import com.crazymakercircle.imServer.protoBuilder.NotificationMsgBuilder;
import com.crazymakercircle.util.JsonUtil;
import com.crazymakercircle.util.ObjectUtil;
import com.crazymakercircle.zk.ZKclient;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import java.util.concurrent.ConcurrentHashMap; /**
* create by 尼恩 @ 疯狂创客圈
**/
@Slf4j
public class PeerManager {
//Zk客户端
private CuratorFramework client = null; private String pathRegistered = null;
private ImNode node = null; private static PeerManager singleInstance = null;
private static final String path = ServerConstants.MANAGE_PATH; private ConcurrentHashMap<Long, PeerSender> peerMap =
new ConcurrentHashMap<>(); public static PeerManager getInst() {
if (null == singleInstance) {
singleInstance = new PeerManager();
singleInstance.client = ZKclient.instance.getClient();
}
return singleInstance;
} private PeerManager() { } /**
* 初始化节点管理
*/
public void init() {
try { //订阅节点的增加和删除事件 PathChildrenCache childrenCache = new PathChildrenCache(client, path, true);
PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() { @Override
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event) throws Exception {
log.info("开始监听其他的ImWorker子节点:-----");
ChildData data = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
log.info("CHILD_ADDED : " + data.getPath() + " 数据:" + data.getData());
processNodeAdded(data);
break;
case CHILD_REMOVED:
log.info("CHILD_REMOVED : " + data.getPath() + " 数据:" + data.getData());
processNodeRemoved(data);
break;
case CHILD_UPDATED:
log.info("CHILD_UPDATED : " + data.getPath() + " 数据:" + new String(data.getData()));
break;
default:
log.debug("[PathChildrenCache]节点数据为空, path={}", data == null ? "null" : data.getPath());
break;
} } }; childrenCache.getListenable().addListener(childrenCacheListener);
System.out.println("Register zk watcher successfully!");
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); } catch (Exception e) {
e.printStackTrace();
}
} private void processNodeRemoved(ChildData data) { byte[] payload = data.getData();
ImNode n = ObjectUtil.JsonBytes2Object(payload, ImNode.class); long id = ImWorker.getInst().getIdByPath(data.getPath());
n.setId(id);
log.info("[TreeCache]节点删除, path={}, data={}",
data.getPath(), JsonUtil.pojoToJson(n));
PeerSender peerSender = peerMap.get(n.getId()); if (null != peerSender) {
peerSender.stopConnecting();
peerMap.remove(n.getId());
}
} private void processNodeAdded(ChildData data) {
byte[] payload = data.getData();
ImNode n = ObjectUtil.JsonBytes2Object(payload, ImNode.class); long id = ImWorker.getInst().getIdByPath(data.getPath());
n.setId(id); log.info("[TreeCache]节点更新端口, path={}, data={}",
data.getPath(), JsonUtil.pojoToJson(n)); if (n.equals(getLocalNode())) {
log.info("[TreeCache]本地节点, path={}, data={}",
data.getPath(), JsonUtil.pojoToJson(n));
return;
}
PeerSender peerSender = peerMap.get(n.getId());
if (null != peerSender && peerSender.getNode().equals(n)) { log.info("[TreeCache]节点重复增加, path={}, data={}",
data.getPath(), JsonUtil.pojoToJson(n));
return;
}
if (null != peerSender) {
//关闭老的连接
peerSender.stopConnecting();
}
peerSender = new PeerSender(n);
peerSender.doConnect(); peerMap.put(n.getId(), peerSender);
} public PeerSender getPeerSender(long id) {
PeerSender peerSender = peerMap.get(id);
if (null != peerSender) {
return peerSender;
}
return null;
} public void sendNotification(String json) {
peerMap.keySet().stream().forEach(
key -> {
if (!key.equals(getLocalNode().getId())) {
PeerSender peerSender = peerMap.get(key);
ProtoMsg.Message pkg = NotificationMsgBuilder.buildNotification(json);
peerSender.writeAndFlush(pkg);
}
}
); } public ImNode getLocalNode() {
return ImWorker.getInst().getLocalNodeInfo();
} public void remove(ImNode remoteNode) {
peerMap.remove(remoteNode.getId());
log.info("[TreeCache]移除远程节点信息, node={}", JsonUtil.pojoToJson(remoteNode));
}
}

2.4 为什么使用临时节点?

什么是临时节点?服务启动后创建临时节点, 服务断掉后临时节点就不存在了

正常的思路可能是注册的时候,我们像zk注册一个正常的节点,然后在服务下线的时候删除这个节点,但是这样的话会有一个弊端。比如我们的服务挂机,无法去删除临时节点,那么这个节点就会被我们错误的提供给了客户端。

另外我们还要考虑持久化的节点创建之后删除之类的问题,问题会更加的复杂化,所以我们使用了临时节点。

3 负载均衡策略

3.1 负载均衡策略的基本思路

在我们解决了服务的注册和发现问题之后,那么我们究竟提供给客户端那台服务呢,这时候就需要我们做出选择,为了让客户端能够均匀的连接到我们的服务器上(比如有个100个客户端,2台服务器,每台就分配50个),我们需要使用一个负载均衡的策略。

这里我们使用轮询的方式来为每个请求的客户端分配ip。具体的代码实现如下:

3.2 负载均衡实现源码的示意

package com.crazymakercircle.Balance;

import com.crazymakercircle.constants.ServerConstants;
import com.crazymakercircle.entity.ImNode;
import com.crazymakercircle.util.JsonUtil;
import com.crazymakercircle.zk.ZKclient;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.springframework.stereotype.Service; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; /**
* create by 尼恩 @ 疯狂创客圈
**/
@Data
@Slf4j
@Service
public class ImLoadBalance { //Zk客户端
private CuratorFramework client = null;
private String managerPath; public ImLoadBalance() {
this.client = ZKclient.instance.getClient();
// managerPath=ServerConstants.MANAGE_PATH+"/";
managerPath=ServerConstants.MANAGE_PATH;
} /**
* 获取负载最小的IM节点
*
* @return
*/
public ImNode getBestWorker() {
List<ImNode> workers = getWorkers(); log.info("全部节点如下:");
workers.stream().forEach(node -> {
log.info("节点信息:{}", JsonUtil.pojoToJson(node));
});
ImNode best = balance(workers); return best;
} /**
* 按照负载排序
*
* @param items 所有的节点
* @return 负载最小的IM节点
*/
protected ImNode balance(List<ImNode> items) {
if (items.size() > 0) {
// 根据balance值由小到大排序
Collections.sort(items); // 返回balance值最小的那个
ImNode node = items.get(0); log.info("最佳的节点为:{}", JsonUtil.pojoToJson(node));
return node;
} else {
return null;
}
} /**
* 从zookeeper中拿到所有IM节点
*/
protected List<ImNode> getWorkers() { List<ImNode> workers = new ArrayList<ImNode>(); List<String> children = null;
try {
children = client.getChildren().forPath(managerPath);
} catch (Exception e) {
e.printStackTrace();
return null;
} for (String child : children) {
log.info("child:", child);
byte[] payload = null;
try {
payload = client.getData().forPath(managerPath+"/"+child); } catch (Exception e) {
e.printStackTrace();
}
if (null == payload) {
continue;
}
ImNode worker = JsonUtil.jsonBytes2Object(payload, ImNode.class);
workers.add(worker);
}
return workers; }
/**
* 从zookeeper中删除所有IM节点
*/
public void removeWorkers() { try {
client.delete().deletingChildrenIfNeeded().forPath(managerPath);
} catch (Exception e) {
e.printStackTrace();
} } }

4 环境的启动

4.1 启动Zookeeper

Zookeeper的安装和原理,以及开发的基础知识,请参见书籍《Netty Zookeeper Redis 高并发实战》

实战Netty集群-LMLPHP

启动zookeeper的两个节点,本来有三个,启动二个即可

实战Netty集群-LMLPHP

客户端连接zookeeper集群。命令如下:

实战Netty集群-LMLPHP

4.2 启动Redis

Redis的安装和原理,以及开发的基础知识,请参见请参见书籍《Netty Zookeeper Redis 高并发实战》

redis 的客户端界面。

实战Netty集群-LMLPHP

5 Netty集群启动

5.1 启动WEBGate

使用一个WEBGate,作为负载均衡的服务器,具体的原理,请参见书籍《Netty Zookeeper Redis 高并发实战》

实战Netty集群-LMLPHP

除了负载均衡,从WEBGate还可以从 zookeeper中删除所有IM节点

连接为: http://localhost:8080/swagger-ui.html

swagger 的界面如下:

实战Netty集群-LMLPHP

5.2 启动第一个Netty节点

服务端的端口为7000

实战Netty集群-LMLPHP

5.3 启动第二个Netty节点

服务端的端口为7001,自动递增的

实战Netty集群-LMLPHP

5.4 启动第一个客户端

启动后输入登录的信息

启动客户端后,并且登录后,会自动连接一个netty节点, 这里为7001,第二个Netty服务节点。

实战Netty集群-LMLPHP

5.5 启动第二个客户端

启动后输入登录的信息

启动客户端后,并且登录后,按照负载均衡的机制,会自动连接一个netty节点, 这里为7000,第一个Netty服务节点。

实战Netty集群-LMLPHP

6 不同服务器直接进行IM通信

​下面演示,不同的客户端,通过各自的服务器节点,进行通信。

6.1 发送聊天消息

在第二个客户端(用户为z2),发送消息给第一个客户端(用户为z1),消息的格式为 :“ 内容@用户名”

实战Netty集群-LMLPHP

6.2 远程客户端接收消息

通过Netty服务节点的转发,第一个客户端收到的消息如下:

实战Netty集群-LMLPHP

7 总结

7.1 开发的难度

通过Netty+Zookeep+Redis的架构,整个Netty的集群,具备了服务节点的自动发现,节点之间的消息路由的能力。

说明一下,整个程序,还是比较复杂的,如果看不懂,建议不要捉急,慢慢来。

如果能从0到1的自己实现一版,开发的水平,也就不一般了。

全面的理论基础,请参见 《Netty Zookeeper Redis 高并发实战》 一书

7.2 Netty集群的最全理论基础

《Netty Zookeeper Redis 高并发实战》 一书,对Netty 集群的基本原理,进行了详尽的介绍,大致的目录如下:


疯狂创客圈 Java 死磕系列

  • Java (Netty) 聊天程序【 亿级流量】实战 开源项目实战
05-11 09:42
查看更多