先来看一下配置管理实现的效果

服务端启动

Jan 12, 2017 4:18:43 PM org.apache.zookeeper.server.persistence.FileTxnLog append
INFO: Creating new log file: log.400000001
Jan 12, 2017 4:18:43 PM org.apache.zookeeper.server.persistence.FileTxnLog append
INFO: Creating new log file: log.400000001
Jan 12, 2017 4:18:43 PM org.apache.zookeeper.server.ZooKeeperServer finishSessionInit
INFO: Established session 0x25991c1eda50000 with negotiated timeout 5000 for client /10.1.126.104:33941
Jan 12, 2017 4:18:43 PM org.apache.zookeeper.ClientCnxn$SendThread onConnected
INFO: Session establishment complete on server 10.1.126.104/10.1.126.104:2182, sessionid = 0x25991c1eda50000, negotiated timeout = 5000
2017-01-12 16:18:43[INFO] - ServerConfigPublisher Connected to ZooKeeper Cluster:10.1.126.104:2181,10.1.126.104:2182,10.1.126.104:2183
[com.zkconfig.config.server.ServerConfigPublisher.connectZkServer(ServerConfigPublisher.java:75)]
2017-01-12 16:18:43[WARN] - Node:/config alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createNode(ZkUtil.java:69)]
2017-01-12 16:18:43[WARN] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_other alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createDirNode(ZkUtil.java:120)]
2017-01-12 16:18:43[WARN] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_other/Backchina_SearchEngineConfig.xml alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createFileNode(ZkUtil.java:231)]
2017-01-12 16:18:43[WARN] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_baidu alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createDirNode(ZkUtil.java:120)]
2017-01-12 16:18:43[WARN] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_baidu/BaiduTieba_SearchEngineConfig.xml alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createFileNode(ZkUtil.java:231)]
2017-01-12 16:18:43[WARN] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_baidu alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createDirNode(ZkUtil.java:120)]
2017-01-12 16:18:43[WARN] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_baidu/Baidu_SearchEngineConfig.xml alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createFileNode(ZkUtil.java:231)]
2017-01-12 16:18:43[WARN] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_tianya alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createDirNode(ZkUtil.java:120)]
2017-01-12 16:18:43[WARN] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_tianya/Tianya_SearchEngineConfig.xml alreadly exists,can't create.
[com.zkconfig.util.ZkUtil.createFileNode(ZkUtil.java:231)]
Jan 12, 2017 4:18:48 PM org.apache.zookeeper.server.ZooKeeperServer expire
INFO: Expiring session 0x359871022840000, timeout of 5000ms exceeded
Jan 12, 2017 4:18:48 PM org.apache.zookeeper.server.ZooKeeperServer expire
INFO: Expiring session 0x359871022840001, timeout of 5000ms exceeded
Jan 12, 2017 4:18:48 PM org.apache.zookeeper.server.PrepRequestProcessor pRequest2Txn
INFO: Processed session termination for sessionid: 0x359871022840000
Jan 12, 2017 4:18:48 PM org.apache.zookeeper.server.PrepRequestProcessor pRequest2Txn
INFO: Processed session termination for sessionid: 0x359871022840001
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.server.NIOServerCnxnFactory run
INFO: Accepted socket connection from /10.1.126.102:38841
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.server.ZooKeeperServer processConnectRequest
INFO: Client attempting to establish new session at /10.1.126.102:38841
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.server.ZooKeeperServer finishSessionInit
INFO: Established session 0x15991c1eda50000 with negotiated timeout 5000 for client /10.1.126.102:38841

客户端启动

Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:zookeeper.version=3.4.9-1757313, built on 08/23/2016 07:39 GMT
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:host.name=test-102
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:java.version=1.7.0_06
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:java.vendor=Oracle Corporation
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:java.home=/usr/java/jdk1.7.0_06/jre
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:java.class.path=ZkConfigClientMain-1.0.jar
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:java.library.path=/opt/FUDE-0.4/fude/lib:/opt/FUDE-0.4/utils/lib:/opt/FUDE-0.4/log/lib:/opt/FUDE-0.4/netsnmp/lib:/opt/FUDE-0.4/omni/lib:/opt/FUDE-0.4/python/lib:/opt/FUDE-0.4/boost/lib:/opt/FUDE-0.4/cppunit/lib:/opt/FUDE-0.4/TAO/ACE_wrappers/lib::/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:java.io.tmpdir=/tmp
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:java.compiler=<NA>
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:os.name=Linux
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:os.arch=amd64
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:os.version=2.6.18-92.el5
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:user.name=root
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:user.home=/root
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.Environment logEnv
INFO: Client environment:user.dir=/home/NetPF/bin/zk
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.ZooKeeper <init>
INFO: Initiating client connection, connectString=10.1.126.104:2181,10.1.126.104:2182,10.1.126.104:2183 sessionTimeout=5000 watcher=com.zkconfig.config.client.ConfigChangeListener$ConnectedWatcher@1786b2ca
2017-01-12 16:19:00[INFO] - ZooKeeper State:CONNECTING
[com.zkconfig.config.client.ConfigChangeListener.waitUntilConnected(ConfigChangeListener.java:58)]
2017-01-12 16:19:00[INFO] - ConnectedLatch.await
[com.zkconfig.config.client.ConfigChangeListener.waitUntilConnected(ConfigChangeListener.java:61)]
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.ClientCnxn$SendThread logStartConnect
INFO: Opening socket connection to server 10.1.126.104/10.1.126.104:2181. Will not attempt to authenticate using SASL (unknown error)
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.ClientCnxn$SendThread primeConnection
INFO: Socket connection established to 10.1.126.104/10.1.126.104:2181, initiating session
Jan 12, 2017 4:19:00 PM org.apache.zookeeper.ClientCnxn$SendThread onConnected
INFO: Session establishment complete on server 10.1.126.104/10.1.126.104:2181, sessionid = 0x15991c1eda50000, negotiated timeout = 5000
2017-01-12 16:19:00[INFO] - ConfigChangeListener Connected to ZooKeeper Cluster:10.1.126.104:2181,10.1.126.104:2182,10.1.126.104:2183
[com.zkconfig.config.client.ConfigChangeListener.start(ConfigChangeListener.java:78)]
2017-01-12 16:19:01[INFO] - Delete config dir:/home/NetPF/bin/zk/zkconfig
[com.zkconfig.config.client.ClientConfigUpdater.clearConfigDir(ClientConfigUpdater.java:45)]
2017-01-12 16:19:01[INFO] - Mkdir:/home/NetPF/bin/zk/zkconfig/other
[com.zkconfig.config.client.ClientConfigUpdater.syncNode(ClientConfigUpdater.java:277)]
2017-01-12 16:19:01[INFO] - Write file:/home/NetPF/bin/zk/zkconfig/other/Backchina_SearchEngineConfig.xml
[com.zkconfig.config.client.ClientConfigUpdater.syncNode(ClientConfigUpdater.java:283)]
2017-01-12 16:19:01[INFO] - Mkdir:/home/NetPF/bin/zk/zkconfig/baidu
[com.zkconfig.config.client.ClientConfigUpdater.syncNode(ClientConfigUpdater.java:277)]
2017-01-12 16:19:01[INFO] - Write file:/home/NetPF/bin/zk/zkconfig/baidu/BaiduTieba_SearchEngineConfig.xml
[com.zkconfig.config.client.ClientConfigUpdater.syncNode(ClientConfigUpdater.java:283)]
2017-01-12 16:19:01[INFO] - Write file:/home/NetPF/bin/zk/zkconfig/baidu/Baidu_SearchEngineConfig.xml
[com.zkconfig.config.client.ClientConfigUpdater.syncNode(ClientConfigUpdater.java:283)]
2017-01-12 16:19:01[INFO] - Mkdir:/home/NetPF/bin/zk/zkconfig/tianya
[com.zkconfig.config.client.ClientConfigUpdater.syncNode(ClientConfigUpdater.java:277)]
2017-01-12 16:19:01[INFO] - Write file:/home/NetPF/bin/zk/zkconfig/tianya/Tianya_SearchEngineConfig.xml
[com.zkconfig.config.client.ClientConfigUpdater.syncNode(ClientConfigUpdater.java:283)]

