10- 调度作业启动时的生命周期

10.1 启动监听器

监听机制的实现是一种发布者/订阅者的观察者设计模式的实现。调度系统引入来Netflix实现的Curator组件,通过Curator提供的订阅机制来实现对节点事件订阅。

Curator 事件订阅有两种模式:

  • 一种是标准的观察模式
  • 一种是缓存监听模式

标准的监听模式是使用Watcher 监听器。第二种缓存监听模式引入了一种本地缓存视图的Cache机制,来实现对Zookeeper服务端事件监听。

Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程。Cache提供了反复注册的功能。Cache是一种缓存机制,可以借助Cache实现监听。简单来说,Cache在客户端缓存了znode的各种状态,当感知到Zookeeper集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。接下来我们来看下在调度系统中有哪些常见的监听器,首先来看下startAllListeners方法的源码:

/**
* 开启所有监听器.
*/
public void startAllListeners() {
    electionListenerManager.start();
    shardingListenerManager.start();
    failoverListenerManager.start();
    monitorExecutionListenerManager.start();
    shutdownListenerManager.start();
    triggerListenerManager.start();
    rescheduleListenerManager.start();
    guaranteeListenerManager.start();
    jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
}

startAllListeners方法通过调用监听器管理对象的start方法来启动各个监听器进行节点信息的订阅。

10.2 监听器大全

这么多的监听器有什么作用,又会在什么场景进行触发呢,下面我们就详细看一下每个监听管理器中的监听器的作用,触发条件,和具体业务。这个可以当作参考表格,后面用来详细讲解每一块功能的时候做为参考。

10.3 监听器使用

了解了监听器的作用,触发条件和具体业务事件,我们可以通过选主节点监听器来看下监听器是如何进行注册的。以ElectionListenerManager主节点监听管理器中的主节点选举监听器LeaderElectionJobListener为例,来看一下启动监听器的过程,在调用ElectionListenerManager的start方法时候会触发如下代码:

@Override
public void start() {
    addDataListener(new LeaderElectionJobListener());
    addDataListener(new LeaderAbdicationJobListener());
}

主要包含了两步:

  • 创建选主节点监听器
  • 添加数据监听器

(1)创建选主节点监听器

先来看下主节点选举监听器类型的必要实现

LeaderElectionJobListener类型与其他监听器一样通过继承AbstractJobListener作业监听器模版重写dataChanged模版方法来在节点发生变更的时候触发监听业务。而作业监听器模版通过实现TreeCacheListener接口,通过重写childEvent方法来实现节点的通知。

TreeCacheListener接口是Curator框架中用来进行对外监听通知的监听接口,主要包含childEvent方法在节点发生变化的时候,进行调用监听对象的childEvent方法,类型继承关系如下图所示

[Elastic-Job2.1.5源码]-10- 调度作业启动时的生命周期-LMLPHP

图6.3 调度系统选主监听UML

TreeCacheListener的childEvent一般在哪些场景会被触发呢,这个就需要看Curator包中的TreeCacheEvent.Type枚举类型,一般在添加节点,移除节点,节点数据变更,初始化连接,连接断开,连接重连,连接挂起时候被触发。

(2)添加数据监听器

了解了节点监听何时和如何被调用的我们还需要看一个节点如何才能注册成为订阅对象让Curator的监听事件触发。接下来我们就来看 addDataListener方法,在ElectionListenerManager类型中addDataListener方法主要通过调用jobNodeStorage方法的addDataListener如下代码所示:

protected void addDataListener(final TreeCacheListener listener) {
    jobNodeStorage.addDataListener(listener);
}

jobNodeStorage在第3章中我们提到过是用来封装对注册中心的逻辑操作,在注册中心对象针对节点的增删改查逻辑基础上进行二次封装,这里我们会看到jobNodeStorage类型中二次封装了添加数据监听的方法,逻辑如下所示:

/**
 * 注册数据监听器.
 * @param listener 数据监听器
 */
public void addDataListener(final TreeCacheListener listener) {
    TreeCache cache = (TreeCache) regCenter.getRawCache("/" + jobName);
    cache.getListenable().addListener(listener);
}
  • 获取TreeCache对象

  • 向TreeCache监听对象中添加监听器

