首页 > 网络 > 云计算 >

Zookeeper监听存活节点代码实例

2017-10-06

Zookeeper监听存活节点代码实例。zookeeper 监听存活节点:应用场景,公司服务器不想做负载均衡,但又担心单点故障的情况发生,于是将服务器资源注册到zookeeper中,客户端从zookeeper中动态获取服务器资源,然后通过资源进行访问,可以注册多个服务器资源到zookeeper,客户

zookeeper 监听存活节点

应用场景,公司服务器不想做负载均衡,但又担心单点故障的情况发生,于是将服务器资源注册到zookeeper中,客户端从zookeeper中动态获取服务器资源,然后通过资源进行访问,可以注册多个服务器资源到zookeeper,客户端监听zookeeper中的服务资源,当服务端有故障,比如服务当机,则zookeeper中的资源因为长连接断开而自动移除资源,同时客户端的资源会自动重载过滤,达到自动选择存活节点的目的。

注意事项:

1:创建节点时,需要创建临时节点ephemeral,session失效时间要按需求设置,session失效时间默认为30秒。

2:创建节点时,先删除此节点。

实现代码如下:

public interface ZkConfig {

        /**
         * 配置平台根节点名称
         */
        static String root = "/server_node";

        /**
         * 初始化配置
         */
        void init();

        /**
         * 重新加载配置资源
         */
        void reload();

        /**
         * 添加配置
         * @param key
         * @param value
         */
        void add(String key, String value);

        /**
         * 更新配置
         * @param key
         * @param value
         */
        void update(String key, String value);

        /**
         * 删除配置
         * @param key
         */
        void delete(String key);

        /**
         * 获取配置
         * @param key
         * @return
         */
        String get(String key);

        /**
         * 获取所有的配置内容
         * @return
         */
        Map getAll();
}

监听实现

public class ZkWatcher {

    private ZkClient client;

    private ZkListener zkListener;

    private ZkConfig zkConfig ;

    public ZkWatcher(ZkClient client, ZkConfig zkConfig) {
        this.client = client;
        this.zkConfig = zkConfig ;
        this.initConfig();
    }

    private void initConfig(){
        zkListener = new ZkListener();
    }

    public void watcher(String key){
        client.subscribeDataChanges(key, configYardListener);
        client.subscribeChildChanges(key, configYardListener);
    }

    /**
     * 配置监听器
     * @author flyking
     *
     */
    private class ZkListener  implements IZkDataListener,IZkChildListener{
        public void handleDataChange(String dataPath, Object data)
                throws Exception {
           System.out.println("data "+dataPath+" change,start reload configProperties");
            configYard.reload();
        }

        public void handleDataDeleted(String dataPath) throws Exception {
            System.out.println("data "+dataPath+" delete,start reload configProperties");
            configYard.reload();
        }

        public void handleChildChange(String parentPath,
                                      List currentChilds) throws Exception {
            System.out.println("data "+parentPath+" ChildChange,start reload configProperties");
            ZkConfig.reload();
        }

    }
}

接口方法实现

public class ZkConfigImpl implements ZkConfig {

    /**
     * 存储配置内容
     */
    private volatile Map zkProperties = new HashMap();

    private ZkClient client;

    private ZkWatcher  zkWatcher ;

    public ZkConfigImpl(String serverstring) {

        this.client = new  ZkClient(serverstring, 300, 3000);
        zkWatcher = new ZkWatcher(client,this);
        this.init();
    }

    /**
     * 初始化加载配置到内存
     */
    public void init() {
        if(!client.exists(root)){
            client.createEphemeral(root);
        }
        if (zkProperties == null) {
            System.out.println("start to init zkProperties");
            zkProperties = this.getAll();
            System.out.println("init zkProperties over");
        }
    }

    private String contactKey(String key){
        return root.concat("/").concat(key);
    }

    public void add(String key, String value) {
        String contactKey = this.contactKey(key);
        this.client.createEphemeral(contactKey, value);
        zkWatcher.watcher(contactKey);
    }

    public void update(String key, String value) {
        String contactKey = this.contactKey(key);
        this.client.writeData(contactKey, value);
        zkWatcher.watcher(contactKey);
    }

    public void delete(String key) {
        String contactKey = this.contactKey(key);
        this.client.delete(contactKey);
    }

    public String get(String key) {
        if(this.zkProperties.get(key) == null){
            String contactKey = this.contactKey(key);
            if(!this.client.exists(contactKey)){
                return null;
            }
            return this.client.readData(contactKey);
        }
        return zkProperties.get(key);
    }

    public Map getAll() {
        if(zkProperties != null){
            return zkProperties;
        }
        List list = this.client.getChildren(root);
        Map currentProperties = new HashMap();
        for(String zk : list){
            String value = this.client.readData(zk);
            String key = zk.substring(zk.indexOf("/")+1);
            currentProperties.put(key, value);
        }
        return zkProperties;
    }

    public void reload() {
        List list = this.client.getChildren(root);
        Map currentProperties = new HashMap();
        for(String zk : list){
            String value = this.client.readData(this.contactKey(zk));
            System.out.println(zk +"========="+value);
            currentProperties.put(zk, value);
        }
        zkProperties = currentProperties;
    }

}

测试

public static void main(String[] args) {


        ZkConfigImpl  zk = new ZkConfigImpl ("192.168.41.128:2181");
        zk.add("server1", "192.168.1.101:1101");
        zk.add("server2", "192.168.1.101:1102");

        System.out.println("value is===>"+zk.get("server1"));
        System.out.println("value is===>"+zk.get("server2"));

        yard.update("server1", "192.168.9.109:8080");
        System.out.println("update server1 value is===>"+zk.get("server1"));
        yard.delete("server1");
        yard.delete("server2");

    }
相关文章
最新文章
热点推荐