上一篇博客《基于ZooKeeper与zkclient的统一配置管理实现(一)》分享了基于ZooKeeper原生api实现的统一配置管理,本篇文章将通过使用zkclient封装后的api来再次实现该功能。
实现的效果与上一篇文章类似,这里不再赘述。
系统的结构
系统仍然是由四个组件组成:
-
ZooKeeperServer
集群或单机版的ZooKeeper服务端,主要用以存储IConfigPublisher发布的配置文件信息
-
IConfigPublisher
配置文件的发布器,负责将配置文件信息发布到ZooKeeperServer中去
-
IConfigSubscriber
配置文件变更情况的订阅器,由客户端开启对服务器配置文件信息的订阅,当配置信息发生变更时,负责将本地的信息更新成最新的状态
-
ZkConfigChanger
配置文件更改器,一般由用户手动调用,用来更改配置文件的信息
启动ZooKeeper集群的方法同上一篇博客,这里不再赘述,各位可以自行异步《基于ZooKeeper与zkclient的统一配置管理实现(一)》查看。
其中IConfigPublisher和IConfigSubscriber是接口,看一下两个接口的定义:
/**
* Config files publisher
* @author hwang
*
*/
public interface IConfigPublisher {
/**
* publish config files under {@link configDir} to {@link configRootNode} of {@link zkServerHost}
* @param zkServerHost
* @param configRootNode
* @param configDir
*/
public void publish(String zkServerHost,String configRootNode,String configDir);
}
IConfigPublisher接口只有一个publish方法,主要的工作就是把configDir目录下的配置文件发布到zkServerHost的configRootNode节点中去。
/**
* Subscribe Config files change
* @author hwang
*
*/
public interface IConfigSubscriber {
/**
* <p>Subscribe config files change event under rootNode {@link configRootNode} of {@link zkServerHost}</p>
* <p>include the dataChange and childrenChange </p>
* @param zkServerHost
* @param configRootNode
*/
public void subscribe(String zkServerHost,String configRootNode);
}
IConfigSubscriber接口也只有一个方法,主要的工作就是订阅ZkServerHost中的configRootNode节点的变化情况。
现在来看下这两个主要的核心接口的实现情况。
配置文件发布器IConfigPublisher
IConfigPublisher接口的实现类是ZkConfigPublisher,看一下ZkConfigPublisher是怎么实现publish方法的:
public class ZkConfigPublisher implements IConfigPublisher{
private static final Log logger = LogFactory.getLog(ZkConfigSubscriber.class);
private ZkClient client;
private String configRootNode;
@Override
public void publish(String zkServerHost,String configRootNode,String configDir){
try{
if(client==null){
client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT);
client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET));
}
this.configRootNode = configRootNode;
String rootNode = "/" + configRootNode;
// 创建根节点
ZkClientNodeUtil.createNode(client, rootNode, configDir);
// 扫描所有配置文件
this.scanConfigFiles(configDir,ZkConstant.ACCEPT_SUFFIX);
}catch(Exception e){
logger.error("",e);
}
}
/**
* 扫描指定目录下的所有配置文件,并将内容写入到zookeeper节点中
* @param path 扫描的目录
* @param acceptSuffix 接受的文件后缀
* @throws KeeperException
* @throws InterruptedException
* @throws IOException
*/
private void scanConfigFiles(String path,String acceptSuffix) throws KeeperException, InterruptedException, IOException{
File dir = new File(path);
if(dir.exists() && dir.isDirectory()){
File[] subFiles = dir.listFiles();
for(File file : subFiles){
String absPath = file.getAbsolutePath();
String fileName = file.getName();
if(file.isDirectory() || (null!=acceptSuffix && !fileName.endsWith(acceptSuffix))){
this.scanConfigFiles(absPath,acceptSuffix);
}else{
String parentDir = file.getParentFile().getAbsolutePath();
// 读取文件内容
String fileContent = FileUtils.readFileToString(file,ZkConstant.CONF_CHAR_SET);
// 创建目录节点
ZkClientNodeUtil.createDirNode(client, configRootNode, parentDir);
// 创建该目录下的文件节点
ZkClientNodeUtil.createFileNode(client, configRootNode, parentDir, fileName, fileContent);
}
}
}
}
}
实现方法很简单,先创建了一个ZkClient对象,然后创建了一个根节点,最后扫描了指定目录下的所有配置文件,并将符合要求的配置文件(及目录)加入到ZooKeeperServer中去。
配置文件订阅器IConfigSubscriber
IConfigSubscriber接口的实现类是ZkConfigSubscriber,看一下ZkConfigSubscriber是怎么实现subscribe方法的:
public class ZkConfigSubscriber implements IConfigSubscriber{
private static final Log logger = LogFactory.getLog(ZkConfigSubscriber.class);
private ZkClient client;
@Override
public void subscribe(String zkServerHost, String configRootNode) {
try{
if(client==null){
client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT);
client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET));
}
String rootNode = "/" + configRootNode;
this.clearConfigDir(client,rootNode);
this.subscribeRootNode(client, rootNode);
// 等待配置信息变更
Thread.currentThread().join();
}catch(Exception e){
logger.error("",e);
}
}
/**
* 清空本地的配置文件目录
* @param client
* @param rootNode
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
*/
private void clearConfigDir(ZkClient client,String rootNode) throws IOException{
if(client.exists(rootNode)){
String configDir = client.readData(rootNode);
FileUtils.deleteDirectory(new File(configDir));
logger.info("Delete config dir:"+configDir);
}
}
/**
* 订阅根节点和递归订阅所有子节点
* @param client
* @param rootNodePath
* @throws KeeperException
* @throws InterruptedException
* @throws IOException
*/
private void subscribeRootNode(ZkClient client,String rootNodePath) throws IOException{
if(client.exists(rootNodePath)){
logger.debug("subscribe node:"+rootNodePath);
ZkConfigSubscriber.subscribePath(client, rootNodePath);
List<String> subList = client.getChildren(rootNodePath);
if(null!=subList && subList.size()>0){
// 将节点的所有子节点保存起来
NodeChildrenChangedWrapper.addChildren(rootNodePath, subList);
}
for (String subNode : subList) {
this.subscribeSubNode(client,rootNodePath,subNode);
}
}else{
logger.warn("rootNode:"+rootNodePath+" does not exists!");
}
}
/**
* 订阅子节点
* @param client
* @param currentNode
* @param subNode
* @throws KeeperException
* @throws InterruptedException
* @throws IOException
*/
private void subscribeSubNode(ZkClient client,String currentNode,String subNode) throws IOException{
String nodePath = currentNode+"/"+subNode;
if(nodePath.startsWith("/")){
// 订阅子节点
if(client.exists(nodePath)){
// sync content to client
String content = client.readData(nodePath);
OnetimeConfigSyncer.syncToClient(content);
logger.debug("subscribe node:"+nodePath);
ZkConfigSubscriber.subscribePath(client, nodePath);
List<String> subList = client.getChildren(nodePath);
if(null!=subList && subList.size()>0){
// 将节点的所有子节点保存起来
NodeChildrenChangedWrapper.addChildren(nodePath, subList);
}
for (String _subNode : subList) {
this.subscribeSubNode(client,nodePath,_subNode);
}
}else{
logger.warn("subNode:"+nodePath+" does not exists!");
}
}
}
}
subscribe方法的具体实现也很简单,先是清空本地的配置文件目录,然后订阅根节点和递归订阅所有子节点。订阅的时候,会将每个节点和该节点的子节点的情况保存到Map中去,具体的原因在上一篇博客中已经做过说明,这个不再赘述。具体执行订阅的方法是ZkConfigSubscriber类中的subscribePath()方法,来看下该方法的内容:
/**
* Store the paths that already subscribed
*/
private static Set<String> subscribedPathSet = new CopyOnWriteArraySet<String>();
public static void subscribePath(ZkClient client,String path){
if(!subscribedPathSet.contains(path)){
subscribedPathSet.add(path);
// Subscribe ChildChange and DataChange event at path
client.subscribeChildChanges(path, new ChildrenChangeListener(client));
client.subscribeDataChanges(path, new DataChangeListener(client));
logger.info("Subscribe ChildChange and DataChange event at path:"+path);
}
}
public static void unsubscribePath(ZkClient client,String path){
if(subscribedPathSet.contains(path)){
subscribedPathSet.remove(path);
// Unsubscribe ChildChange and DataChange event at path
client.unsubscribeChildChanges(path, new ChildrenChangeListener(client));
client.unsubscribeDataChanges(path, new DataChangeListener(client));
logger.info("Unsubscribe ChildChange and DataChange event at path:"+path);
}
}
主要是用一个CopyOnWriteArraySet存储所有已经订阅的节点的path,防止重复订阅。
其中订阅时使用了两个Listener类,分别是ChildrenChangeListener和DataChangeListener。
ChildrenChangeListener
先来看下ChildrenChangeListener的实现:
/**
* ChildrenChangeListener
* @author hwang
*
*/
public static class ChildrenChangeListener implements IZkChildListener{
private static final Log logger = LogFactory.getLog(ChildrenChangeListener.class);
private ZkClient client;
public ChildrenChangeListener(ZkClient client){
this.client = client;
}
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
if(currentChilds==null || currentChilds.isEmpty()){
logger.warn("No currentChilds get form parentPath:"+parentPath);
return;
}
ChildrenChangeResult changeResult = NodeChildrenChangedWrapper.diff(parentPath, currentChilds);
ChildrenChangeType changeType = changeResult.getChangeType();
List<String> changePath = changeResult.getChangePath();
if(changePath==null || changePath.isEmpty()){
logger.warn("No children changePath get form parentPath:"+parentPath);
return;
}
switch(changeType){
case add:{
for(String subPath : changePath){
logger.info("Add children node,path:"+parentPath+"/"+subPath);
String path = parentPath+"/"+subPath;
RealtimeConfigSyncer.syncToClient(client,path);
}
}break;
case delete:{
for(String subPath : changePath){
ZkConfigSubscriber.unsubscribePath(client, subPath);
String filePath = subPath.replaceAll(ZkConstant.SEPRATOR, "/");
FileUtils.deleteQuietly(new File(filePath));
logger.info("Delete children node,file:"+filePath);
}
}break;
case update:{
logger.info("Update children node,will do nothing");
}break;
default:{
logger.info("Default children node operate,will do nothing");
}break;
}
}
}
ZkClient会在NodeChildChanged事件发生时主动触发IZkChildListener接口的handleChildChange方法。所以我们只需要实现IZkChildListener接口的handleChildChange方法即可,并且同一个path只需要订阅一次,zkclient会自动为我们对path进行续订。
DataChangeListener
同样的还有IZkDataListener接口,我们只需要实现IZkDataListener接口的handleDataChange和handleDataDeleted方法即可,下面就是该接口的具体实现情况:
/**
* DataChangeListener
* @author hwang
*/
public static class DataChangeListener implements IZkDataListener{
private static final Log logger = LogFactory.getLog(DataChangeListener.class);
private ZkClient client;
public DataChangeListener(ZkClient client){
this.client = client;
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
logger.info("handleDataChange event,dataPath:"+dataPath);
RealtimeConfigSyncer.syncToClient(client,dataPath);
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
logger.info("handleDataDeleted event,dataPath:"+dataPath);
ZkConfigSubscriber.unsubscribePath(client, dataPath);
String filePath = dataPath.substring(dataPath.indexOf(ZkConstant.SEPRATOR)).replaceAll(ZkConstant.SEPRATOR, "/");
FileUtils.deleteQuietly(new File(filePath));
}
}
需要注意的是,当出现新增或修改事件时,只需要将最新的配置文件的内容同步到本地即可,但是出现删除事件时,除了需要删除本地的相关配置文件,还需要将已经订阅的事件取消掉,也就是需要执行ZkConfigSubscriber.unsubscribePath()方法。
配置文件更改器ZkConfigChanger
实现完发布器和订阅器之后,最后的一个就是配置文件更改器了。更改器主要的工作就是用来修改ZooKeeperServer端的配置文件的内容,具体的实现如下:
/**
* 服务端配置文件更改器
* @author hwang
*
*/
public class ZkConfigChanger {
private static final Log logger = LogFactory.getLog(ZkConfigChanger.class);
private static ZkClient client;
/**
* 初始化zkclient
*/
public static void init(){
if(client==null){
try {
client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT);
client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET));
} catch (Exception e) {
logger.error("",e);
}
}
}
/**
* 新增目录节点
* @param configRootNode
* @param dirAbsolutePath 目录的绝对路径,该目录必须是/config/开头的目录
* @throws KeeperException
* @throws InterruptedException
* @throws UnsupportedEncodingException
*/
public static boolean addConfigDir(String configRootNode,String dirAbsolutePath) throws KeeperException, InterruptedException, UnsupportedEncodingException{
if(null==client){
logger.warn("Not connected to ZooKeeper,will return");
return false;
}
if(StringUtils.isEmpty(dirAbsolutePath)){
logger.error("dirAbsolutePath can't be empty");
return false;
}
return ZkClientNodeUtil.createDirNode(client, configRootNode, dirAbsolutePath);
}
/**
* 删除目录节点
* @param configRootNode
* @param dirAbsolutePath
* @throws InterruptedException
* @throws KeeperException
*/
public static boolean deleteConfigDir(String configRootNode,String dirAbsolutePath) throws InterruptedException, KeeperException{
if(null==client){
logger.warn("Not connected to ZooKeeper,will return");
return false;
}
if(StringUtils.isEmpty(dirAbsolutePath)){
logger.error("dirAbsolutePath can't be empty");
return false;
}
return ZkClientNodeUtil.deleteDirNode(client, configRootNode, dirAbsolutePath);
}
/**
* 新增文件节点
* @param configRootNode
* @param fileAbsolutePath 文件的绝对路径,不包括文件名
* @param fileName 文件名
* @param fileContent 文件内容
* @throws KeeperException
* @throws InterruptedException
* @throws UnsupportedEncodingException
*/
public static boolean addConfigFile(String configRootNode,String fileAbsolutePath,String fileName,String fileContent) throws KeeperException, InterruptedException, UnsupportedEncodingException{
if(null==client){
logger.warn("Not connected to ZooKeeper,will return");
return false;
}
if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName) || StringUtils.isEmpty(fileContent)){
logger.error("fileAbsolutePath,fileName,fileContent can't be empty");
return false;
}
return ZkClientNodeUtil.createFileNode(client, configRootNode, fileAbsolutePath, fileName, fileContent);
}
/**
* 删除文件节点
* @param configRootNode
* @param fileAbsolutePath 文件的绝对路径,不包括文件名
* @param fileName 文件名
* @throws InterruptedException
* @throws KeeperException
*/
public static boolean deleteConfigFile(String configRootNode,String fileAbsolutePath,String fileName) throws InterruptedException, KeeperException{
if(null==client){
logger.warn("Not connected to ZooKeeper,will return");
return false;
}
if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName)){
logger.error("fileAbsolutePath,fileName can't be empty");
return false;
}
return ZkClientNodeUtil.deleteFileNode(client, configRootNode, fileAbsolutePath, fileName);
}
/**
* 更新配置文件内容
* @param configRootNode
* @param fileAbsolutePath
* @param fileName
* @param fileContent
* @throws InterruptedException
* @throws KeeperException
* @throws UnsupportedEncodingException
*/
public static boolean updateConfigFile(String configRootNode,String fileAbsolutePath,String fileName,String fileContent) throws InterruptedException, KeeperException, UnsupportedEncodingException{
if(null==client){
logger.warn("Not connected to ZooKeeper,will return");
return false;
}
if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName) || StringUtils.isEmpty(fileContent)){
logger.error("fileAbsolutePath,fileName,fileContent can't be empty");
return false;
}
return ZkClientNodeUtil.updateFileNode(client, configRootNode, fileAbsolutePath, fileName, fileContent);
}
}
至此,通过ZkClient重构的统一配置管理框架就完成了。
经过实际测试,zkclient可以完美解决上一篇博客中未解决的问题,这得益于zkclient大量正确的使用了retryUntilConnected方法。