上一篇博客《基于ZooKeeper与zkclient的统一配置管理实现(一)》分享了基于ZooKeeper原生api实现的统一配置管理,本篇文章将通过使用zkclient封装后的api来再次实现该功能。

实现的效果与上一篇文章类似,这里不再赘述。

系统的结构

系统仍然是由四个组件组成:

  • ZooKeeperServer

        集群或单机版的ZooKeeper服务端,主要用以存储IConfigPublisher发布的配置文件信息

  • IConfigPublisher

        配置文件的发布器,负责将配置文件信息发布到ZooKeeperServer中去

  • IConfigSubscriber

        配置文件变更情况的订阅器,由客户端开启对服务器配置文件信息的订阅,当配置信息发生变更时,负责将本地的信息更新成最新的状态

  • ZkConfigChanger

        配置文件更改器,一般由用户手动调用,用来更改配置文件的信息

启动ZooKeeper集群的方法同上一篇博客,这里不再赘述,各位可以自行异步《基于ZooKeeper与zkclient的统一配置管理实现(一)》查看。

其中IConfigPublisher和IConfigSubscriber是接口,看一下两个接口的定义:

/**
 * Config files publisher
 * @author hwang
 *
 */
public interface IConfigPublisher {

	/**
	 * publish config files under {@link configDir} to {@link configRootNode} of {@link zkServerHost}
	 * @param zkServerHost
	 * @param configRootNode
	 * @param configDir
	 */
	public void publish(String zkServerHost,String configRootNode,String configDir);
}

IConfigPublisher接口只有一个publish方法,主要的工作就是把configDir目录下的配置文件发布到zkServerHost的configRootNode节点中去。

/**
 * Subscribe Config files change
 * @author hwang
 *
 */
public interface IConfigSubscriber {

	/**
	 * <p>Subscribe config files change event under rootNode {@link configRootNode} of {@link zkServerHost}</p>
	 * <p>include the dataChange and childrenChange </p>
	 * @param zkServerHost
	 * @param configRootNode
	 */
	public void subscribe(String zkServerHost,String configRootNode);
}

IConfigSubscriber接口也只有一个方法,主要的工作就是订阅ZkServerHost中的configRootNode节点的变化情况。

现在来看下这两个主要的核心接口的实现情况。

配置文件发布器IConfigPublisher

IConfigPublisher接口的实现类是ZkConfigPublisher,看一下ZkConfigPublisher是怎么实现publish方法的:

public class ZkConfigPublisher implements IConfigPublisher{

	private static final Log logger = LogFactory.getLog(ZkConfigSubscriber.class);

	private ZkClient client;

	private String configRootNode;

	@Override
	public void publish(String zkServerHost,String configRootNode,String configDir){
		try{
			if(client==null){
				client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT);
				client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET));
			}
			this.configRootNode = configRootNode;
			String rootNode = "/" + configRootNode;
			// 创建根节点
			ZkClientNodeUtil.createNode(client, rootNode, configDir);
			// 扫描所有配置文件
			this.scanConfigFiles(configDir,ZkConstant.ACCEPT_SUFFIX);
		}catch(Exception e){
			logger.error("",e);
		}
	}

	/**
     * 扫描指定目录下的所有配置文件,并将内容写入到zookeeper节点中
     * @param path	扫描的目录
     * @param acceptSuffix 接受的文件后缀
     * @throws KeeperException
     * @throws InterruptedException
     * @throws IOException
     */
    private void scanConfigFiles(String path,String acceptSuffix) throws KeeperException, InterruptedException, IOException{
    	File dir = new File(path);
        if(dir.exists() && dir.isDirectory()){
        	File[] subFiles = dir.listFiles();
        	for(File file : subFiles){
        		String absPath = file.getAbsolutePath();
        		String fileName = file.getName();
        		if(file.isDirectory() || (null!=acceptSuffix && !fileName.endsWith(acceptSuffix))){
        			this.scanConfigFiles(absPath,acceptSuffix);
        		}else{
        			String parentDir = file.getParentFile().getAbsolutePath();
        			// 读取文件内容
        			String fileContent = FileUtils.readFileToString(file,ZkConstant.CONF_CHAR_SET);
        			// 创建目录节点
        			ZkClientNodeUtil.createDirNode(client, configRootNode, parentDir);
        			// 创建该目录下的文件节点
        			ZkClientNodeUtil.createFileNode(client, configRootNode, parentDir, fileName, fileContent);
        		}
        	}
        }
    }

}

