1、前言
又到了金三银四的时候,大家都按耐不住内心的躁动,我在这里给大家分享下之前面试中遇到的一个知识点(zookeeper应用场景),希望对大家有些帮助。如有不足,欢迎大佬们指点指点。
2、zookeeper简介
3、zookeeper应用场景
下面的代码都需要一个序列化类,所以放在最前面声明
/**
* @author admin
*/
public class MyZkSerializer implements ZkSerializer {
String charset = "UTF-8";
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes, charset);
} catch (UnsupportedEncodingException e) {
throw new ZkMarshallingError(e);
}
}
@Override
public byte[] serialize(Object obj) throws ZkMarshallingError {
try {
return String.valueOf(obj).getBytes(charset);
} catch (UnsupportedEncodingException e) {
throw new ZkMarshallingError(e);
}
}
}
3.1 配置中心
3.1.1 什么是配置中心呢?
3.1.2 zookeeper怎么实现配置中心呢?
必要条件
实现方式
- 一个配置项对应一个zNode
// 1 将单个配置放到zookeeper上
public void putZk() {
ZkClient client = new ZkClient("192.168.10.11:2181");
client.setZkSerializer(new MyZkSerializer());
String configPath = "/config1";
String value = "1111111";
if (client.exists(configPath)) {
client.writeData(configPath, value);
} else {
client.createPersistent(configPath, value);
}
client.close();
}
// 需要配置的服务都从zk上取,并注册watch来实时获得配置更新
public void getConfigFromZk() {
ZkClient client = new ZkClient("192.168.10.11:2181");
client.setZkSerializer(new MyZkSerializer());
String configPath = "/config1";
String value = client.readData(configPath);
System.out.println("从zk读到配置config1的值为:" + value);
// 监控配置的更新,基于watch实现发布订阅功能
client.subscribeDataChanges(configPath, new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
// TODO 配置删除业务处理
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("获得更新的配置值:" + data);
}
});
// 这里只是为演示实时获取到配置值更新而加的等待。实际项目应用中根据具体场景写(可用阻塞方式)
try {
Thread.sleep(5 * 60 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- 一个配置文件对应一个zNode
// 将配置文件的内容存放到zk节点上
public void putConfigFile2ZK() throws IOException {
File f = new File(this.getClass().getResource("/config.xml").getFile());
FileInputStream fin = new FileInputStream(f);
byte[] datas = new byte[(int) f.length()];
fin.read(datas);
fin.close();
ZkClient client = new ZkClient("192.168.10.11:2181");
client.setZkSerializer(new BytesPushThroughSerializer());
String configPath = "/config2";
if (client.exists(configPath)) {
client.writeData(configPath, datas);
} else {
client.createPersistent(configPath, datas);
}
client.close();
}
获取整个配置文件的方式跟步骤1类似,只不过需要解析对应的配置文件而已。
3.2 命名服务(注册中心)
3.2.1 什么是注册中心?
3.2.2 zookeeper怎么实现注册中心呢?
3.3 Master选举
3.3.1 什么是Master选举?
3.3.2 zookeeper怎么实现Master选举呢?
下面咱们通过代码来模拟一下master选举
/**
* @author yinfeng
*/
public class Server {
private final String cluster;
private final String name;
private final String address;
private final String path, value;
private String master;
public Server(String cluster, String name, String address) {
super();
this.cluster = cluster;
this.name = name;
this.address = address;
path = "/" + this.cluster + "Master";
value = "name:" + name + " address:" + address;
final ZkClient client = new ZkClient("192.168.10.11:2181");
client.setZkSerializer(new MyZkSerializer());
final Thread thread = new Thread(() -> {
electionMaster(client);
});
thread.setDaemon(true);
thread.start();
}
/**
* 选举方法
**/
public void electionMaster(ZkClient client) {
try {
client.createEphemeral(path, value);
master = client.readData(path);
System.out.println(value + "创建节点成功,成为Master");
} catch (ZkNodeExistsException e) {
master = client.readData(path);
System.out.println("Master为:" + master);
}
// 为阻塞自己等待而用
final CountDownLatch cdl = new CountDownLatch(1);
// 注册watcher
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("-----监听到节点被删除");
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
client.subscribeDataChanges(path, listener);
// 让自己阻塞
if (client.exists(path)) {
try {
cdl.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
// 醒来后,取消watcher
client.unsubscribeDataChanges(path, listener);
// 递归调自己(下一次选举)
electionMaster(client);
}
}
咱们通过启动多个服务来看看是否测试成功
public static void main(String[] args) {
// 测试时,依次开启多个Server实例java进程,然后停止获取的master的节点,看谁抢到Master
Server s = new Server("cluster1", "server1", "192.168.1.11:8991");
Server s1 = new Server("cluster1", "server2", "192.168.1.11:8992");
Server s2 = new Server("cluster1", "server3", "192.168.1.11:8993");
Server s3 = new Server("cluster1", "server4", "192.168.1.11:8994");
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
可以看到功能一切正常
3.4 分布式队列
3.4.1 什么是分布式队列?
3.4.2 zookeeper怎么实现分布式队列?
3.5 分布式锁
3.5.1 什么是分布式锁?
3.5.2 zookeeper通过临时节点实现布式锁?
竞争锁流程如下图:
代码实现如下
/**
* @author yinfeng
*/
public class ZKDistributeLock implements Lock {
private String lockPath;
private ZkClient client;
// 锁重入计数
private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();
public ZKDistributeLock(String lockPath) {
super();
this.lockPath = lockPath;
client = new ZkClient("192.168.10.11:2181");
client.setZkSerializer(new MyZkSerializer());
}
@Override
public boolean tryLock() {
// 锁重入不会阻塞
if (this.reentrantCount.get() != null) {
int count = this.reentrantCount.get();
if (count > 0) {
this.reentrantCount.set(++count);
return true;
}
}
// 创建节点
try {
client.createEphemeral(lockPath);
this.reentrantCount.set(1);
} catch (ZkNodeExistsException e) {
return false;
}
return true;
}
@Override
public void unlock() {
// 重入释进行放锁处理
if (this.reentrantCount.get() != null) {
int count = this.reentrantCount.get();
if (count > 1) {
this.reentrantCount.set(--count);
return;
} else {
this.reentrantCount.set(null);
}
}
client.delete(lockPath);
}
@Override
public void lock() {
// 如果获取不到锁,阻塞等待
if (!tryLock()) {
// 没获得锁,阻塞自己
waitForLock();
// 再次尝试
lock();
}
}
private void waitForLock() {
final CountDownLatch cdl = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("----收到节点被删除了-------------");
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
client.subscribeDataChanges(lockPath, listener);
// 阻塞自己
if (this.client.exists(lockPath)) {
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 取消注册
client.unsubscribeDataChanges(lockPath, listener);
}
@Override
public void lockInterruptibly() {
}
@Override
public boolean tryLock(long time, TimeUnit unit) {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
咱们在写个测试类试一下效果,通过多线程来模拟多实例竞争锁
public static void main(String[] args) {
// 并发数
int currency = 50;
// 循环屏障
final CyclicBarrier cb = new CyclicBarrier(currency);
// 多线程模拟高并发
for (int i = 0; i < currency; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "---------我准备好---------------");
// 等待一起出发
try {
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
ZKDistributeLock lock = new ZKDistributeLock("/distLock11");
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " 获得锁!");
try {
Thread.sleep(1000 * 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() + " 释放锁!");
}
}
).start();
}
}
3.5.3 zookeeper通过临时顺序节点实现布式锁?
原理图如下
流程图如下
接着咱们通过代码来实现吧
/**
* @author yinfeng
*/
public class ZKDistributeImproveLock implements Lock {
/**
* 利用临时顺序节点来实现分布式锁
* 获取锁:取排队号(创建自己的临时顺序节点),然后判断自己是否是最小号,如是,则获得锁;不是,则注册前一节点的watcher,阻塞等待
* 释放锁:删除自己创建的临时顺序节点
*/
private final String lockPath;
private final ZkClient client;
private ThreadLocal<String> currentPath = new ThreadLocal<>();
private ThreadLocal<String> beforePath = new ThreadLocal<>();
/**
* 锁重入计数
*/
private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();
public ZKDistributeImproveLock(String lockPath) {
super();
this.lockPath = lockPath;
client = new ZkClient("192.168.10.11:2181");
client.setZkSerializer(new MyZkSerializer());
if (!this.client.exists(lockPath)) {
try {
this.client.createPersistent(lockPath);
} catch (ZkNodeExistsException ignored) {
}
}
}
@Override
public boolean tryLock() {
// 重入则直接返回获得锁成功
if (this.reentrantCount.get() != null) {
int count = this.reentrantCount.get();
if (count > 0) {
this.reentrantCount.set(++count);
return true;
}
}
if (this.currentPath.get() == null) {
currentPath.set(this.client.createEphemeralSequential(lockPath + "/", "aaa"));
}
// 获得所有的子节点
List<String> children = this.client.getChildren(lockPath);
// 排序list
Collections.sort(children);
// 判断当前节点是否是最小的
if (currentPath.get().equals(lockPath + "/" + children.get(0))) {
this.reentrantCount.set(1);
return true;
} else {
// 取到前一个
// 得到字节的索引号
int curIndex = children.indexOf(currentPath.get().substring(lockPath.length() + 1));
beforePath.set(lockPath + "/" + children.get(curIndex - 1));
}
return false;
}
@Override
public void lock() {
if (!tryLock()) {
// 阻塞等待
waitForLock();
// 再次尝试加锁
lock();
}
}
private void waitForLock() {
final CountDownLatch cdl = new CountDownLatch(1);
// 注册watcher
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("-----监听到节点被删除");
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
client.subscribeDataChanges(this.beforePath.get(), listener);
// 让自己阻塞
if (this.client.exists(this.beforePath.get())) {
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 醒来后,取消watcher
client.unsubscribeDataChanges(this.beforePath.get(), listener);
}
@Override
public void unlock() {
// 重入的释放锁处理
if (this.reentrantCount.get() != null) {
int count = this.reentrantCount.get();
if (count > 1) {
this.reentrantCount.set(--count);
return;
} else {
this.reentrantCount.set(null);
}
}
// 删除节点
this.client.delete(this.currentPath.get());
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
最后咱们再来测试一下
public static void main(String[] args) {
// 并发数
int currency = 50;
// 循环屏障
final CyclicBarrier cb = new CyclicBarrier(currency);
// 多线程模拟高并发
for (int i = 0; i < currency; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "---------我准备好---------------");
// 等待一起出发
try {
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
ZKDistributeImproveLock lock = new ZKDistributeImproveLock("/distLock");
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " 获得锁!");
try {
Thread.sleep(1000 * 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() + " 释放锁!");
}
}).start();
}
}