更新服务端的配置文件

一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:zookeeper.version=3.4.9-1757313, built on 08/23/2016 07:39 GMT
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:host.name=hwang-PC
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:java.version=1.7.0_76
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:java.vendor=Oracle Corporation
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:java.home=C:\Program Files (x86)\Java\jdk1.7.0_76\jre
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:java.class.path=E:\workspace\zk\bin;E:\workspace\zk\lib\log4j-1.2.11.jar;E:\workspace\zk\lib\slf4j-api-1.6.1.jar;E:\workspace\zk\lib\slf4j-jdk14-1.6.1.jar;E:\workspace\zk\lib\jline-0.9.94.jar;E:\workspace\zk\lib\netty-3.10.5.Final.jar;E:\workspace\zk\lib\junit-4.8.2.jar;E:\workspace\zk\lib\mockito-all-1.8.4.jar;E:\workspace\zk\lib\commons-lang-2.6.jar;E:\workspace\zk\lib\commons-io-2.1.jar;E:\workspace\zk\lib\commons-logging-1.0.3.jar
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:java.library.path=C:\Program Files (x86)\Java\jdk1.7.0_76\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;C:/Program Files (x86)/Java/jre7/bin/client;C:/Program Files (x86)/Java/jre7/bin;C:/Program Files (x86)/Java/jre7/lib/i386;C:\Program Files (x86)\Common Files\NetSarang;C:\Program Files (x86)\Intel\iCLS Client\;C:\Program Files\Intel\iCLS Client\;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Program Files\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files\Intel\Intel(R) Management Engine Components\IPT;C:\Program Files (x86)\Intel\Intel(R) Management Engine Components\DAL;C:\Program Files (x86)\Intel\Intel(R) Management Engine Components\IPT;C:\Program Files (x86)\ATI Technologies\ATI.ACE\Core-Static;C:\Program Files\Intel\WiFi\bin\;C:\Program Files\Common Files\Intel\WirelessCommon\;C:\Program Files (x86)\Common Files\Lenovo;C:\ProgramData\Lenovo\ReadyApps;C:\Program Files (x86)\Java\jdk1.7.0_76\bin;E:\eclipse;;.
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:java.io.tmpdir=C:\Users\hwang\AppData\Local\Temp\
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:java.compiler=<NA>
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:os.name=Windows 7
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:os.arch=x86
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:os.version=6.1
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:user.name=hwang
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:user.home=C:\Users\hwang
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.Environment logEnv
信息: Client environment:user.dir=E:\workspace\zk
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.ZooKeeper <init>
信息: Initiating client connection, connectString=10.1.126.104:2181,10.1.126.104:2182,10.1.126.104:2183 sessionTimeout=5000 watcher=com.zkconfig.config.changer.ServerConfigChanger$ConnectedWatcher@149e631
2017-01-12 16:21:43[INFO] - ZooKeeper State:CONNECTING
[com.zkconfig.config.changer.ServerConfigChanger.waitUntilConnected(ServerConfigChanger.java:58)]
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.ClientCnxn$SendThread logStartConnect
信息: Opening socket connection to server 10.1.126.104/10.1.126.104:2182. Will not attempt to authenticate using SASL (unknown error)
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.ClientCnxn$SendThread primeConnection
信息: Socket connection established to 10.1.126.104/10.1.126.104:2182, initiating session
2017-01-12 16:21:43[INFO] - ConnectedLatch.await
[com.zkconfig.config.changer.ServerConfigChanger.waitUntilConnected(ServerConfigChanger.java:61)]
一月 12, 2017 4:21:43 下午 org.apache.zookeeper.ClientCnxn$SendThread onConnected
信息: Session establishment complete on server 10.1.126.104/10.1.126.104:2182, sessionid = 0x25991c1eda50001, negotiated timeout = 5000
2017-01-12 16:21:43[INFO] - ServerConfigChanger Connected to ZooKeeper Cluster:10.1.126.104:2181,10.1.126.104:2182,10.1.126.104:2183
[com.zkconfig.config.changer.ServerConfigChanger.connectZkServer(ServerConfigChanger.java:77)]
2017-01-12 16:21:44[INFO] - Create dirNode:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test,dirPath:/home/NetPF/bin/zk/zkconfig/test
[com.zkconfig.util.ZkUtil.createDirNode(ZkUtil.java:118)]
2017-01-12 16:21:46[INFO] - Create fileNode:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test/Test2_SearchEngineConfig.xml,filePath:/home/NetPF/bin/zk/zkconfig/test,fileName:Test2_SearchEngineConfig.xml
[com.zkconfig.util.ZkUtil.createFileNode(ZkUtil.java:229)]
2017-01-12 16:21:46[INFO] - Update fileNode:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_tianya/Tianya_SearchEngineConfig.xml,filePath:/home/NetPF/bin/zk/zkconfig/tianya,fileName:Tianya_SearchEngineConfig.xml
[com.zkconfig.util.ZkUtil.updateFileNode(ZkUtil.java:299)]