实现方法很简单,先创建了一个ZkClient对象,然后创建了一个根节点,最后扫描了指定目录下的所有配置文件,并将符合要求的配置文件(及目录)加入到ZooKeeperServer中去。

配置文件订阅器IConfigSubscriber

IConfigSubscriber接口的实现类是ZkConfigSubscriber,看一下ZkConfigSubscriber是怎么实现subscribe方法的:

public class ZkConfigSubscriber implements IConfigSubscriber{

	private static final Log logger = LogFactory.getLog(ZkConfigSubscriber.class);


	private ZkClient client;

	@Override
	public void subscribe(String zkServerHost, String configRootNode) {
		try{
			if(client==null){
				client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT);
				client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET));
			}
			String rootNode = "/" + configRootNode;
			this.clearConfigDir(client,rootNode);
			this.subscribeRootNode(client, rootNode);
			// 等待配置信息变更
			Thread.currentThread().join();
		}catch(Exception e){
			logger.error("",e);
		}
	}



	/**
     * 清空本地的配置文件目录
     * @param client
     * @param rootNode
     * @throws IOException
     * @throws InterruptedException
     * @throws KeeperException
     */
	private void clearConfigDir(ZkClient client,String rootNode) throws IOException{
    	if(client.exists(rootNode)){
    		String configDir = client.readData(rootNode);
    		FileUtils.deleteDirectory(new File(configDir));
    		logger.info("Delete config dir:"+configDir);
    	}
    }


    /**
     * 订阅根节点和递归订阅所有子节点
     * @param client
     * @param rootNodePath
     * @throws KeeperException
     * @throws InterruptedException
     * @throws IOException
     */
	private void subscribeRootNode(ZkClient client,String rootNodePath) throws IOException{
        if(client.exists(rootNodePath)){
        	logger.debug("subscribe node:"+rootNodePath);
        	ZkConfigSubscriber.subscribePath(client, rootNodePath);
        	List<String> subList = client.getChildren(rootNodePath);
        	if(null!=subList && subList.size()>0){
        		// 将节点的所有子节点保存起来
        		NodeChildrenChangedWrapper.addChildren(rootNodePath, subList);
        	}
        	for (String subNode : subList) {
        		this.subscribeSubNode(client,rootNodePath,subNode);
        	}
        }else{
        	logger.warn("rootNode:"+rootNodePath+" does not exists!");
        }
    }

    /**
     * 订阅子节点
     * @param client
     * @param currentNode
     * @param subNode
     * @throws KeeperException
     * @throws InterruptedException
     * @throws IOException
     */
	private void subscribeSubNode(ZkClient client,String currentNode,String subNode) throws IOException{
    	String nodePath = currentNode+"/"+subNode;
    	if(nodePath.startsWith("/")){
    		// 订阅子节点
    		if(client.exists(nodePath)){
    			// sync content to client
            	String content = client.readData(nodePath);
            	OnetimeConfigSyncer.syncToClient(content);

    			logger.debug("subscribe node:"+nodePath);
    			ZkConfigSubscriber.subscribePath(client, nodePath);

            	List<String> subList = client.getChildren(nodePath);
            	if(null!=subList && subList.size()>0){
            		// 将节点的所有子节点保存起来
            		NodeChildrenChangedWrapper.addChildren(nodePath, subList);
            	}
            	for (String _subNode : subList) {
            		this.subscribeSubNode(client,nodePath,_subNode);
            	}
            }else{
            	logger.warn("subNode:"+nodePath+" does not exists!");
            }
    	}
    }

}

subscribe方法的具体实现也很简单,先是清空本地的配置文件目录,然后订阅根节点和递归订阅所有子节点。订阅的时候,会将每个节点和该节点的子节点的情况保存到Map中去,具体的原因在上一篇博客中已经做过说明,这个不再赘述。具体执行订阅的方法是ZkConfigSubscriber类中的subscribePath()方法,来看下该方法的内容:

    /**
	 * Store the paths that already subscribed
	 */
	private static Set<String> subscribedPathSet = new CopyOnWriteArraySet<String>();

	public static void subscribePath(ZkClient client,String path){
		if(!subscribedPathSet.contains(path)){
			subscribedPathSet.add(path);
			// Subscribe ChildChange and DataChange event at path
        	client.subscribeChildChanges(path, new ChildrenChangeListener(client));
        	client.subscribeDataChanges(path, new DataChangeListener(client));
        	logger.info("Subscribe ChildChange and DataChange event at path:"+path);
		}
	}

	public static void unsubscribePath(ZkClient client,String path){
		if(subscribedPathSet.contains(path)){
			subscribedPathSet.remove(path);
			// Unsubscribe ChildChange and DataChange event at path
			client.unsubscribeChildChanges(path, new ChildrenChangeListener(client));
			client.unsubscribeDataChanges(path, new DataChangeListener(client));
			logger.info("Unsubscribe ChildChange and DataChange event at path:"+path);
		}
	}
	

