主备服务器如何倒换? 

采用 ZooKeeper 来做主备决策,主备服务器都连接到 ZooKeeper 建立自己的节点,主服务器的路径规则为“/MQ/server/ 分区编号 /master”,备机为“/MQ/server/ 分区编号 /slave”,节点类型为 EPHEMERAL。

备机监听主机的节点消息,当发现主服务器节点断连后,备服务器修改自己的状态,对外提供消息读取服务。

简单实现代码如下:

主服务创建主服务节点信息:

public class Master {
    private String MQ = "/MQ";
    private String server = "/server";
    private String id = "/1";
    private String masterNode = "/master";


    public void init() {
        ZkClient zkClient = new ZkClient("192.168.0.239:2181");
        if(!zkClient.exists(MQ)) {
            zkClient.createPersistent(MQ);
        }

        if(!zkClient.exists(MQ + server)) {
            zkClient.createPersistent(MQ + server);
        }

        if(!zkClient.exists(MQ + server + id)) {
            zkClient.createPersistent(MQ + server + id);
        }

        if(!zkClient.exists(MQ + server + id + masterNode)) {
            zkClient.createEphemeral(MQ + server + id + masterNode, "192.168.0.156:8080");
        }
    }

    public static void main(String[] args) throws Exception{
        Master master = new Master();
        master.init();
        System.in.read();
    }
}

备服务器监听主服务器状态,主服务器故障。切换备份服务器为主服务器

public class Slave {
    private String MQ = "/MQ";
    private String server = "/server";
    private String id = "/1";
    private String salverNode = "/slave";
    private String masterNode = "/master";


    public void init() {
        ZkClient zkClient = new ZkClient("192.168.0.239:2181");

        if(!zkClient.exists(MQ)) {
            zkClient.createPersistent(MQ);
        }

        if(!zkClient.exists(MQ + server)) {
            zkClient.createPersistent(MQ + server);
        }

        if(!zkClient.exists(MQ + server + id)) {
            zkClient.createPersistent(MQ + server + id);
        }

        if(!zkClient.exists(MQ + server + id + salverNode)) {
            zkClient.createEphemeral(MQ + server + id + salverNode, "192.168.0.157:8080");
        }

        zkClient.subscribeDataChanges(MQ + server + id + masterNode, new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("订阅数据发生变化" + dataPath);
                zkClient.delete(MQ + server + id + salverNode);
                zkClient.createEphemeral(MQ + server + id + masterNode,"192.168.0.157:8080");
            }
        });
    }

    public static void main(String[] args) throws Exception{
        Slave slave = new Slave();
        slave.init();
        System.in.read();
    }
}
12-18 11:31