客户端自动更新配置

2017-01-12 16:21:47[INFO] - EventType:NodeChildrenChanged,Value:4,path:/config
[com.zkconfig.config.client.ClientConfigUpdater.updateConfigFiles(ClientConfigUpdater.java:60)]
2017-01-12 16:21:47[INFO] - Catch [NodeChildrenChanged] event
[com.zkconfig.config.client.ClientConfigUpdater.updateConfigFiles(ClientConfigUpdater.java:71)]
2017-01-12 16:21:47[INFO] - Node:/config
oriChildren:_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_other,_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_baidu,_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_tianya,
curChildren:_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_other,_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_baidu,_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test,_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_tianya,
[com.zkconfig.config.client.ChildrenChangedWrapper.diff(ChildrenChangedWrapper.java:171)]
2017-01-12 16:21:47[INFO] - Add children node,path:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test
[com.zkconfig.config.client.ClientConfigUpdater.updateConfigFiles(ClientConfigUpdater.java:84)]
2017-01-12 16:21:47[INFO] - Relistening node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test
[com.zkconfig.config.client.ClientConfigUpdater.nodeHandle(ClientConfigUpdater.java:122)]
2017-01-12 16:21:47[INFO] - nodeType:1,nodeOperateType:1
[com.zkconfig.config.client.ClientConfigUpdater.nodeHandle(ClientConfigUpdater.java:130)]
2017-01-12 16:21:49[INFO] - EventType:NodeChildrenChanged,Value:4,path:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test
[com.zkconfig.config.client.ClientConfigUpdater.updateConfigFiles(ClientConfigUpdater.java:60)]
2017-01-12 16:21:49[INFO] - Catch [NodeChildrenChanged] event
[com.zkconfig.config.client.ClientConfigUpdater.updateConfigFiles(ClientConfigUpdater.java:71)]
2017-01-12 16:21:49[INFO] - Node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test has no children.All currentChildrenPath are new.
[com.zkconfig.config.client.ChildrenChangedWrapper.diff(ChildrenChangedWrapper.java:159)]
2017-01-12 16:21:49[INFO] - Add children node,path:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test/Test2_SearchEngineConfig.xml
[com.zkconfig.config.client.ClientConfigUpdater.updateConfigFiles(ClientConfigUpdater.java:84)]
2017-01-12 16:21:49[INFO] - Relistening node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_test/Test2_SearchEngineConfig.xml
[com.zkconfig.config.client.ClientConfigUpdater.nodeHandle(ClientConfigUpdater.java:122)]
2017-01-12 16:21:49[INFO] - nodeType:2,nodeOperateType:1
[com.zkconfig.config.client.ClientConfigUpdater.nodeHandle(ClientConfigUpdater.java:130)]
2017-01-12 16:21:49[INFO] - EventType:NodeDataChanged,Value:3,path:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_tianya/Tianya_SearchEngineConfig.xml
[com.zkconfig.config.client.ClientConfigUpdater.updateConfigFiles(ClientConfigUpdater.java:60)]
2017-01-12 16:21:49[INFO] - Relistening node:/config/_%_home_%_NetPF_%_bin_%_zk_%_zkconfig_%_tianya/Tianya_SearchEngineConfig.xml
[com.zkconfig.config.client.ClientConfigUpdater.nodeHandle(ClientConfigUpdater.java:122)]
2017-01-12 16:21:49[INFO] - nodeType:2,nodeOperateType:2
[com.zkconfig.config.client.ClientConfigUpdater.nodeHandle(ClientConfigUpdater.java:130)]

系统的结构

系统主要由四个组件组成:

  • ZooKeeperServer

         集群或单机版的ZooKeeper服务端,主要用以存储ServerConfigPublisher发布的配置文件信息

  • ServerConfigPublisher

         用以将需要统一管理的配置文件发布到ZooKeeperServer中去

  • ConfigChangeListener

        用以向ZooKeeperServer进行注册监听,并在配置发生变化时实时更新客户端本地的配置文件

  • ServerConfigChanger

        用以更新ZooKeeperServer端的配置信息

以上的各组件分别打包成ZkConfigServerMain-1.0.jar和ZkConfigClientMain-1.0.jar两个jar包

ZkConfigServerMain-1.0.jar的入口是ConfigServerMain类

ZkConfigClientMain-1.0.jar的入口是ConfigClientMain类

下面让我们来看一下各组件的详细代码,通过代码来了解

ZooKeeper服务端的启动与配置文件的发布

执行ConfigServerMain类的main方法,main方法具体的执行过程如下:

public static void main(String[] args) throws Exception {
    // 启动zookeeper服务器集群
	ConfigServerMain.startZkCluster();

	// 启动配置文件发布器
    ServerConfigPublisher publisher = new ServerConfigPublisher(Constant.CONF_DIR);
    publisher.connectZkServer();
    publisher.publish();

}  

1.先启动ZooKeeper服务器的集群

2.创建一个配置文件发布器,连接上ZooKeeperServer

3.扫描服务端的配置文件,并发布到ZooKeeperServer中

需要注意的是ZooKeeper的启动分集群版(cluster)和单机版(standalone),具体的ZooKeeper服务端启动的原理,不是本篇文章的重点,各位看官可以自行探究。

启动集群的代码如下:

/**
* 启动zk server集群
* @throws IOException
* @throws InterruptedException
* @throws ConfigException
*/
public static void startZkCluster() throws IOException, InterruptedException, ConfigException{
	String zkConfigPath1 = Constant.CLUSTER_PATH +"/1/zk1.cfg";
    // 启动ZooKeeper服务器1
    ZkServer server1 = new ZkServer();
    server1.startCluster(zkConfigPath1);

    String zkConfigPath2 = Constant.CLUSTER_PATH +"/2/zk2.cfg";
    // 启动ZooKeeper服务器2
    ZkServer server2 = new ZkServer();
    server2.startCluster(zkConfigPath2);

    String zkConfigPath3 = Constant.CLUSTER_PATH +"/3/zk3.cfg";
    // 启动ZooKeeper服务器3
    ZkServer server3 = new ZkServer();
    server3.startCluster(zkConfigPath3);
}

分别创建了三个ZkServer对象,然后执行startCluster方法启动ZooKeeperServer,该方法接收一个ZooKeeperServer启动所需的配置文件的路径。