主要是用一个CopyOnWriteArraySet存储所有已经订阅的节点的path,防止重复订阅。

其中订阅时使用了两个Listener类,分别是ChildrenChangeListener和DataChangeListener。

ChildrenChangeListener

先来看下ChildrenChangeListener的实现:

    /**
	 * ChildrenChangeListener
	 * @author hwang
	 *
	 */
	public static class ChildrenChangeListener implements IZkChildListener{

		private static final Log logger = LogFactory.getLog(ChildrenChangeListener.class);

		private ZkClient client;

		public ChildrenChangeListener(ZkClient client){
			this.client = client;
		}

		@Override
		public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
			if(currentChilds==null || currentChilds.isEmpty()){
				logger.warn("No currentChilds get form parentPath:"+parentPath);
				return;
			}
			ChildrenChangeResult changeResult = NodeChildrenChangedWrapper.diff(parentPath, currentChilds);
	    	ChildrenChangeType changeType = changeResult.getChangeType();
	    	List<String> changePath = changeResult.getChangePath();
	    	if(changePath==null || changePath.isEmpty()){
	    		logger.warn("No children changePath get form parentPath:"+parentPath);
	    		return;
	    	}
	    	switch(changeType){
	        	case add:{
	        		for(String subPath : changePath){
	        			logger.info("Add children node,path:"+parentPath+"/"+subPath);
	        			String path = parentPath+"/"+subPath;
	        			RealtimeConfigSyncer.syncToClient(client,path);
	        		}
	        	}break;
	        	case delete:{
	        		for(String subPath : changePath){
	        			ZkConfigSubscriber.unsubscribePath(client, subPath);
	        			String filePath = subPath.replaceAll(ZkConstant.SEPRATOR, "/");
	    	        	FileUtils.deleteQuietly(new File(filePath));
	    	        	logger.info("Delete children node,file:"+filePath);
	        		}
	        	}break;
	        	case update:{
	        		logger.info("Update children node,will do nothing");
	        	}break;
	        	default:{
	        		logger.info("Default children node operate,will do nothing");
	        	}break;
	    	}
		}
	}
	

ZkClient会在NodeChildChanged事件发生时主动触发IZkChildListener接口的handleChildChange方法。所以我们只需要实现IZkChildListener接口的handleChildChange方法即可,并且同一个path只需要订阅一次,zkclient会自动为我们对path进行续订。

DataChangeListener

同样的还有IZkDataListener接口,我们只需要实现IZkDataListener接口的handleDataChange和handleDataDeleted方法即可,下面就是该接口的具体实现情况:

    /**
	 * DataChangeListener
	 * @author hwang
	 */
	public static class DataChangeListener implements IZkDataListener{

		private static final Log logger = LogFactory.getLog(DataChangeListener.class);

		private ZkClient client;

		public DataChangeListener(ZkClient client){
			this.client = client;
		}
		@Override
		public void handleDataChange(String dataPath, Object data) throws Exception {
			logger.info("handleDataChange event,dataPath:"+dataPath);
			RealtimeConfigSyncer.syncToClient(client,dataPath);
		}

		@Override
		public void handleDataDeleted(String dataPath) throws Exception {
			logger.info("handleDataDeleted event,dataPath:"+dataPath);
			ZkConfigSubscriber.unsubscribePath(client, dataPath);
			String filePath = dataPath.substring(dataPath.indexOf(ZkConstant.SEPRATOR)).replaceAll(ZkConstant.SEPRATOR, "/");
	    	FileUtils.deleteQuietly(new File(filePath));
		}
	}
	

需要注意的是,当出现新增或修改事件时,只需要将最新的配置文件的内容同步到本地即可,但是出现删除事件时,除了需要删除本地的相关配置文件,还需要将已经订阅的事件取消掉,也就是需要执行ZkConfigSubscriber.unsubscribePath()方法。

配置文件更改器ZkConfigChanger

实现完发布器和订阅器之后,最后的一个就是配置文件更改器了。更改器主要的工作就是用来修改ZooKeeperServer端的配置文件的内容,具体的实现如下:

/**
 * 服务端配置文件更改器
 * @author hwang
 *
 */
public class ZkConfigChanger {

	private static final Log logger = LogFactory.getLog(ZkConfigChanger.class);

	private static ZkClient client;

	/**
	 * 初始化zkclient
	 */
    public static void init(){
    	if(client==null){
    		try {
    			client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT);
    			client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET));
    		} catch (Exception e) {
    			logger.error("",e);
    		}
    	}
    }


    /**
     * 新增目录节点
     * @param configRootNode
     * @param dirAbsolutePath 目录的绝对路径,该目录必须是/config/开头的目录
     * @throws KeeperException
     * @throws InterruptedException
     * @throws UnsupportedEncodingException
     */
    public static boolean addConfigDir(String configRootNode,String dirAbsolutePath) throws KeeperException, InterruptedException, UnsupportedEncodingException{
    	if(null==client){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return false;
    	}
    	if(StringUtils.isEmpty(dirAbsolutePath)){
    		logger.error("dirAbsolutePath can't be empty");
    		return false;
    	}
    	return ZkClientNodeUtil.createDirNode(client, configRootNode, dirAbsolutePath);
    }

    /**
     * 删除目录节点
     * @param configRootNode
     * @param dirAbsolutePath
     * @throws InterruptedException
     * @throws KeeperException
     */
    public static boolean deleteConfigDir(String configRootNode,String dirAbsolutePath) throws InterruptedException, KeeperException{
    	if(null==client){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return false;
    	}
    	if(StringUtils.isEmpty(dirAbsolutePath)){
    		logger.error("dirAbsolutePath can't be empty");
    		return false;
    	}
    	return ZkClientNodeUtil.deleteDirNode(client, configRootNode,  dirAbsolutePath);
    }



    /**
     * 新增文件节点
     * @param configRootNode
     * @param fileAbsolutePath 文件的绝对路径,不包括文件名
     * @param fileName 文件名
     * @param fileContent 文件内容
     * @throws KeeperException
     * @throws InterruptedException
     * @throws UnsupportedEncodingException
     */
    public static boolean addConfigFile(String configRootNode,String fileAbsolutePath,String fileName,String fileContent) throws KeeperException, InterruptedException, UnsupportedEncodingException{
    	if(null==client){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return false;
    	}
    	if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName) || StringUtils.isEmpty(fileContent)){
    		logger.error("fileAbsolutePath,fileName,fileContent can't be empty");
    		return false;
    	}
    	return ZkClientNodeUtil.createFileNode(client, configRootNode,  fileAbsolutePath, fileName, fileContent);
    }

    /**
     * 删除文件节点
     * @param configRootNode
     * @param fileAbsolutePath 文件的绝对路径,不包括文件名
     * @param fileName 文件名
     * @throws InterruptedException
     * @throws KeeperException
     */
    public static boolean deleteConfigFile(String configRootNode,String fileAbsolutePath,String fileName) throws InterruptedException, KeeperException{
    	if(null==client){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return false;
    	}
    	if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName)){
    		logger.error("fileAbsolutePath,fileName can't be empty");
    		return false;
    	}
    	return ZkClientNodeUtil.deleteFileNode(client, configRootNode,  fileAbsolutePath, fileName);
    }


    /**
     * 更新配置文件内容
     * @param configRootNode
     * @param fileAbsolutePath
     * @param fileName
     * @param fileContent
     * @throws InterruptedException
     * @throws KeeperException
     * @throws UnsupportedEncodingException
     */
    public static boolean updateConfigFile(String configRootNode,String fileAbsolutePath,String fileName,String fileContent) throws InterruptedException, KeeperException, UnsupportedEncodingException{
    	if(null==client){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return false;
    	}
    	if(StringUtils.isEmpty(fileAbsolutePath)  || StringUtils.isEmpty(fileName)  || StringUtils.isEmpty(fileContent)){
    		logger.error("fileAbsolutePath,fileName,fileContent can't be empty");
    		return false;
    	}
		return ZkClientNodeUtil.updateFileNode(client, configRootNode,  fileAbsolutePath, fileName, fileContent);
    }



}

至此,通过ZkClient重构的统一配置管理框架就完成了。

经过实际测试,zkclient可以完美解决上一篇博客中未解决的问题,这得益于zkclient大量正确的使用了retryUntilConnected方法。

03-18 08:05