一、前言

    Elastic-Job是一个优秀的分布式作业调度框架。

    Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。

    Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。

    Elastic-Job-Cloud使用Mesos + Docker的解决方案,额外提供资源治理、应用分发以及进程隔离等服务。

1. Elastic-Job-Lite

  • 分布式调度协调

  • 弹性扩容缩容

  • 失效转移

  • 错过执行作业重触发

  • 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例

  • 自诊断并修复分布式不稳定造成的问题

  • 支持并行调度

  • 支持作业生命周期操作

  • 丰富的作业类型

  • Spring整合以及命名空间提供

  • 运维平台

2. Elastic-Job-Cloud

  • 应用自动分发

  • 基于Fenzo的弹性资源分配

  • 分布式调度协调

  • 弹性扩容缩容

  • 失效转移

  • 错过执行作业重触发

  • 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例

  • 支持并行调度

  • 支持作业生命周期操作

  • 丰富的作业类型

  • Spring整合

  • 运维平台

  • 基于Docker的进程隔离(TBD)

二、导读

    1、Elastic-Job的核心思想

    2、Elastic-Job的基本使用

三、Elastic-Job的核心思想

    对于分布式计算而言,分片是最基本的思想,Elastic-Job也是沿用了这个思想,每个job跑部分数据,所有job执行完成,便是全量数据,官网给出的SimpleJob例子如下:

public class MyElasticJob implements SimpleJob {

    @Override
    public void execute(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0:
                // do something by sharding item 0
                break;
            case 1:
                // do something by sharding item 1
                break;
            case 2:
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
}

    用switch case循环来对应分片的业务逻辑,case分片的index,进入业务逻辑执行。当然这里也有不适应的场景,类似于MapReduce需要shuffle的场景就不适合了,比方说,要根据某一个字段全局分组聚合求结果,这时候怎么分片都可能会不合理,因为每个分片只能处理N分之一的数据,没办法shuffle再聚合,这一点,也要根据具体的业务来使用。

   那么ShardingContext可以拿到那些信息呢?源码如下

    

public final class ShardingContext {

    /**
     * 作业名称.
     */
    private final String jobName;

    /**
     * 作业任务ID.
     */
    private final String taskId;

    /**
     * 分片总数.
     */
    private final int shardingTotalCount;

    /**
     * 作业自定义参数.
     * 可以配置多个相同的作业, 但是用不同的参数作为不同的调度实例.
     */
    private final String jobParameter;

    /**
     * 分配于本作业实例的分片项.
     */
    private final int shardingItem;

    /**
     * 分配于本作业实例的分片参数.
     */
    private final String shardingParameter;

    public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {
        jobName = shardingContexts.getJobName();
        taskId = shardingContexts.getTaskId();
        shardingTotalCount = shardingContexts.getShardingTotalCount();
        jobParameter = shardingContexts.getJobParameter();
        this.shardingItem = shardingItem;
        shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);
    }
}

    以上代码,jobParameter和shardingItem是最有用的参数,shardingItem决定switch case循环的走向,shardingParameter可以用业务的查询条件,也可以用字符串拼接的方式组装很复杂的参数用于特定的业务。

四、Elastic-Job的基本使用

    1、Job配置项

public class ElasticJobConfig {
	private static CoordinatorRegistryCenter createRegistryCenter() {

		ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181", "elastic-job");
		CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
		regCenter.init();
		return regCenter;
	}

	private static LiteJobConfiguration createJobConfiguration() {

		JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("jobdemo", "0/5 * * * * ?", 3)
				.shardingItemParameters("0=A,1=A,2=B").failover(true).misfire(true).build();
		SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig,
				MyElasticJob.class.getCanonicalName());
		LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true)
				.build();
		return simpleJobRootConfig;
	}

	public static void main(String[] args) {
		new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
	}
}

    几点说明:

    注册中心配置项,设置zookeeper集群地址,我这里用的本地单节点,所以只有一个,当然可以配置任务名称,命名空间(namespace,本质上会在zk里生成一个目录),超时时间,最大重试次数等等

    LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build()中,overwrite参数非常重要,设置这个参数为true,修改过job配置信息才会覆盖zookeeper里的数据,要不然不会生效。

    2、SimpleJob的实现

public class MyElasticJob implements SimpleJob {

	@Override
	public void execute(ShardingContext shardingContext) {
		switch (shardingContext.getShardingItem()) {
		case 0: {
			System.out.println("当前分片:" + shardingContext.getShardingItem() + "=====" + "参数:"
					+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
			break;
		}
		case 1: {
			System.out.println("当前分片:" + shardingContext.getShardingItem() + "=====" + "参数:"
					+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
			break;
		}
		case 2: {
			System.out.println("当前分片:" + shardingContext.getShardingItem() + "=====" + "参数:"
					+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
			break;
		}
		default: {
			System.out.println("当前分片:" + shardingContext.getShardingItem() + "=====" + "参数:"
					+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
			break;
		}
		}
	}
}

    上面设置每5秒钟执行一次,执行ElasticJobConfig的main方法,执行结果如下:

    分布式定时任务框架Elastic-Job的使用-LMLPHP

    从上面的结果,可以看出,执行每个分片的任务,其实是放到一个线程池去执行的,对应的分片信息和参数信息在shardingContext可以拿到,实现业务非常方便。

    最后,如果启动多个JVM,那么这些任务就分散到各个节点里,如果一个节点宕机,下次触发任务时,将把该分片任务丢到健康机器执行,这里做到了节点容错。但是某个分片的任务在执行过程中失败了,那么这里是不会重新触发改分片任务的执行的。

    

03-07 00:31