让我们看一下ZkServer的startCluster方法:

    /**
	 * 启动集群模式
	 * @param zkConfigPath
	 * @throws IOException
	 * @throws InterruptedException
	 * @throws ConfigException
	 */
	public void startCluster(String zkConfigPath) throws IOException, InterruptedException, ConfigException {
        QuorumPeerConfig config = new QuorumPeerConfig();
        // 从配置文件读取配置
        config.parse(zkConfigPath);

        ServerCnxnFactory cnxnFactory = new NIOServerCnxnFactory();
        cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());

        QuorumPeer quorumPeer = new QuorumPeer();
        quorumPeer.setClientPortAddress(config.getClientPortAddress());
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                    new File(config.getDataLogDir()),
                    new File(config.getDataDir())));
        quorumPeer.setQuorumPeers(config.getServers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

        quorumPeer.start();
        // 注释掉该行代码,如果集群在同一台服务器,启动时会一直等待
//        quorumPeer.join();
        logger.info("ZkServerCluster Started! ClientPortAddress:"+config.getClientPortAddress());
    } 

请注意该方法最后被注释掉的一行代码,由于本框架中ZooKeeper集群采用的是伪集群的方式,三台ZooKeeperServer都部署在同一台服务器上,为了server2和server3能够正常启动,故将quorumPeer.join()方法注释掉。

