一、本周介绍
1 Zookeeper
2 Dubbo
二、本章介绍
2.1 主要内容
三、初识Zookeeper
3.1 Zookeeper:为分布式应用程序提供分布式协调服务的框架
1.分布式的协调框架
源自雅虎研究院,不做软件业务,偏向研究软件基础。
需求:希望有这样一个系统,通过这个系统能搜索到一些数据。比如想查询某个数据库的地址。
当前问题:一个单机系统,就能对外提供服务。但单点,不可靠。比如,机器、网络、电力等不可控因素太多。
很简单:写一个Spring Boot项目,里面开一个接口,写一个Hash Map对外提供服务。你给我一个key,我就返回对应的value。这么做,就是单点。
2.分布式应用程序的分布式协调服务
简单理解为:一个比较可靠的、能够对外提供一致服务的数据库。
相比于普通数据库,它最大的不同是:分布式部署,多节点并存。所以:
- 就变得高可用,不会因为某一个节点宕机而不可用
- 多节点对外发布消息都是一致的
3.2 特点
3.3 架构(图)
- Leader:能够协调不同的server进行工作。处理写相关的请求,并同步到其他的跟随者上去。如果Leader意外宕机,会重新选举一个leader。
- 不同的客户端client,连接到不同的服务器server,不会把所有的压力都给同一个服务器。
Zookeeper集群中server数量总是确定的,所以集群中的server交互采用比较可靠的bio长连接模型;
不同于集群中sever间交互,zookeeper客户端其实数量是未知的,为了提高zookeeper并发性能,zookeeper客户端与服务器端交互采用nio模型。
3.4 典型的应用场景:4个
四、Zookeeper的安装、配置
主要是Linux系统。
4.1 Linux下的安装
1.下载安装包
网址:https://downloads.apache.org/zookeeper/
借鉴老师提供的《Zookeeper的安装、配置教辅》
到电脑的cmd中:
登录阿里云的远程服务器的Linux系统:
ssh root@121.199.31.220
wget https://downloads.apache.org/zookeeper/zookeeper-3.6.4/apache-zookeeper-3.6.4-bin.tar.gz
2.解压缩:
tar zxvf apache-zookeeper-3.6.4-bin.tar.gz
3.
复制一个配置文件:
cp conf/zoo_sample.cfg conf/zoo.cfg
进入该配置文件进行观看:不需要修改:
vim zoo.cfg
启动:./bin/zkServer.sh start
关闭:./bin/zkServer.sh stop
4.2 Windows下安装
前提:Windows电脑上要有wget命令才行,如果没有参考以下文章进行简单的安装:https://blog.51cto.com/huny/3265480
。
1.
wget https://downloads.apache.org/zookeeper/zookeeper-3.6.4/apache-zookeeper-3.6.4-bin.tar.gz
2.
复制配置文件
(因为Windows下的一些cmd指令,跟Linux下不一样,比如这里就必须用Windows的copy、copyx等指令,有一定的熟悉成本,还不如手动复制几秒搞定 )
3.启动
4.3 Mac OS下安装
一个brew就能搞定
五、节点znode(基本数据类型)
5.1 树结构
多叉树;
圆圈就是一个一个的节点,数据就存储在里面;
5.2 节点的特点:4个
5.3 节点的类型:4个
六、操作节点的常用命令
6.1 基础命令:增删改查
1.前提准备
Linux环境下:
注意:
要想运行Zookeeper的命令,得需要Java环境。所以,Linux系统中,如果没有安装,就要安装JDK1.8。
- 执行命令
yum -y list java*
查看可安装java版本- 执行命令
yum install -y java-1.8.0-openjdk-devel.x86_64
, 耐心等待至自动安装完成- 输入
java -version
查看已安装的jdk版本,
启动Zookeeper服务:./bin/zkServer.sh start
关闭Zookeeper服务:./bin/zkServer.sh stop
连接:./bin/zkCli.sh -server 127.0.0.1:2181
连接成功;
以上两者,是什么区别?
zookeeper不仅提供了服务端命令,而且提供了客户端命令
zkServer ------------
zkCli ------------------
见3.3 。
2.
通过指令help
来查看各种命令:
3.create、get
create /java1 hellojava1
create /java1/zk1 hellozk1
ls /java1
------------------------------------- 查看该节点下的,有哪些子节点
get /java1
------------------------------------ 查看该节点存储的数据是什么
ls -R /
---------------- 以递归的形式,把根目录下的所有节点,显示出来
4.set、delete
以上,就是启动了服务器、启动了客户端连接上去之后,对于节点进行了增删改查相关的基础操作。
6.2 高级命令:版本号、顺序节点
1.
stat /java1
--------------------- 查看该节点的属性状态:
注意下面两个属性的区别:
2.用版本号,对节点数据进行赋值、删除
dataVersion
表示该节点的数据的变化次数
假设有两个客户端,都要对这个节点的数值进行修改:
版本号与set、delete也是一样的
3.临时节点、顺序节点
以上,全都是持久节点,即没有使用到顺序节点,也没有使用到临时节点。
-e
:表示临时节点:
-s
:表示顺序节点--------------同一个父节点下,所有节点的顺序都是自增的,不重复
-s
+-e
以上,就是Zookeeper高级命令的讲解。
七、Watcher机制
7.1 作用
Watcher:监视器、监听器。
叫什么不重要,重要的是它有什么作用:
如果某一个节点的数据发生变化,就立马通知其他所有节点。后者会根据通知,同步更新。
7.2 事件类型
常见一部分:
八、用ACL实现细粒度的权限控制
8.1 权限:CRWDA
5种权限:CRWDA
admin:是最高级别的权限
8.2 权限方案
实际开发,用的不多,了解即可,运维会用的较多。
九、使用Java代码操作Zookeeper
9.1 通过ZK原生的客户端,连接ZK,并节点操作
1.连接ZK
新建maven项目:
引入依赖:
<dependencies>
<!--1. 引入zookeeper依赖-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>'
</dependencies>
在常量类ZkConstant
中:
/**
* ZK常量类
*/
public class ZkConstant {
public static final String ZK_HOST = "127.0.0.1:2181"; // 如果是远端云服务器,就把IP改下,同时安全组里把2181端口打开
public static final Integer CONNECT_TIMEOUT = 3000; // 3000ms
}
在类ConnectWatcher
中:
/**
* 连接Watcher
*/
public class ConnectWatcher implements Watcher {
@Override
public void process(WatchedEvent event) { // 被监听到了的事件
System.out.println("ConnectWatcher类的process方法被调用了");
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
}
if (event.getState() == Event.KeeperState.Closed) {
System.out.println("连接关闭");
}
}
}
在类FirstConnect
中:
/**
* 连接到ZK
*/
public class FirstConnect {
public static void main(String[] args) throws IOException, InterruptedException {
ConnectWatcher connectWatcher = new ConnectWatcher();
// 前两个参数容易,是两个常量 // 第三个参数是监听者
ZooKeeper zk = new ZooKeeper(ZkConstant.ZK_HOST, ZkConstant.CONNECT_TIMEOUT, connectWatcher);
System.out.println("客户端开始连接ZK服务器");
States state = zk.getState();
System.out.println(state);
Thread.sleep(2000); // 因为连接是需要时间的,所以模拟下。
state = zk.getState();
System.out.println(state); // 2秒后,通常是已经连接成功了
Thread.sleep(3000); // 模拟连接成功之后,进行的一系列的操作
zk.close();
}
}
前提:要先在自己电脑windows上,开启Zookeeper的服务:
结果:
客户端开始连接ZK服务器
CONNECTING
ConnectWatcher类的process方法被调用了:
连接成功
CONNECTED
ConnectWatcher类的process方法被调用了:
连接关闭
2.节点操作
在常量类ZkConstant
中:
/**
* ZK常量类
*/
public class ZkConstant {
public static final String ZK_HOST = "127.0.0.1:2181"; // 如果是远端云服务器,就把IP改下,同时安全组里把2181端口打开
public static final Integer CONNECT_TIMEOUT = 3000; // 3000ms
public static final String PATH1 = "/imooc-my-first-node"; // 节点1
}
在类ZKCRUD
中:
/**
* 对于节点的增删改查
*/
public class ZKCRUD {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
ConnectWatcher connectWatcher = new ConnectWatcher();
// 前两个参数容易,是两个常量 // 第三个参数是监听者
ZooKeeper zk = new ZooKeeper(ZkConstant.ZK_HOST, ZkConstant.CONNECT_TIMEOUT, connectWatcher);
System.out.println("客户端开始连接ZK服务器");
States state = zk.getState();
System.out.println(state);
Thread.sleep(2000); // 因为连接是需要时间的,所以模拟下。
state = zk.getState();
System.out.println(state); // 2秒后,通常是已经连接成功了
Thread.sleep(3000); // 模拟连接成功之后,进行的一系列的操作
// 对节点进行增删改查
zk.create(ZkConstant.PATH1, "imooc1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);// 跟命令的叫法是一样的,只不过参数一样
Thread.sleep(2000);
byte[] data = null;
data = zk.getData(ZkConstant.PATH1, null, null); // 获取节点的数值
System.out.println(new String(data));
Thread.sleep(2000);
zk.setData(ZkConstant.PATH1, "imooc2".getBytes(), -1); // -1 表示不需要指定版本
Thread.sleep(2000);
data = zk.getData(ZkConstant.PATH1, null, null); // 获取节点的数值
System.out.println(new String(data));
zk.delete(ZkConstant.PATH1, -1);
zk.close();
}
}
其中:
结果:
客户端开始连接ZK服务器
CONNECTING
ConnectWatcher类的process方法被调用了:
连接成功
CONNECTED
imooc1
imooc2
ConnectWatcher类的process方法被调用了:
连接关闭
9.2 处理Watcher事件
1.
如果没有加上监听器的情况:
2.
如果加上了监听器的情况:
注意监听器只能监听1次
9.3 通过Apache Curator客户端,来操作ZK(推荐)
1.原生的Java的客户端的缺点
- 3.还不支持对Watcher的永久监听
2.Apache Curator客户端的优点
(1)非永久监听
引入依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.imooc</groupId>
<artifactId>zk-pratice</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!--1. 引入zookeeper依赖,用于作为服务的注册与发现-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
<!--2. 引入curator的两个依赖,,通过Java代码操作ZK-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
</dependencies>
</project>
/**
* 用Curator来操作ZK
*/
public class CuratorTests {
public static void main(String[] args) throws Exception {
String path = "/curator1";
String connectString = "127.0.0.1:2181"; // 指定路径
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 10); // 指定重试机制
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retry); // 创建客户端
// 启动客户端
client.start();
client.getCuratorListenable().addListener((CuratorFramework c, CuratorEvent event)->{
switch (event.getType()){
case WATCHED:
WatchedEvent watchedEvent = event.getWatchedEvent();
if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) {
System.out.println(new String(c.getData().forPath(path))); // 拿到该节点的内容,byte数组,转换为字符串
System.out.println("触发事件");
}
}
});
String data = "test";
String data2 = "test2";
client.create().withMode(CreateMode.EPHEMERAL).forPath(path, data.getBytes()); // 轻松的创建完一个节点
byte[] bytes = client.getData().watched().forPath(path); // 拿到该节点的值
System.out.println(new String(bytes));
client.setData().forPath(path, data2.getBytes()); // 修改节点
client.setData().forPath(path, data2.getBytes()); // 修改节点
client.setData().forPath(path, data2.getBytes()); // 修改节点
client.delete().forPath(path); // 删除节点
Thread.sleep(2000);
}
}
结果:
能够成功的用Java代码操作ZK节点,并能成功监听;
但是,第二次、第三次的节点修改,并没有触发监听,说明这是个一次性的watch:
test
test2
触发事件
(2)永久监听
/**
* 用Curator来操作ZK
*/
public class CuratorTests {
public static void main(String[] args) throws Exception {
String path = "/curator1";
String connectString = "127.0.0.1:2181"; // 指定路径
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 10); // 指定重试机制
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retry); // 创建客户端
// 启动客户端
client.start();
client.getCuratorListenable().addListener((CuratorFramework c, CuratorEvent event)->{
switch (event.getType()){
case WATCHED:
WatchedEvent watchedEvent = event.getWatchedEvent();
if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) {
System.out.println(new String(c.getData().forPath(path))); // 拿到该节点的内容,byte数组,转换为字符串
System.out.println("触发事件");
}
}
});
String data = "test";
String data2 = "test2";
client.create().withMode(CreateMode.EPHEMERAL).forPath(path, data.getBytes()); // 轻松的创建完一个节点
byte[] bytes = client.getData().watched().forPath(path); // 拿到该节点的值
System.out.println(new String(bytes));
client.setData().forPath(path, data2.getBytes()); // 修改节点
client.setData().forPath(path, data2.getBytes()); // 修改节点
client.setData().forPath(path, data2.getBytes()); // 修改节点
client.delete().forPath(path); // 删除节点
Thread.sleep(2000);
// 永久监听
String pathNew = "/curatorNew";
client.create().withMode(CreateMode.EPHEMERAL).forPath(pathNew, data.getBytes()); // 创建新节点
NodeCache nodeCache = new NodeCache(client, pathNew);
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData currentData = nodeCache.getCurrentData();
if (currentData != null) {
System.out.println("触发了永久监听的回调,当前值为:"+new String(currentData.getData()));
}
}
});
client.setData().forPath(pathNew, data2.getBytes());
Thread.sleep(2000); // 等待两秒,因为需要时间才能生效
client.setData().forPath(pathNew, data2.getBytes());
Thread.sleep(2000); // 等待两秒,因为需要时间才能生效
client.setData().forPath(pathNew, data2.getBytes());
Thread.sleep(2000); // 等待两秒,因为需要时间才能生效
client.delete().forPath(pathNew);
}
}
结果:
test
test2
触发事件
触发了永久监听的回调,当前值为:test2
触发了永久监听的回调,当前值为:test2
触发了永久监听的回调,当前值为:test2
Comments | NOTHING