1.介绍
1.1概念
zookeeper作用:用于维护配置信息、命名、提供分布式同步和提供组服务
zookeeper主要是文件系统和通知机制
文件系统主要是用来存储数据
通知机制主要是服务器或者客户端进行通知,并且监督
基于观察者模式设计的分布式服务管理框架,开源的分布式框架
1.2特点
1):一个leader,多个follower的集群
2):集群只要有半数以上包括半数就可正常服务,一般安装奇数台服务器
3):全局数据一致,每个服务器都保存同样的数据,实时更新
4):更新的请求顺序保持顺序(来自同一个服务器)
5):数据更新的原子性,数据要么成功要么失败(大事务)
6):数据实时更新性很快(因为数据量很小)
1.3主要的集群步骤
1):服务端启动时去注册信息(创建都是临时节点)
2):获取到当前在线服务器列表,并且注册监听
3):服务器节点下线
4):服务器节点上下线事件通知
5):process(){重新再去获取服务器列表,并注册监听}
1.4数据结构
与 Unix 文件系统很类似,可看成树形结构,每个节点称做一个ZNode。每一个ZNode默认能够存储 1MB 的数据。也就是只能存储小数据(一般配置信息,注册信息等)
1.5应用场景
1):统一命名服务(域名服务)
在分布式环境下,经常需要对应用/服务进行统一命名,便于识别,eg:ip不容易记住,而域名容易记住
2):统一配置管理(一个集群中的所有配置都一致,且也要实时更新同步)
3):将配置信息写入ZooKeeper上的一个Znode,各个客户端服务器监听这个Znode。一旦Znode中的数据被修改,ZooKeeper将通知各个客户端服务器
4):统一集群管理(掌握实时状态)
5):将节点信息写入ZooKeeper上的一个ZNode。监听ZNode获取实时状态变化
6):服务器节点动态上下线
7):软负载均衡(根据每个节点的访问数,让访问数最少的服务器处理最新的数据需求)
2.本地安装
2.1安装jdk
1.查看是否已安装JDK
yum list installed |grep java
2.卸载CentOS系统Java环境
# yum -y remove java-1.8.0-openjdk* *表示卸载所有openjdk相关文件输入
# yum -y remove tzdata-java.noarch 卸载tzdata-java
3.查看JDK软件包版本
# yum -y list java* 或者使用# yum searchjava | grep -i --color JDK
4.安装
yum install java-1.8.0-openjdk* //安装java1.8.0所有程序
java -version //查看java版本信息
注意:使用yum安装环境变量自动就配好了
2.2下载安装
官网:https://zookeeper.apache.org/
下载3.5.7稳定版本
下载:https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz
解压:tar -xf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
改名:mv apache-zookeeper-3.5.7-bin zookeeper-3.5.7
配置文件改名:mv zoo_sample.cfg zoo.cfg
bin目录 框架启动停止,客户端和服务端的
conf 配置文件信息
docs文档
lib 配置文档的依赖
2.3配置文件修改
配置文件:/opt/module/zookeeper-3.5.7/conf/zoo.cfg
修改:
dataDir = /opt/module/zookeeper-3.5.7/zkData // 通常修改的路径
2.4启动服务端
cd /opt/module/zookeeper-3.5.7/bin
[root@sg-15 bin]# ./zkServer.sh start // 启动服务端
//查看进程
[root@sg-15 bin]# jps -l
21172 sun.tools.jps.Jps
21110 org.apache.zookeeper.server.quorum.QuorumPeerMain
2.5启动客户端
[root@sg-15 bin]# ./zkCli.sh // 启动客户端
[zk: localhost:2181(CONNECTED) 4] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 5] quit //退出客户端
[root@sg-15 bin]# ./zkServer.sh status // 查看zookeeper状态
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: standalone
2.6zookeeper常用命令
[root@sg-15 bin]# ./zkServer.sh start // 启动服务端
[root@sg-15 bin]# ./zkServer.sh stop // 停止服务端
[root@sg-15 bin]# jps -l // 查看进程
[root@sg-15 bin]# ./zkCli.sh // 启动客户端
[zk: localhost:2181(CONNECTED) 5] quit // 退出客户端
[root@sg-15 bin]# ./zkServer.sh status // 查看zookeeper状态
2.7配置文件解读
配置文件的5大参数:
tickTime = 2000 //通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
initLimit = 10 //Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量)
syncLimit = 5 //Leader和Follower之间通信时间如果超过5个心跳时间,Leader认为Follwer死掉,从服务器列表中删除Follwer。
dataDir =/tmp/zookeeper //保存zookeeper的数据,这是默认值,会定时被系统清除
dataDir保存zookeeper的数据,默认是temp会被系统定期清除,通常改为自己的路径
dataDir = /opt/module/zookeeper-3.5.7/zkData // 通常修改的路径
clientPort = 2181 //客户端的连接端口,一般不需要修改
3.集群安装
3.1集群规划
sg15,sg16,sg17三台机器部署Zookeeper
3.2安装
解压:tar -xf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
改名:mv apache-zookeeper-3.5.7-bin zookeeper-3.5.7
配置文件改名:mv zoo_sample.cfg zoo.cfg
3.3配置
[root@sg-15 zookeeper-3.5.7]# mkdir zkData // 创建目录zkData
[root@sg-15 zkData]# vi myid // 创建一个 myid 的文件
在文件中添加与 server 对应的编号(注意:上下不要有空行,左右不要有空格)
5 // 192.168.0.215,这里设置服务器尾号,其他的不重复就行
[root@sg-15 conf]# mv zoo_sample.cfg zoo.cfg // 配置文件改名
[root@sg-15 conf]# vi zoo.cfg //修改配置文件
dataDir = /opt/module/zookeeper-3.5.7/zkData // 通常修改的路径
// 添加配置
#######################cluster##########################
server.5=192.168.0.215:2888:3888
server.6=192.168.0.216:2888:3888
server.7=192.168.0.217:2888:3888
##配置参数解读
server.A=B:C:D
A 是一个数字,表示这个是第几号服务器,就是myid中的值
B 是这个服务器的地址
C 是这个服务器Follower 与集群中的 Leader 服务器交换信息的端口;
D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的
Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
注意:一台机器弄好之后,打包发送到其他机器。其他机器修改myid值
tar -cf module.tar ./module // 打包
scp -r module.tar [email protected]:/opt/ //发送
tar -xf module.tar // 解包
3.4启动zookeeper集群
注意:集群只要有半数以上包括半数就可正常服务。
三台机器启动:
[root@sg-15 bin]# ./zkServer.sh start //启动
[root@sg-15 bin]# ./zkServer.sh restart //重启
[root@sg-17 bin]# ./zkServer.sh status //检查状态
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader //本机器为leader
[root@sg-15 bin]# ./zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower //本机器为follower
4.选举机制
选举谁当leader
介绍:
SID:服务器ID。用来唯一标识一台ZooKeeper集群中的机器,每台机器不能重复,和myid一致。
ZXID:事务ID。ZXID是一个事务ID,用来标识一次服务器状态的变更。在某一时刻, 集群中的每台机器的ZXID值不一定完全一样
Epoch:每个Leader任期的代号。没有Leader时同一轮投票过程中的逻辑时钟值是
4.1触发选举时机
1.服务器刚启动
2.服务器运行期间无法和leader保持连接
当一台机器进入leader选举流程时,当前集群出现两种状态:
1.集群中已经存在一个leader(此机器和leader建立连接,并状态同步)
2.集群中不存在leader(触发选举),looking状态
4.2zookeeper选举机制---第一次启动
服务器1:myid=1
服务器2:myid=2
服务器3:myid=3
(1) 服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING;
(2) 服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:服务器1和服务器2比较谁的myid大,更改选票为推举服务器myid大的。此时服务器1票数0票,服务器2票数2票,大于半数以上结果,选举完成。服务器1为follower,2状态为leader
(3) 服务器3启动,发起一次选举。此时服务器1,2已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为1票,此时服务器3服从多数,更改选票信息为服务器2,并更改状态为FOLLOWING;
4.2zookeeper选举机制---非第一次启动
服务器1:myid=1
服务器2:myid=2
服务器3:myid=3
服务器运行期间无法和leader保持连接触发重新选举:
假设zookeeper由5台服务器组成,SID分别为1,2,3,4,5,ZXID分别为8,8,8,7,7,并且此时SID为3的服务器此时时leader。某时刻3和5出现故障,因此开始进行leader选举:
SID为1的机器:(1,8,1)
SID为2的机器:(1,8,2)
SID为4的机器:(1,7,4)
选举规则:
优先比较SID,再比较ZXID,其次比较Epoch。大的直接胜出当选leader
4.3选举机制总结
半数机制,超过半数的投票通过,即通过。
第一次启动选举规则:投票过半数时,服务器 myid 大的胜出当leader
第二次启动选举规则:①EPOCH大的直接胜出 ②EPOCH相同,事务id大的胜出 ③事务id相同,任期代号id大的胜出
5.客户端命令行操作
5.1常用命令整合
登陆客户端操作:很多命令和linux命令相似,比如ls,history等
jps : 查看zookeeper运行的进程
help :显示所有操作命令
ls / :查看当前znode中包含的内容
ls -s / :查看当前节点详细数据
create : 创建普通节点
create -s :创建带序号节点
create -e :创建临时普通节点
create -e -s :创建临时有序节点
delete:删除节点
create /wangzhe "this is wangzhe" //新建节点(永久节点,不带序号)
create /wangzhe/fashi "this is fashi" // //新建节点(永久节点,不带序号)
get /wangzhe // this is wangzhe 取值
get -s /wangzhe //节点详情
set set /wangzhe "this is wangzherongyao" //修改值
get -w :监听值
ls -w /wangzhe :监听数量
监听注册一个生效一次
5.2启动客户端
[root@sg-15 bin]# ./zkCli.sh -server 192.168.0.215:2181
quit //退出
5.3znode节点数据信息
命令:ls -s /
[zk: 192.168.0.215:2181(CONNECTED) 3] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
##################################
1):czxid:创建节点的事务id zxid
每次修改ZooKeeper 状态都会产生一个ZooKeeper 事务 ID。事务 ID 是ZooKeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么zxid1 在 zxid2 之前发生。
2):ctime:znode 被创建的毫秒数(从 1970 年开始)
3):mzxid:znode 最后更新的事务zxid
4):mtime:znode 最后更新的毫秒数(从 1970 年开始)
5):pZxid:znode 最后更新的子节点zxid
6):cversion:znode 子节点变化号,znode 子节点修改次数
7):dataversion:znode 数据变化号
8):aclVersion:znode 访问控制列表的变化号
9):ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。
10):dataLength:znode 的数据长度
11):numChildren:znode 子节点数量
5.4节点类型(持久/短暂/有序号/无序号)
持久:客户端和服务端端开连接后,创建的节点不删除
序号:在分布式系统中,顺序号可以被用于所有事件排序,这样客户端可以通过顺序号推断事件的顺序
{
持久有序号:客户端和服务端端开连接后,创建的节点不删除。且节点名称顺序编号
持久无序号:客户端和服务端端开连接后,创建的节点不删除。无序号
}
短暂:客户端和服务端端开连接后,创建的节点自己删除
{
短暂有序号:客户端和服务端端开连接后,创建的节点自己删除。且节点名称顺序编号
短暂无序号:客户端和服务端端开连接后,创建的节点自己删除。无序号
}
5.5节点操作
5.5.1创建/删除节点
create : 创建普通节点
create -s :创建带序号节点
create -e :创建临时普通节点
create -e -s :创建临时有序节点
delete:删除一个节点(如果这个节点下有子节点,删除失败)
deleteall:递归删除所有节点(如果这个节点下有子节点,全部删除)
eg:delete /wangzhe/fashi //只删除fashi节点
deleteall /wangzhe //递归删除wangzhe所有节点
//1.创建节点:普通永久节点
[zk: 192.168.0.215:2181(CONNECTED) 27] create /wangzhe "this is wangzhe"
Created /wangzhe
[zk: 192.168.0.215:2181(CONNECTED) 40] create /wangzhe/fashi "this is fashi"
Created /wangzhe/fashi
//2.创建节点:有序永久节点
[zk: 192.168.0.215:2181(CONNECTED) 66] create -s /wangzhe/fashi/daji "daji"
Created /wangzhe/fashi/daji0000000000
[zk: 192.168.0.215:2181(CONNECTED) 70] create -s /wangzhe/fashi/daji "daji"
Created /wangzhe/fashi/daji0000000001 //再次创建daji,序号+1
//3.创建节点:普通临时节点
[zk: 192.168.0.215:2181(CONNECTED) 71] create -e /wangzhe/fashi/ganjiang "ganjiang"
Created /wangzhe/fashi/ganjiang
//4.创建节点:有序临时节点
[zk: 192.168.0.215:2181(CONNECTED) 73] create -e -s /wangzhe/fashi/anqila "anqila"
Created /wangzhe/fashi/anqila0000000003
//查看
[zk: 192.168.0.215:2181(CONNECTED) 74] ls /wangzhe/fashi
[anqila0000000003, daji0000000000, daji0000000001, ganjiang]
//delete删除
[zk: 192.168.0.215:2181(CONNECTED) 19] delete /wangzhe/fashi/xiaoqiao
xiaoqiao xiaoqiao0000000007
5.5.2获取/查看的值
注意:get获取有序节点的值时:必须获取最后一个序号的值,比如获取下面ganjiang0000000004报错,获取ganjiang0000000005正常。
ls
get
get -s
//查看
[zk: 192.168.0.215:2181(CONNECTED) 10] ls /wangzhe/fashi
[daji0000000000, daji0000000001, ganjiang0000000004, ganjiang0000000005, xiaoqiao]
//获取值
[zk: 192.168.0.215:2181(CONNECTED) 12] get /wangzhe/fashi/xiaoqiao0000000007
xiaoqiao
// 获取详细
[zk: 192.168.0.215:2181(CONNECTED) 13] get -s /wangzhe/fashi/xiaoqiao0000000007
xiaoqiao
cZxid = 0x30000002f
ctime = Sat Mar 26 18:37:35 CST 2022
mZxid = 0x30000002f
mtime = Sat Mar 26 18:37:35 CST 2022
pZxid = 0x30000002f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 0
[zk: 192.168.0.215:2181(CONNECTED) 14] get -s /wangzhe/fashi
this is fashi
cZxid = 0x300000020
ctime = Sat Mar 26 17:31:12 CST 2022
mZxid = 0x300000020
mtime = Sat Mar 26 17:31:12 CST 2022
pZxid = 0x30000002f
cversion = 10
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 13
numChildren = 6
5.6节点监听
get -w :监听值
ls -w /wangzhe :监听数量
监听注册一个生效一次
5.6.1监听节点值改变
[zk: 192.168.0.215:2181(CONNECTED) 25] get -w /wangzhe //开启一次监听
this is wangzhe
[zk: 192.168.0.215:2181(CONNECTED) 26] set /wangzhe "jianting:this is wangzhe" //修改值
WATCHER:: //监听值发生变化
WatchedEvent state:SyncConnected type:NodeDataChanged path:/wangzhe
5.6.2监听节点下子节点数量改变
[zk: 192.168.0.215:2181(CONNECTED) 27] ls -w /wangzhe ////开启一次监听
[fashi, fashi0000000001]
[zk: 192.168.0.215:2181(CONNECTED) 28] delete /wangzhe/fashi0000000001 //删除一个节点
WATCHER:: //监听数量发生变化
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/wangzhe
6.goang操作zookeeper
6.1创建节点create(增)
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
// 任意一个ip都可以,但是建议放主节点ip
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
panic(err)
}
defer conn.Close()
// 1.创建的普通永久节点
path, err := conn.Create("/hello", []byte("world"), 0, zk.WorldACL(zk.PermAll))
if err != nil {
fmt.Println("err:",err)
}
fmt.Println("创建的普通永久节点:", path)
// 2.创建的普通临时节点,创建此节点的会话结束后立即清除此节点
ephemeral, err := conn.Create("/ephemeral", []byte("world"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {
fmt.Println("err:",err)
}
fmt.Println("创建的普通临时节点:", ephemeral)
// 3.创建的有序永久节点
sequence, err := conn.Create("/sequence", []byte("world"), zk.FlagSequence, zk.WorldACL(zk.PermAll))
if err != nil {
panic(err)
}
fmt.Println("创建的有序永久节点:", sequence)
// 4.创建的有序临时节点,创建此节点的会话结束后立即清除此节点
ephemeralSequence, err := conn.Create("/ephemeralSequence", []byte("world"), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
if err != nil {
panic(err)
}
fmt.Println("创建的有序临时节点:", ephemeralSequence)
}
6.2查看节点get(查)
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
result, state, err := conn.Get("/wangzhe/fashi")
if err != nil {
fmt.Println("err:",err)
return
}
fmt.Println("result: ", string(result)) // this is fashi
fmt.Printf("%#v",state)
//状态结果:&zk.Stat{Czxid:12884901920, Mzxid:12884901920, Ctime:1648287072539, Mtime:1648287072539, Version:0, Cversion:11, Aversion:0, EphemeralOwner:0, DataLength:13, NumChildren:5, Pzxid:12884901936}
}
6.3修改节点set(改)
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
path := "/wangzhe/fashi"
_, state, _ := conn.Get(path) // 先查询,拿到当前版本
state, err = conn.Set(path, []byte("hello"), state.Version)
if err != nil {
fmt.Println("err:",err)
return
}
fmt.Printf("%#v",state)
//结果:&zk.Stat{Czxid:12884901920, Mzxid:12884902012, Ctime:1648287072539, Mtime:1648305221598, Version:1, Cversion:11, Aversion:0, EphemeralOwner:0, DataLength:5, NumChildren:5, Pzxid:12884901936}2022/03/26 22:33:39 recv loop
}
6.4删除节点delete(删)
注意:此方法不能递归删除节点
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
path := "/hello"
exists, state, err := conn.Exists(path) //判断是否存在和查询版本
if exists{
err = conn.Delete(path, state.Version)
if err != nil {
fmt.Println("err:",err)
return
}
fmt.Println("节点删除成功!!!")
}else {
fmt.Println("节点不存在!!!")
}
}
6.5查看节点的子节点Children
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
childrenList, state, err := conn.Children("/wangzhe/fashi")
if err != nil {
fmt.Println("err:",err)
return
}
fmt.Println(childrenList)
fmt.Printf("%#v",state)
}
6.6遍历节点Children后再get
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
childrenList, _, err := conn.Children("/wangzhe/fashi")
if err != nil {
fmt.Println("err:",err)
return
}
for _,children:=range childrenList{
childrenPath:= fmt.Sprintf("/wangzhe/fashi/%s",children)
result, state, err := conn.Get(childrenPath)
if err != nil {
fmt.Println("err:",err)
return
}
fmt.Println(string(result))
fmt.Println(state)
}
}
6.7判断节点是否存在-conn.Exists
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
path := "/hello"
exists, _, err := conn.Exists(path)
if err != nil {
fmt.Println("err:",err)
return
}
if exists{
fmt.Println("节点存在")
}else{
fmt.Println("节点不存在")
}
}
7.监听/watch
7.1监听节点-全局监听
将监听器放到Connect
函数中,如果有监听事件发生,会一直执行监听器的回调函数。监听执行了一次之后要重新注册监听。
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func callback(e zk.Event) {
fmt.Println("========================")
fmt.Println("path:", e.Path)
fmt.Println("type:", e.Type.String())
fmt.Println("state:", e.State.String())
}
func main() {
eventCallbackOption := zk.WithEventCallback(callback)
// 经过测试,在连接时会执行3次回调函数
conn, _, err := zk.Connect([]string{"192.168.0.215","192.168.0.216","192.168.0.217"}, time.Second,eventCallbackOption)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
// 注册一个 watch
exists, state, _, err := conn.ExistsW("/watch")
if err != nil {
fmt.Println("err:",err)
return
}
fmt.Println("exists:",exists)
if !exists {
// 创建 /watch 时,触发监听事件,watch 失效
_, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {
fmt.Println("err:",err)
return
}
// 再注册一个 watch
_, state, _, err = conn.ExistsW("/watch")
if err != nil {
fmt.Println("err:",err)
return
}
}
// 删除 /watch 时,触发监听事件,watch 失效
err = conn.Delete("/watch", state.Version)
if err != nil {
fmt.Println("err:",err)
return
}
}
结果:
========================
path:
type: EventSession
state: StateConnecting
========================
path:
type: EventSession
state: StateConnected
2022/03/27 11:09:27 connected to 192.168.0.215:2181
========================
path:
type: EventSession
state: StateHasSession
2022/03/27 11:09:27 authenticated: id=360292329056632906, timeout=4000
2022/03/27 11:09:27 re-submitting `0` credentials after reconnect
exists: false
========================
path: /watch
type: EventNodeCreated
state: Unknown
========================
path: /watch
type: EventNodeDeleted
state: Unknown
7.2监听部分事件
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215","192.168.0.216","192.168.0.217"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
// 注册一个 watch
exists, _, eventChannel, err := conn.ExistsW("/watch")
if err != nil {
fmt.Println("err:",err)
return
}
go func() {
// 从事件 channel 中取出事件
e := <-eventChannel
fmt.Println("========================")
fmt.Println("path:", e.Path)
fmt.Println("type:", e.Type.String())
fmt.Println("state:", e.State.String())
}()
if !exists {
// 创建 临时节点/watch 时,触发监听事件,watch 失效
_, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {
fmt.Println("err:",err)
return
}
}
}
8.微服务动态上下线监听(服务注册/发现)
8.1需求实现
微服务分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
需求实现:
服务端:服务端启动时,在zookeeper中创建临时有序节点,服务关闭时,临时节点自动删除了(zookeeper临时节点机制)
客户端:监听节点的变化
8.2服务端创建代码-(注册服务)
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
//创建的有序临时节点,创建此节点的会话结束后立即清除此节点 create -e -s
ephemeralSequence, err := conn.Create("/servers/bikesvc", []byte("bikesvc"), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
if err != nil {
fmt.Println("err:",err)
return
}
fmt.Println("创建的有序临时节点:", ephemeralSequence)
time.Sleep(time.Second*10)
}
8.3客户端监听代码-(服务发现)
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func mirror(conn *zk.Conn, path string) (chan []string, chan error) {
snapshots := make(chan []string)
errors := make(chan error)
go func() {
for {
snapshot, _, events, err := conn.ChildrenW(path)
if err != nil {
errors <- err
return
}
snapshots <- snapshot
evt := <-events
if evt.Err != nil {
errors <- evt.Err
return
}
}
}()
return snapshots, errors
}
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181", "192.168.0.216:2181", "192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:", err)
return
}
defer conn.Close()
snapshots, errors := mirror(conn, "/servers") //监控的根节点,根节点不能删除
go func() {
for {
select {
case snapshot := <-snapshots:
fmt.Println("监控变化:", snapshot)
case err := <-errors:
fmt.Println("err:", err)
}
}
}()
for {
}
}
结果:
服务端:
创建的有序临时节点: /servers/bikesvc0000000010
客户端:
监控变化: []
监控变化: [bikesvc0000000009]
监控变化: []
9.分布式锁
加锁进行资源保护
go-zookeeper 添加分布式锁的方法为NewLock(c *Conn, path string, acl []ACL)。
锁的结构体为:
type Lock struct {
c *Conn
path string
acl []ACL
lockPath string
seq int
}
这个结构体实现了三个方法:Lock(),LockWithData(data []byte)和Unlock()
9.1分布式锁案例
根节点“/root”判断是否存在,不存在则创建
package main
import (
"fmt"
"sync"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
lock := zk.NewLock(conn, "/root/lock", zk.WorldACL(zk.PermAll)) //加锁
err = lock.LockWithData([]byte("it is a lock"))
if err != nil {
panic(err)
}
fmt.Println("第", n, "个 goroutine 获取到了锁")
time.Sleep(time.Second*1) // 1 秒后释放锁
lock.Unlock() //解锁
}(i)
}
wg.Wait()
}
这里给了两个进程抢锁,ls查看一下锁:
解释:把所有进程按有序排列,当成节点放入lock节点中,按照最小的序号执行。解锁一个删除一个。直到节点为空,进程执行完毕。
[zk: localhost:2181(CONNECTED) 32] ls /root/lock
[_c_1dbbc1ec75b285ef10a6d6154627335c-lock-0000000153, _c_793a837ded040d01608395e5eac65979-lock-0000000152]
先执行152,再执行153
9.2监控锁案例
监控锁节点变化
监控代码
package main
import (
"fmt"
"time"
"github.com/go-zookeeper/zk"
)
func mirror(conn *zk.Conn, path string) (chan []string, chan error) {
snapshots := make(chan []string)
errors := make(chan error)
go func() {
for {
snapshot, _, events, err := conn.ChildrenW(path)
if err != nil {
errors <- err
return
}
snapshots <- snapshot
evt := <-events
if evt.Err != nil {
errors <- evt.Err
return
}
}
}()
return snapshots, errors
}
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181", "192.168.0.216:2181", "192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:", err)
return
}
defer conn.Close()
snapshots, errors := mirror(conn, "/root/lock") //监控的根节点,根节点不能删除
go func() {
for {
select {
case snapshot := <-snapshots:
fmt.Println("监控变化:", snapshot)
case err := <-errors:
fmt.Println("err:", err)
}
}
}()
for {
}
}
分布式锁代码
package main
import (
"fmt"
"sync"
"time"
"github.com/go-zookeeper/zk"
)
func main() {
conn, _, err := zk.Connect([]string{"192.168.0.215:2181","192.168.0.216:2181","192.168.0.217:2181"}, time.Second)
if err != nil {
fmt.Println("err:",err)
return
}
defer conn.Close()
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
lock := zk.NewLock(conn, "/root/lock", zk.WorldACL(zk.PermAll)) //加锁
err = lock.LockWithData([]byte("it is a lock"))
if err != nil {
panic(err)
}
fmt.Println("第", n, "个 goroutine 获取到了锁")
time.Sleep(time.Second*1) // 1 秒后释放锁
lock.Unlock() //解锁
}(i)
}
wg.Wait()
}
结果: