35.1 Zookeeper



一、本周介绍

1 Zookeeper

2 Dubbo

二、本章介绍

2.1 主要内容

三、初识Zookeeper

3.1 Zookeeper:为分布式应用程序提供分布式协调服务的框架

1.分布式的协调框架

源自雅虎研究院,不做软件业务,偏向研究软件基础。

需求:希望有这样一个系统,通过这个系统能搜索到一些数据。比如想查询某个数据库的地址。

当前问题:一个单机系统,就能对外提供服务。但单点,不可靠。比如,机器、网络、电力等不可控因素太多。

很简单:写一个Spring Boot项目,里面开一个接口,写一个Hash Map对外提供服务。你给我一个key,我就返回对应的value。这么做,就是单点。

2.分布式应用程序的分布式协调服务

简单理解为:一个比较可靠的、能够对外提供一致服务数据库

相比于普通数据库,它最大的不同是:分布式部署,多节点并存。所以:

  • 就变得高可用,不会因为某一个节点宕机而不可用
  • 多节点对外发布消息都是一致的

3.2 特点

3.3 架构(图)

  • Leader:能够协调不同的server进行工作。处理写相关的请求,并同步到其他的跟随者上去。如果Leader意外宕机,会重新选举一个leader。
  • 不同的客户端client,连接到不同的服务器server,不会把所有的压力都给同一个服务器。

Zookeeper集群中server数量总是确定的,所以集群中的server交互采用比较可靠的bio长连接模型;

不同于集群中sever间交互,zookeeper客户端其实数量是未知的,为了提高zookeeper并发性能,zookeeper客户端与服务器端交互采用nio模型。

3.4 典型的应用场景:4个

四、Zookeeper的安装、配置

主要是Linux系统。

4.1 Linux下的安装

1.下载安装包

网址:https://downloads.apache.org/zookeeper/

借鉴老师提供的《Zookeeper的安装、配置教辅》

到电脑的cmd中:

登录阿里云的远程服务器的Linux系统:

ssh root@121.199.31.220

wget https://downloads.apache.org/zookeeper/zookeeper-3.6.4/apache-zookeeper-3.6.4-bin.tar.gz

2.解压缩:

tar zxvf apache-zookeeper-3.6.4-bin.tar.gz

3.

复制一个配置文件:

cp conf/zoo_sample.cfg conf/zoo.cfg

进入该配置文件进行观看:不需要修改:

vim zoo.cfg

启动:./bin/zkServer.sh start

关闭:./bin/zkServer.sh stop

4.2 Windows下安装

前提:Windows电脑上要有wget命令才行,如果没有参考以下文章进行简单的安装:https://blog.51cto.com/huny/3265480

1.

wget https://downloads.apache.org/zookeeper/zookeeper-3.6.4/apache-zookeeper-3.6.4-bin.tar.gz

2.

复制配置文件

(因为Windows下的一些cmd指令,跟Linux下不一样,比如这里就必须用Windows的copy、copyx等指令,有一定的熟悉成本,还不如手动复制几秒搞定 )

3.启动

4.3 Mac OS下安装

一个brew就能搞定

五、节点znode(基本数据类型)

5.1 树结构

多叉树;

圆圈就是一个一个的节点,数据就存储在里面;

5.2 节点的特点:4个

5.3 节点的类型:4个

六、操作节点的常用命令

6.1 基础命令:增删改查

1.前提准备

Linux环境下:

注意:
要想运行Zookeeper的命令,得需要Java环境。所以,Linux系统中,如果没有安装,就要安装JDK1.8。

  • 执行命令yum -y list java*查看可安装java版本
  • 执行命令yum install -y java-1.8.0-openjdk-devel.x86_64, 耐心等待至自动安装完成
  • 输入java -version查看已安装的jdk版本,

启动Zookeeper服务:./bin/zkServer.sh start

关闭Zookeeper服务:./bin/zkServer.sh stop

连接:./bin/zkCli.sh -server 127.0.0.1:2181

连接成功;

以上两者,是什么区别?

zookeeper不仅提供了服务端命令,而且提供了客户端命令

zkServer ------------

zkCli ------------------

见3.3 。

2.

通过指令help来查看各种命令:

3.create、get

create /java1 hellojava1

create /java1/zk1 hellozk1

ls /java1------------------------------------- 查看该节点下的,有哪些子节点

get /java1------------------------------------ 查看该节点存储的数据是什么

ls -R /---------------- 以递归的形式,把根目录下的所有节点,显示出来

4.set、delete

以上,就是启动了服务器、启动了客户端连接上去之后,对于节点进行了增删改查相关的基础操作。

6.2 高级命令:版本号、顺序节点

1.

stat /java1--------------------- 查看该节点的属性状态:

注意下面两个属性的区别:

2.用版本号,对节点数据进行赋值、删除

dataVersion表示该节点的数据的变化次数

假设有两个客户端,都要对这个节点的数值进行修改:

版本号与set、delete也是一样的

3.临时节点、顺序节点

以上,全都是持久节点,即没有使用到顺序节点,也没有使用到临时节点。