TreeCache尝试将ZooKeeper路径的所有子级中的所有数据保留在本地。此类将监视Zookeeper路径,响应更新/创建/删除事件,下拉数据等。可以通过获取TreeCache的Listener容器来将自定义监听器注册到容器中,当发生更改时,该自定义监听器将得到通知。

整理下监听配置一共经历了几个步骤:

  • 编写监听器实现TreeCacheListener接口
  • 向TreeCache的监听器容器中添加监听器对象
  • 节点发生变更则触发监听器的childEvent方法
  • childEvent方法调用dataChanged方法
  • 在dataChanged方法中处理具体的业务逻辑

10.4 观察者设计模式

整理好了调度系统中使用Curator框架针对Zookeeper节点监听器的实现,为大家分享下监听器模式的底层设计模式思想,对于监听订阅,当对象间存在一对多关系时,则使用观察者模式(Observer Pattern)。比如,当一个对象被修改时,则会自动通知依赖它的对象。观察者模式属于行为型模式。

举个观察者的案例,比如我们订阅报纸,我们可以订阅报纸,也可以取消订阅报纸,当我们订阅报纸时有新的报纸更新了就需要一个一个通知订阅了报纸的我们,当我们取消了订阅就无法获取报纸新的更新,再比如我们关注了中间件源码微信公众号,当有新的内容发布时候我们就可以收到更新的内容,当我们取消订阅的时候有新的优质内容发布的时候我们就无法感知,所以建议大家关注中间件源码微信公众号不要取消关注,

那观察者设计模式有什么优缺点呢?可以看如下表格:

那如何实现一个简单的观察者设计模式呢

我们可以建立如下模型:

  • 主题(被观察者)用来订阅
  • 抽象观察者,用来与主题耦合
  • 具体观察者通过继承抽象观察者,用来订阅主题

使用UML图可以如下标示:

[Elastic-Job2.1.5源码]-10- 调度作业启动时的生命周期-LMLPHP

图 6.4 观察者设计模式UML

在UML示例中一个主题(被观察者)包含了多个抽象观察者,主题可以动态添加(注册)观察者对象,也可以移除,当有事件产生的时候可以通知所有观察者触发监听notify方法。

接下来我们就来看下主题(被观察者)的代码实现:

import java.util.ArrayList;
import java.util.List;

public class Subject {

    /**
     * 订阅了主题的观察者集合
     */
    private List<Observer> observerList = new ArrayList<>();

    /**
     * 添加观察者
     * @param observer
     */
    public void addObserver(Observer observer) {
        observerList.add(observer);
    }


    /**
     * 移除观察者
     * @param observer
     */
    public void removeObserver(Observer observer) {
        observerList.remove(observer);
    }

    /**
     * 通知触发函数
     * @param msg
     */
    public void notify(String msg) {
        observerList.forEach(observer -> {
            observer.notify(msg);
        });
    }
}

抽象的观察者:

public abstract class Observer {

   public abstract void notify(String msg);

}

具体的观察者:

public class MyObserver extends Observer {

    @Override
    public void notify(String msg) {
        System.out.println(msg);
    }
}

运行测试

public class DemoMain {
    public static void main(String[] args) {
        Subject subject = new Subject();
        subject.addObserver(new MyObserver());
        subject.addObserver(new MyObserver());
        subject.notify("hell world");
    }
}

输出结果:

[Elastic-Job2.1.5源码]-10- 调度作业启动时的生命周期-LMLPHP

在Demo中我们执行了如下的流程:

  • 创建了一个订阅主题
  • 创建观察者1对象同时添加到主题对象中(观察者订阅主题
  • 创建观察者2对象同时添加到主题对象中(观察者订阅主题)
  • 执行主题的通知方法
  • 最后可以看到两个控制台可以打印两条数据,这两条数据分别是两个观察者打印的

通过对调度监听器的了解和对观察者设计模式的学习,后面如果有订阅主题的业务场景可以尝试使用观察者模式尝试一下,观察者设计模式在页面控件触发机制,网络请求回调机制,消息订阅等功能里面我们都会再遇到 。

查看更多原文,技术咨询支持,可以扫描微信公众号进行回复咨询
[Elastic-Job2.1.5源码]-10- 调度作业启动时的生命周期-LMLPHP

06-20 15:17