ZooKeeperServer集群启动完成之后,就可以进行配置文件的发布了,这里是创建一个ServerConfigPublisher的对象,该对象先连接上ZooKeeper,然后将配置文件发布到ZooKeeper中去,具体代码如下:

  • 先创建发布器对象并连接ZooKeeper
    private ZooKeeper zk;

	// 配置文件的路径
	private String configPath;

	public ServerConfigPublisher(String configPath){
		this.configPath = configPath;
	}

	private CountDownLatch connectedLatch = new CountDownLatch(1);

	/**
	 * 默认的watcher
	 *
	 */
	class ConnectedWatcher implements Watcher {
        private CountDownLatch connectedLatch;
        ConnectedWatcher(CountDownLatch connectedLatch) {
            this.connectedLatch = connectedLatch;
        }
        @Override
        public void process(WatchedEvent event) {
           if (event.getState() == KeeperState.SyncConnected) {
               connectedLatch.countDown();
           }
           logger.debug("ServerConfigPublisher get ["+event.getType()+"] notice by ConnectedWatcher");
        }
    }

	private ConnectedWatcher watcher = new ConnectedWatcher(connectedLatch);

	protected void waitUntilConnected(ZooKeeper zooKeeper, CountDownLatch connectedLatch) {
		logger.info("ZooKeeper State:"+zooKeeper.getState());
        if (States.CONNECTING == zooKeeper.getState()) {
            try {
            	logger.info("ConnectedLatch.await...");
                connectedLatch.await();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    /**
     * 连接zookeeper
     */
    public void connectZkServer() throws Exception {
    	zk = new ZooKeeper(Constant.ZK_CLUSTER_HOSTS, 5000, watcher);
    	// 等待连接上
    	waitUntilConnected(zk, connectedLatch);
    	logger.info("ServerConfigPublisher Connected to ZooKeeper Cluster:"+Constant.ZK_CLUSTER_HOSTS);
    } 
  • 发布配置文件到ZooKeeperServer
    /**
     * 读取配置文件后解析加入到znode中
     * @throws KeeperException
     * @throws IOException
     */
    public void publish() throws InterruptedException, KeeperException, IOException {
        // 创建配置文件根节点
    	String rootNode = "/" + Constant.CONF_ROOT_NODE;
    	// 创建根节点
    	ZkUtil.createNode(zk, rootNode, Constant.CONF_DIR);
    	// 扫描所有配置文件
    	scanConfigFiles(configPath,Constant.ACCEPT_SUFFIX);
    }

    /**
     * 扫描指定目录下的所有配置文件,并将内容写入到zookeeper节点中
     * @param path	扫描的目录
     * @param acceptSuffix 接受的文件后缀
     * @throws KeeperException
     * @throws InterruptedException
     * @throws IOException
     */
    protected void scanConfigFiles(String path,String acceptSuffix) throws KeeperException, InterruptedException, IOException{
    	File dir = new File(path);
        if(dir.exists() && dir.isDirectory()){
        	File[] subFiles = dir.listFiles();
        	for(File file : subFiles){
        		String absPath = file.getAbsolutePath();
        		String fileName = file.getName();
        		if(file.isDirectory() || (null!=acceptSuffix && !fileName.endsWith(acceptSuffix))){
        			scanConfigFiles(absPath,acceptSuffix);
        		}else{
        			String parentDir = file.getParentFile().getAbsolutePath();
        			// 读取文件内容
        			String fileContent = FileUtils.readFileToString(file,Constant.CONF_CHAR_SET);
        			// 创建目录节点
        			ZkUtil.createDirNode(zk, parentDir);
        			// 创建该目录下的文件节点
        			ZkUtil.createFileNode(zk, parentDir, fileName, fileContent);
        		}
        	}
        }
    }
    

可以看到发布器会扫描指定的配置文件的目录下面的所有目录和文件,并有一个接受的文件后缀,只接受指定后缀的配置文件进行发布。

发布的节点有两种类型:目录节点和文件节点,具体的节点存储规则如下:


	 * 	<p>		节点存储规则	</p>
     * 	<p>		_#_ : separator	d : directory	f : file	                                                                            </p>
     * 	<p>		节点路径			    节点类型			     节点值	                                                                        </p>
     * 	<p>		/root				root(0)				(0_#_root).getBytes()		                                                    </p>
     * 	<p>		/root/d1			dir(1)				(1_#_str2Hex(d1.getAbsolutePath())).getBytes()		                            </p>
     * 	<p>		/root/d1/f1			file(2)				(2_#_str2Hex(f1.getAbsolutePath())_#_str2Hex(f1.getContent())).getBytes()		</p>
     * 

其中节点类型分为根节点(0),目录节点(1),文件节点(2)

ConfigChangeListener配置文件变更监听器

ZooKeeperServer集群启动好,配置文件信息也发布到ZooKeeper中去了,现在可以启动客户端进行配置文件的同步与监听了,而这些工作主要由ConfigChangeListener和ClientConfigUpdater来完成。

先来看下ConfigChangeListener的主要工作,他的工作比较轻松,主要是向ZooKeeper进行注册一个监听器,监听所有配置文件的变更状态,包括新增、修改、删除的配置节点,而节点又分成配置文件的目录和具体的配置文件两种。

看一下配置文件变更监听器的具体工作:

    /**
     * 连接zookeeper
     */
    public void start() {
    	try{
			zk = new ZooKeeper(Constant.ZK_CLUSTER_HOSTS, 5000, watcher);
			// 等待连接上
			waitUntilConnected(zk, connectedLatch);
			logger.info("ConfigChangeListener Connected to ZooKeeper Cluster:"+Constant.ZK_CLUSTER_HOSTS);
			// 先清空本地的配置文件目录
			ClientConfigUpdater.clearConfigDir(zk);
			// 监听根节点和所有子节点,并第一次同步数据到本地
			ClientConfigUpdater.listenNode(true,zk,watcher);
			// 等待配置信息变更
			Thread.currentThread().join();

		}catch(Exception e){
			logger.error("ConfigChangeListener occured error:",e);
		}
    }
  

1.首先监听器连接上ZooKeeper服务器,

2.然后清空本地的配置文件的目录

3.监听根节点和所有子节点,并第一次同步ZooKeeper服务端的数据到本地

4.等待配置信息的变更

来看下ClientConfigUpdater的listernNode方法,具体代码如下:

    /**
     * 监听根节点和递归监听所有子节点
     * @param syncData 是否需要同步服务端的数据到本地
     * @param zk
     * @param watcher
     * @throws KeeperException
     * @throws InterruptedException
     * @throws IOException
     */
    public static void listenNode(boolean syncData,ZooKeeper zk,Watcher watcher) throws KeeperException, InterruptedException, IOException{
    	// 监听根节点
    	String rootNode = "/" + Constant.CONF_ROOT_NODE;
        if(null!=zk.exists(rootNode, watcher)){
        	logger.debug("Listening node:"+rootNode);
        	List<String> subList = zk.getChildren(rootNode,watcher);
        	if(null!=subList && subList.size()>0){
        		// 将节点的所有子节点保存起来
        		ChildrenChangedWrapper.addChildren(rootNode, subList);
        	}
        	for (String subNode : subList) {
        		ClientConfigUpdater.listenSubNode(syncData,zk,rootNode,subNode,watcher);
        	}
        }else{
        	logger.warn("rootNode:"+rootNode+" does not exists!");
        }
    }

    /**
     * 监听子节点
     * @param syncData 是否需要同步服务端的数据到本地
     * @param zk
     * @param currentNode
     * @param subNode
     * @param watcher
     * @throws KeeperException
     * @throws InterruptedException
     * @throws IOException
     */
    public static void listenSubNode(boolean syncData,ZooKeeper zk,String currentNode,String subNode,Watcher watcher) throws KeeperException, InterruptedException, IOException{
    	String nodePath = currentNode+"/"+subNode;
    	if(nodePath.startsWith("/")){
    		// 监听子节点
    		if(null!=zk.exists(nodePath,watcher)){
    			// 如果需要同步数据到本地
            	if(syncData){
            		ClientConfigUpdater.syncNode(zk,nodePath);
            	}
    			logger.debug("Listening node:"+nodePath);
            	List<String> subList = zk.getChildren(nodePath,watcher);
            	if(null!=subList && subList.size()>0){
            		// 将节点的所有子节点保存起来
            		ChildrenChangedWrapper.addChildren(nodePath, subList);
            	}
            	for (String _subNode : subList) {
            		ClientConfigUpdater.listenSubNode(syncData,zk,nodePath,_subNode,watcher);
            	}
            }else{
            	logger.warn("subNode:"+nodePath+" does not exists!");
            }
    	}
    }

    /**
     * 从ZooKeeper服务端同步配置信息到本地
     * @param zk
     * @param path
     * @throws IOException
     * @throws InterruptedException
     * @throws KeeperException
     */
    public static void syncNode(ZooKeeper zk,String path) throws IOException, KeeperException, InterruptedException{
    	if(null==zk.exists(path, null)){
    		logger.error("syncNode error,node ["+path+"] does not exists");
    		return;
    	}
    	byte[] data = zk.getData(path, true, null);
        String content = new String(data,Constant.CONF_CHAR_SET);
        // 对内容进行解析
        String[] strs = content.split(Constant.NODE_CONTENT_SEPRATOR);
        if(strs.length<2){
        	logger.error("syncNode error,strs length should not be less than 2");
        	return;
        }
        // 节点类型
        String nodeType = strs[0];
        // 节点操作类型
        String nodeOperateType = strs[1];
        logger.debug("nodeType:"+nodeType+",nodeOperateType:"+nodeOperateType);
        String dirPath = "";
        String filePath = "";
        String fileContent = "";
        if(strs.length==3){
        	dirPath = HexUtil.hex2Str(strs[2]);
        }else if(strs.length==4){
        	filePath = HexUtil.hex2Str(strs[2]);
        	fileContent = HexUtil.hex2Str(strs[3]);
        }
    	// 如果是目录节点,则创建目录
    	if(Constant.NODE_TYPE_DIRECTORY.equals(nodeType)){
    		if(StringUtils.isNotEmpty(dirPath)){
    			FileUtils.forceMkdir(new File(dirPath));
    			logger.info("Mkdir:"+dirPath);
    		}
		// 如果是文件节点,则生成该文件
    	}else if(Constant.NODE_TYPE_FILE.equals(nodeType)){
    		if(StringUtils.isNotEmpty(filePath) && StringUtils.isNotEmpty(fileContent)){
    			FileUtils.writeStringToFile(new File(filePath), fileContent);
    			logger.info("Write file:"+filePath);
    		}
    	}
    }

代码中有一行代码:ChildrenChangedWrapper.addChildren(rootNode, subList)

ChildrenChangedWrapper该包装器主要是用来保存每次更新后的某个节点的子节点情况,以便发生NodeChildrenChanged事件时,可以用以比较该节点更新后的子节点列表与更新前的列表。据此可以得知该节点发生的到底是创建子节点还是删除子节点的事件。

ChildrenChangedWrapper包装器的具体实现如下:

/**
 * 子节点变更包装器
 * 保存每个节点的子节点path
 * 用以判断变更的子节点的path,和变更的类型:新增、删除、更改
 * @author hwang
 *
 */
public class ChildrenChangedWrapper{

	private static final Log logger = LogFactory.getLog(ChildrenChangedWrapper.class);

	/**
	 * 子节点变更的类型
	 * @author hwang
	 *
	 */
	public static enum ChildrenChangeType{
		add,
		delete,
		update
	}

	/**
	 * 子节点变更的结果
	 * @author hwang
	 *
	 */
	public static class ChildrenChangeResult{
		private ChildrenChangeType changeType;

		private List<String> changePath;

		public ChildrenChangeType getChangeType() {
			return changeType;
		}
		public void setChangeType(ChildrenChangeType changeType) {
			this.changeType = changeType;
		}
		public List<String> getChangePath() {
			return changePath;
		}
		public void setChangePath(List<String> changePath) {
			this.changePath = changePath;
		}
	}


	/**
	 * 记录当前状态下的节点node的所有子节点
	 */
	private static Map<String,Set<String>> nodeChildrenMap = new ConcurrentHashMap<String,Set<String>>();

	/**
	 * 增加子节点
	 * @param nodePath
	 * @param childrenPath
	 */
	public static void addChildren(String nodePath,String childrenPath){
		synchronized (nodeChildrenMap) {
			Set<String> childrenSet = nodeChildrenMap.get(nodePath);
			if(null==childrenSet){
				childrenSet = new CopyOnWriteArraySet<String>();
			}
			childrenSet.add(childrenPath);
			nodeChildrenMap.put(nodePath, childrenSet);
		}
	}

	/**
	 * 增加子节点
	 * @param nodePath
	 * @param childrenPath
	 */
	public static void addChildren(String nodePath,Collection<String> childrenPath){
		synchronized (nodeChildrenMap) {
			Set<String> childrenSet = nodeChildrenMap.get(nodePath);
			if(null==childrenSet){
				childrenSet = new CopyOnWriteArraySet<String>();
			}
			childrenSet.addAll(childrenPath);
			nodeChildrenMap.put(nodePath, childrenSet);
		}
	}

	/**
	 * 删除子节点
	 * @param nodePath
	 * @param childrenPath
	 */
	public static void removeChildren(String nodePath,String childrenPath){
		synchronized (nodeChildrenMap) {
			Set<String> childrenSet = nodeChildrenMap.get(nodePath);
			if(null==childrenSet){
				logger.warn("Node:"+nodePath+" has no children.");
				return;
			}
			childrenSet.remove(childrenPath);
			nodeChildrenMap.put(nodePath, childrenSet);
		}
	}

	/**
	 * 删除子节点
	 * @param nodePath
	 * @param childrenPath
	 */
	public static void removeChildren(String nodePath,Collection<String> childrenPath){
		synchronized (nodeChildrenMap) {
			Set<String> childrenSet = nodeChildrenMap.get(nodePath);
			if(null==childrenSet){
				logger.warn("Node:"+nodePath+" has no children.");
				return;
			}
			childrenSet.removeAll(childrenPath);
			nodeChildrenMap.put(nodePath, childrenSet);
		}
	}

	protected static void showNodeChildren(String nodePath){
		Set<String> childrenSet = nodeChildrenMap.get(nodePath);
		if(null==childrenSet){
			logger.warn("Node:"+nodePath+" has no children.No need to show.");
			return;
		}
		StringBuffer sb = new StringBuffer();
		for(String child : childrenSet){
			sb.append(child).append(",");
		}
		logger.debug("Node:"+nodePath+",children:"+sb.toString());
	}

	/**
	 * 计算节点的变更情况
	 * @param nodePath
	 * @param currentChildrenPath
	 * @return
	 */
	public static ChildrenChangeResult diff(String nodePath,List<String> currentChildrenPath){
		synchronized (nodeChildrenMap) {
			ChildrenChangeResult changeResult = new ChildrenChangeResult();
			ChildrenChangeType changeType = ChildrenChangeType.add;
			List<String> changePath = new ArrayList<String>();
			// 获取节点的原始子节点
			Set<String> originalChildrenPath = nodeChildrenMap.get(nodePath);
			if(null==originalChildrenPath){
				logger.info("Node:"+nodePath+" has no children.All currentChildrenPath are new.");
				changeType = ChildrenChangeType.add;
				changePath = currentChildrenPath;
			}else{
				int currentSize = currentChildrenPath.size();
				int originalSize = originalChildrenPath.size();
				if(currentSize>originalSize){
					changeType = ChildrenChangeType.add;
					for(String curPath : currentChildrenPath){
						// if originalChildrenPath does not contains curPath
						// then curPath is newly added
						if(!originalChildrenPath.contains(curPath)){
							changePath.add(curPath);
						}
					}
				}else if(currentSize<originalSize){
					changeType = ChildrenChangeType.delete;
					for(String oriPath : originalChildrenPath){
						// if currentChildrenPath does not contains oriPath
						// then oriPath is removed
						if(!currentChildrenPath.contains(oriPath)){
							changePath.add(oriPath);
						}
					}
				}
			}
			changeResult.setChangeType(changeType);
			changeResult.setChangePath(changePath);
			return changeResult;
		}
	}

} 	

当发生节点变更事件时,ClientConfigUpdater将负责更新本地的配置文件,具体实现如下:

    /**
	 * 更新本地的配置文件信息,更新完成后再次对每个节点进行监听
	 * @param zk
	 * @param event
	 * @param watcher
	 * @throws Exception
	 */
    public static void updateConfigFiles(ZooKeeper zk,WatchedEvent event,Watcher watcher) throws Exception {
        String path = event.getPath();
        EventType type = event.getType();
        logger.info("EventType:"+type+",Value:"+type.getIntValue()+",path:"+path);
        switch(type){
	        case NodeDeleted:{
	        	String filePath = path.substring(("/"+Constant.CONF_ROOT_NODE).length()+1).replaceAll(Constant.SEPRATOR, "/");
	        	FileUtils.deleteQuietly(new File(filePath));
	        }break;
	        case NodeCreated:
	        case NodeDataChanged:{
	        	nodeHandle(zk,path);
	        }break;
	        case NodeChildrenChanged:{
	        	logger.info("Catch [NodeChildrenChanged] event");
	        	// 获取节点的当前子节点
	        	if(null==zk.exists(path, false)){
	        		logger.info("Node:"+path+" does not exists,will do nothing.");
	        		break;
	        	}
	        	List<String> currentChildrenPath = zk.getChildren(path, false);
	        	ChildrenChangeResult changeResult = ChildrenChangedWrapper.diff(path, currentChildrenPath);
	        	ChildrenChangeType changeType = changeResult.getChangeType();
	        	List<String> changePath = changeResult.getChangePath();
	        	switch(changeType){
		        	case add:{
		        		for(String subPath : changePath){
		        			logger.info("Add children node,path:"+path+"/"+subPath);
		        			nodeHandle(zk,path+"/"+subPath);
		        		}
		        	}break;
		        	case delete:{
		        		for(String subPath : changePath){
		        			String filePath = subPath.replaceAll(Constant.SEPRATOR, "/");
		    	        	FileUtils.deleteQuietly(new File(filePath));
		    	        	logger.info("Delete children node,file:"+filePath);
		        		}
		        	}break;
		        	case update:{
		        		logger.info("Update children node,will do nothing");
		        	}break;
		        	default:{
		        		logger.info("Default children node operate,will do nothing");
		        	}break;
	        	}
	        }break;
	        default:{
	        	logger.info("Catch [default] event,will do nothing");
	        }break;
        }
        // 再次注册所有节点的监听事件
        ClientConfigUpdater.listenNode(false,zk,watcher);
    }

    /**
     * 节点处理
     * @param zk
     * @param path
     * @throws KeeperException
     * @throws InterruptedException
     * @throws IOException
     */
    public static void nodeHandle(ZooKeeper zk,String path) throws KeeperException, InterruptedException, IOException{
    	// 对path节点立刻进行监听,防止新创建的节点又立刻创建子节点
        byte[] data = zk.getData(path, true, null);
        logger.info("Re-listening node:"+path);
        String content = new String(data,Constant.CONF_CHAR_SET);
        // 对内容进行解析
        String[] strs = content.split(Constant.NODE_CONTENT_SEPRATOR);
        // 节点类型
        String nodeType = strs[0];
        // 节点操作类型
        String nodeOperateType = strs[1];
        logger.info("nodeType:"+nodeType+",nodeOperateType:"+nodeOperateType);
        // 如果是目录节点
    	if(Constant.NODE_TYPE_DIRECTORY.equals(nodeType)){
    		// 新增目录
			if(Constant.NODE_OPERATE_TYPE_ADD.equals(nodeOperateType)){
				if(strs.length==3){
					String dirPath = HexUtil.hex2Str(strs[2]);
					FileUtils.forceMkdir(new File(dirPath));
				}else{
					logger.warn("The str array length should be 3,while actually is:"+strs.length);
				}
			// 更新目录
			}else if(Constant.NODE_OPERATE_TYPE_UPDATE.equals(nodeOperateType)){
				if(strs.length==4){
					File oldDir = new File(HexUtil.hex2Str(strs[2]));
					File newDir = new File(HexUtil.hex2Str(strs[3]));
					if(oldDir.exists()){
						FileUtils.moveDirectory(oldDir, newDir);
					}else{
						logger.error("Dir["+oldDir+"] does not exist");
					}
				}else{
					logger.warn("The str array length should be 4,while actually is:"+strs.length);
				}
			// 删除目录
			}else{
				logger.info("Delete dir:"+path.replaceAll(Constant.SEPRATOR, "/"));
			}
    	// 如果是文件节点
    	}else if(Constant.NODE_TYPE_FILE.equals(nodeType)){
    		// 新增文件或者更新文件
			if(Constant.NODE_OPERATE_TYPE_ADD.equals(nodeOperateType) || Constant.NODE_OPERATE_TYPE_UPDATE.equals(nodeOperateType)){
				if(strs.length==4){
					String filePath = HexUtil.hex2Str(strs[2]);
		        	String fileContent = HexUtil.hex2Str(strs[3]);
		        	FileUtils.writeStringToFile(new File(filePath), fileContent);
				}else{
					logger.warn("The str array length should be 4,while actually is:"+strs.length);
				}
			// 删除文件
			}else{
				logger.info("Delete file:"+path.replaceAll(Constant.SEPRATOR, "/"));
			}
    	}
    }
    

每次更新完本地配置信息后,需要重新对所有节点进行监听。

ServerConfigChanger配置文件修改器

配置文件修改器的工作就是连接上ZooKeeper后,变更(或新增、删除)对应节点的数据,具体的代码如下:

/**
 * 服务端配置文件更改器
 * @author hwang
 *
 */
public class ServerConfigChanger {

	private static final Log logger = LogFactory.getLog(ServerConfigChanger.class);

	private static ZooKeeper zk;

    private static CountDownLatch connectedLatch = new CountDownLatch(1);

	static class ConnectedWatcher implements Watcher {
        private CountDownLatch connectedLatch;
        ConnectedWatcher(CountDownLatch connectedLatch) {
            this.connectedLatch = connectedLatch;
        }
        @Override
        public void process(WatchedEvent event) {
           if (event.getState() == KeeperState.SyncConnected) {
               connectedLatch.countDown();
           }
           logger.debug("ServerConfigChanger get ["+event.getType()+"] notice by ConnectedWatcher");
        }
    }

	static{
		if(null==zk){
    		try {
				connectZkServer();
			} catch (Exception e) {
				e.printStackTrace();
			}
    	}
	}

	private static void waitUntilConnected(ZooKeeper zooKeeper, CountDownLatch connectedLatch) {
		logger.info("ZooKeeper State:"+zooKeeper.getState());
        if (States.CONNECTING == zooKeeper.getState()) {
            try {
            	logger.info("ConnectedLatch.await");
                connectedLatch.await();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
    }


    /**
     * 连接zookeeper
     */
    private static void connectZkServer() throws Exception {
        zk = new ZooKeeper(Constant.ZK_CLUSTER_HOSTS, 5000, new ConnectedWatcher(connectedLatch));
        // 等待连接上
    	waitUntilConnected(zk, connectedLatch);
    	logger.info("ServerConfigChanger Connected to ZooKeeper Cluster:"+Constant.ZK_CLUSTER_HOSTS);
    }

    /**
     * 新增目录节点
     * @param dirAbsolutePath 目录的绝对路径,该目录必须是/config/开头的目录
     * @throws KeeperException
     * @throws InterruptedException
     * @throws UnsupportedEncodingException
     */
    public static void addConfigDir(String dirAbsolutePath) throws KeeperException, InterruptedException, UnsupportedEncodingException{
    	if(null==zk){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return;
    	}
    	if(StringUtils.isEmpty(dirAbsolutePath)){
    		logger.error("dirAbsolutePath can't be empty");
    		return;
    	}
    	ZkUtil.createDirNode(zk, dirAbsolutePath);
    }

    /**
     * 删除目录节点
     * @param dirAbsolutePath
     * @throws InterruptedException
     * @throws KeeperException
     */
    public static void deleteConfigDir(String dirAbsolutePath) throws InterruptedException, KeeperException{
    	if(null==zk){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return;
    	}
    	ZkUtil.deleteDirNode(zk, dirAbsolutePath);
    }


    public static void updateConfigDir(String oldDirAbsolutePath,String newDirAbsolutePath) throws InterruptedException, KeeperException, UnsupportedEncodingException{
    	if(null==zk){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return;
    	}
    	if(StringUtils.isEmpty(oldDirAbsolutePath)  || StringUtils.isEmpty(newDirAbsolutePath)){
    		logger.error("oldDirAbsolutePath,newDirAbsolutePath can't be empty");
    		return;
    	}
		ZkUtil.updateDirNode(zk, oldDirAbsolutePath, newDirAbsolutePath);
    }


    /**
     * 新增文件节点
     * @param fileAbsolutePath 文件的绝对路径,不包括文件名
     * @param fileName 文件名
     * @param fileContent 文件内容
     * @throws KeeperException
     * @throws InterruptedException
     * @throws UnsupportedEncodingException
     */
    public static void addConfigFile(String fileAbsolutePath,String fileName,String fileContent) throws KeeperException, InterruptedException, UnsupportedEncodingException{
    	if(null==zk){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return;
    	}
    	if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName) || StringUtils.isEmpty(fileContent)){
    		logger.error("fileAbsolutePath,fileName,fileContent can't be empty");
    		return;
    	}
    	ZkUtil.createFileNode(zk, fileAbsolutePath, fileName, fileContent);
    }

    /**
     * 删除文件节点
     * @param fileAbsolutePath 文件的绝对路径,不包括文件名
     * @param fileName 文件名
     * @throws InterruptedException
     * @throws KeeperException
     */
    public static void deleteConfigFile(String fileAbsolutePath,String fileName) throws InterruptedException, KeeperException{
    	if(null==zk){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return;
    	}
    	if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName)){
    		logger.error("fileAbsolutePath,fileName can't be empty");
    		return;
    	}
    	ZkUtil.deleteFileNode(zk, fileAbsolutePath, fileName);
    }


    public static void updateConfigFile(String fileAbsolutePath,String fileName,String fileContent) throws InterruptedException, KeeperException, UnsupportedEncodingException{
    	if(null==zk){
    		logger.warn("Not connected to ZooKeeper,will return");
    		return;
    	}
    	if(StringUtils.isEmpty(fileAbsolutePath)  || StringUtils.isEmpty(fileName)  || StringUtils.isEmpty(fileContent)){
    		logger.error("fileAbsolutePath,fileName,fileContent can't be empty");
    		return;
    	}
		ZkUtil.updateFileNode(zk, fileAbsolutePath, fileName, fileContent);
    }

    public static void testAdd() throws InterruptedException, KeeperException, UnsupportedEncodingException{
    	String dirAbsolutePath = "/home/NetPF/bin/zk/zkconfig/test";
    	ServerConfigChanger.addConfigDir(dirAbsolutePath);

		String fileAbsolutePath = "/home/NetPF/bin/zk/zkconfig/test";
		String fileName = "Test2_SearchEngineConfig.xml";
		String fileContent = "This is a test content added by zookeeper";
		try{
			Thread.sleep(2000);
		}catch(Exception e){

		}
		ServerConfigChanger.addConfigFile(fileAbsolutePath,fileName,fileContent);

    }

    public static void testDelete() throws InterruptedException, KeeperException{
    	String dirAbsolutePath = "/home/NetPF/bin/zk/zkconfig/test";

    	String fileAbsolutePath = "/home/NetPF/bin/zk/zkconfig/test";
    	String fileName = "Test2_SearchEngineConfig.xml";
    	ServerConfigChanger.deleteConfigFile(fileAbsolutePath,fileName);

    	ServerConfigChanger.deleteConfigDir(dirAbsolutePath);
    }


    public static void testUpdate() throws InterruptedException, KeeperException, UnsupportedEncodingException{

    	String fileAbsolutePath = "/home/NetPF/bin/zk/zkconfig/tianya";
    	String fileName = "Tianya_SearchEngineConfig.xml";
    	String fileContent = "Tianya_SearchEngineConfig.xml哈哈";
    	ServerConfigChanger.updateConfigFile(fileAbsolutePath,fileName,fileContent);

    }

    /**
     * 获取根节点和所有子节点
     * @throws KeeperException
     * @throws InterruptedException
     */
    public static void getNode() throws KeeperException, InterruptedException{
    	// 监听根节点
    	String rootNode = "/" + Constant.CONF_ROOT_NODE;
        if(null!=zk.exists(rootNode, false)){
        	logger.info("get node:"+rootNode);
        	List<String> subList = zk.getChildren(rootNode,false);
        	for (String subNode : subList) {
        		getSubNode(rootNode,subNode);
        	}
        }else{
        	logger.warn("rootNode:"+rootNode+" does not exists!");
        }
    }

    /**
     * 获取子节点
     * @param currentNode
     * @param subNode
     * @throws KeeperException
     * @throws InterruptedException
     */
    public static void getSubNode(String currentNode,String subNode) throws KeeperException, InterruptedException{
    	String nodePath = currentNode+"/"+subNode;
    	if(nodePath.startsWith("/")){
    		// 监听子节点
    		if(null!=zk.exists(nodePath,false)){
    			logger.info("get node:"+nodePath);
            	List<String> subList = zk.getChildren(nodePath,false);
            	for (String _subNode : subList) {
            		getSubNode(nodePath,_subNode);
            	}
            }else{
            	logger.warn("subNode:"+nodePath+" does not exists!");
            }
    	}
    }

    public static void main(String[] args) throws  KeeperException, InterruptedException, UnsupportedEncodingException {
    	testAdd();
//    	testDelete();
    	testUpdate();
    	getNode();
	}

}

至此,一个简单的分布式配置文件管理系统完成了。

未解决的问题

由于程序直接使用了ZooKeeper的api,对于一些问题还未能充分解决,比如ServerConfigChanger在新增节点时在根节点/config下创建一个目录节点(dirNode1)和文件节点(fileNode1),两个步骤之间没有时间间隔,这时将会出现以下问题:ConfigChangeListener只监听到了/config节点的NodeChildrenChanged事件,客户端可以正常创建dirNode1的节点,而还未来得及监听dirNode1的节点变更事件,导致dirNode1节点下的fileNode1节点未能成功通知到客户端,所以客户端就无法成功创建fileNode1节点

下一步将通过改写成使用ZkClient来与ZooKeeper进行交互。ZkClient充分考虑到了session超时重连、事件重新自动注册的问题。

本篇博客是我的第一篇博客,还有许多不足的地方,请各位大牛予以指正,我一定悉心学习。

03-18 07:59