-e:表示临时节点:

-s:表示顺序节点--------------同一个父节点下,所有节点的顺序都是自增的,不重复

-s+-e

以上,就是Zookeeper高级命令的讲解。

七、Watcher机制

7.1 作用

Watcher:监视器、监听器。

叫什么不重要,重要的是它有什么作用:

如果某一个节点的数据发生变化,就立马通知其他所有节点。后者会根据通知,同步更新。

7.2 事件类型

常见一部分:

八、用ACL实现细粒度的权限控制

8.1 权限:CRWDA

5种权限:CRWDA

admin:是最高级别的权限

8.2 权限方案

实际开发,用的不多,了解即可,运维会用的较多。

九、使用Java代码操作Zookeeper

9.1 通过ZK原生的客户端,连接ZK,并节点操作

1.连接ZK

新建maven项目:

引入依赖:

    <dependencies>
        <!--1. 引入zookeeper依赖-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.6.0</version>
        </dependency>'
        
    </dependencies>

在常量类ZkConstant中:

/**
 *  ZK常量类
 */
public class ZkConstant {

    public static final String ZK_HOST = "127.0.0.1:2181";      // 如果是远端云服务器,就把IP改下,同时安全组里把2181端口打开
    public static final Integer CONNECT_TIMEOUT = 3000;         // 3000ms


}

在类ConnectWatcher中:

/**
 * 连接Watcher
 */
public class ConnectWatcher implements Watcher {


    @Override
    public void process(WatchedEvent event) {            // 被监听到了的事件
        System.out.println("ConnectWatcher类的process方法被调用了");
        if (event.getState() == Event.KeeperState.SyncConnected) {
            System.out.println("连接成功");
        }
        if (event.getState() == Event.KeeperState.Closed) {
            System.out.println("连接关闭");
        }
    }

}

在类FirstConnect中:

/**
 *  连接到ZK
 */
public class FirstConnect {

    public static void main(String[] args) throws IOException, InterruptedException {
        ConnectWatcher connectWatcher = new ConnectWatcher();
        // 前两个参数容易,是两个常量            // 第三个参数是监听者
        ZooKeeper zk = new ZooKeeper(ZkConstant.ZK_HOST, ZkConstant.CONNECT_TIMEOUT, connectWatcher);

        System.out.println("客户端开始连接ZK服务器");
        States state = zk.getState();
        System.out.println(state);

        Thread.sleep(2000);         // 因为连接是需要时间的,所以模拟下。
        state = zk.getState();

        System.out.println(state);        // 2秒后,通常是已经连接成功了
        Thread.sleep(3000);         // 模拟连接成功之后,进行的一系列的操作
        zk.close();

    }

}

前提:要先在自己电脑windows上,开启Zookeeper的服务:

结果:

客户端开始连接ZK服务器
CONNECTING
ConnectWatcher类的process方法被调用了:
连接成功
CONNECTED
ConnectWatcher类的process方法被调用了:
连接关闭

2.节点操作

在常量类ZkConstant中:

/**
 *  ZK常量类
 */
public class ZkConstant {

    public static final String ZK_HOST = "127.0.0.1:2181";      // 如果是远端云服务器,就把IP改下,同时安全组里把2181端口打开
    public static final Integer CONNECT_TIMEOUT = 3000;         // 3000ms


    public static final String PATH1 = "/imooc-my-first-node";         // 节点1


}

在类ZKCRUD中:

/**
 *  对于节点的增删改查
 */
public class ZKCRUD {

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ConnectWatcher connectWatcher = new ConnectWatcher();
        // 前两个参数容易,是两个常量            // 第三个参数是监听者
        ZooKeeper zk = new ZooKeeper(ZkConstant.ZK_HOST, ZkConstant.CONNECT_TIMEOUT, connectWatcher);

        System.out.println("客户端开始连接ZK服务器");
        States state = zk.getState();
        System.out.println(state);
        Thread.sleep(2000);         // 因为连接是需要时间的,所以模拟下。
        state = zk.getState();
        System.out.println(state);        // 2秒后,通常是已经连接成功了
        Thread.sleep(3000);         // 模拟连接成功之后,进行的一系列的操作
        // 对节点进行增删改查
        zk.create(ZkConstant.PATH1, "imooc1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);// 跟命令的叫法是一样的,只不过参数一样
        Thread.sleep(2000);
        byte[] data = null;
        data = zk.getData(ZkConstant.PATH1, null, null);            // 获取节点的数值
        System.out.println(new String(data));
        
        Thread.sleep(2000);
        zk.setData(ZkConstant.PATH1, "imooc2".getBytes(), -1);      // -1 表示不需要指定版本
        Thread.sleep(2000);
        data = zk.getData(ZkConstant.PATH1, null, null);            // 获取节点的数值
        System.out.println(new String(data));
        
        zk.delete(ZkConstant.PATH1, -1);

        zk.close();
    }

}

其中:

  • 关于上述的权限:

结果:

客户端开始连接ZK服务器
CONNECTING
ConnectWatcher类的process方法被调用了:
连接成功
CONNECTED
imooc1
imooc2
ConnectWatcher类的process方法被调用了:
连接关闭

9.2 处理Watcher事件

1.

如果没有加上监听器的情况:

2.

如果加上了监听器的情况:

注意监听器只能监听1次

9.3 通过Apache Curator客户端,来操作ZK(推荐)

1.原生的Java的客户端的缺点

  • 3.还不支持对Watcher的永久监听

2.Apache Curator客户端的优点

(1)非永久监听

引入依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.imooc</groupId>
    <artifactId>zk-pratice</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!--1. 引入zookeeper依赖,用于作为服务的注册与发现-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.6.0</version>
        </dependency>

        <!--2. 引入curator的两个依赖,,通过Java代码操作ZK-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

    </dependencies>

</project>
/**
 *  用Curator来操作ZK
 */
public class CuratorTests {

    public static void main(String[] args) throws Exception {
        String path = "/curator1";

        String connectString = "127.0.0.1:2181";        // 指定路径
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 10);  // 指定重试机制
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retry);      // 创建客户端

        // 启动客户端
        client.start();
        client.getCuratorListenable().addListener((CuratorFramework c, CuratorEvent event)->{
            switch (event.getType()){
                case WATCHED:
                    WatchedEvent watchedEvent = event.getWatchedEvent();
                    if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) {
                        System.out.println(new String(c.getData().forPath(path)));          // 拿到该节点的内容,byte数组,转换为字符串
                        System.out.println("触发事件");
                    }
            }
        });

        String data = "test";
        String data2 = "test2";
        client.create().withMode(CreateMode.EPHEMERAL).forPath(path, data.getBytes());     // 轻松的创建完一个节点
        byte[] bytes = client.getData().watched().forPath(path);                           // 拿到该节点的值
        System.out.println(new String(bytes));

        client.setData().forPath(path, data2.getBytes());       // 修改节点
        client.setData().forPath(path, data2.getBytes());       // 修改节点
        client.setData().forPath(path, data2.getBytes());       // 修改节点

        client.delete().forPath(path);                          // 删除节点
        Thread.sleep(2000);

    }

}

结果:

能够成功的用Java代码操作ZK节点,并能成功监听;

但是,第二次、第三次的节点修改,并没有触发监听,说明这是个一次性的watch:

test
    
test2
触发事件

(2)永久监听

/**
 *  用Curator来操作ZK
 */
public class CuratorTests {

    public static void main(String[] args) throws Exception {
        String path = "/curator1";

        String connectString = "127.0.0.1:2181";        // 指定路径
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 10);  // 指定重试机制
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retry);      // 创建客户端

        // 启动客户端
        client.start();
        client.getCuratorListenable().addListener((CuratorFramework c, CuratorEvent event)->{
            switch (event.getType()){
                case WATCHED:
                    WatchedEvent watchedEvent = event.getWatchedEvent();
                    if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) {
                        System.out.println(new String(c.getData().forPath(path)));          // 拿到该节点的内容,byte数组,转换为字符串
                        System.out.println("触发事件");
                    }
            }
        });

        String data = "test";
        String data2 = "test2";
        client.create().withMode(CreateMode.EPHEMERAL).forPath(path, data.getBytes());     // 轻松的创建完一个节点
        byte[] bytes = client.getData().watched().forPath(path);                           // 拿到该节点的值
        System.out.println(new String(bytes));

        client.setData().forPath(path, data2.getBytes());       // 修改节点
        client.setData().forPath(path, data2.getBytes());       // 修改节点
        client.setData().forPath(path, data2.getBytes());       // 修改节点

        client.delete().forPath(path);                          // 删除节点
        Thread.sleep(2000);
        

        // 永久监听
        String pathNew = "/curatorNew";
        client.create().withMode(CreateMode.EPHEMERAL).forPath(pathNew, data.getBytes());       // 创建新节点
        NodeCache nodeCache = new NodeCache(client, pathNew);
        nodeCache.start();

        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                ChildData currentData = nodeCache.getCurrentData();
                if (currentData != null) {
                    System.out.println("触发了永久监听的回调,当前值为:"+new String(currentData.getData()));
                }
            }
        });
        client.setData().forPath(pathNew, data2.getBytes());
        Thread.sleep(2000);             // 等待两秒,因为需要时间才能生效
        client.setData().forPath(pathNew, data2.getBytes());
        Thread.sleep(2000);             // 等待两秒,因为需要时间才能生效
        client.setData().forPath(pathNew, data2.getBytes());
        Thread.sleep(2000);             // 等待两秒,因为需要时间才能生效

        client.delete().forPath(pathNew);
    }

}

结果:

test
test2
触发事件
    
触发了永久监听的回调,当前值为:test2
触发了永久监听的回调,当前值为:test2
触发了永久监听的回调,当前值为:test2

声明:Jerry's Blog|版权所有,违者必究|如未注明,均为原创|本网站采用BY-NC-SA协议进行授权

转载:转载请注明原文链接 - 35.1 Zookeeper


Follow excellence, and success will chase you.