一、前言
Elastic-Job是一个优秀的分布式作业调度框架。
Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。
Elastic-Job-Cloud使用Mesos + Docker的解决方案,额外提供资源治理、应用分发以及进程隔离等服务。
1. Elastic-Job-Lite
-
分布式调度协调
-
弹性扩容缩容
-
失效转移
-
错过执行作业重触发
-
作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
-
自诊断并修复分布式不稳定造成的问题
-
支持并行调度
-
支持作业生命周期操作
-
丰富的作业类型
-
Spring整合以及命名空间提供
-
运维平台
2. Elastic-Job-Cloud
-
应用自动分发
-
基于Fenzo的弹性资源分配
-
分布式调度协调
-
弹性扩容缩容
-
失效转移
-
错过执行作业重触发
-
作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
-
支持并行调度
-
支持作业生命周期操作
-
丰富的作业类型
-
Spring整合
-
运维平台
-
基于Docker的进程隔离(TBD)
二、导读
1、Elastic-Job的核心思想
2、Elastic-Job的基本使用
三、Elastic-Job的核心思想
对于分布式计算而言,分片是最基本的思想,Elastic-Job也是沿用了这个思想,每个job跑部分数据,所有job执行完成,便是全量数据,官网给出的SimpleJob例子如下:
public class MyElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
switch (context.getShardingItem()) {
case 0:
// do something by sharding item 0
break;
case 1:
// do something by sharding item 1
break;
case 2:
// do something by sharding item 2
break;
// case n: ...
}
}
}
用switch case循环来对应分片的业务逻辑,case分片的index,进入业务逻辑执行。当然这里也有不适应的场景,类似于MapReduce需要shuffle的场景就不适合了,比方说,要根据某一个字段全局分组聚合求结果,这时候怎么分片都可能会不合理,因为每个分片只能处理N分之一的数据,没办法shuffle再聚合,这一点,也要根据具体的业务来使用。
那么ShardingContext可以拿到那些信息呢?源码如下
public final class ShardingContext {
/**
* 作业名称.
*/
private final String jobName;
/**
* 作业任务ID.
*/
private final String taskId;
/**
* 分片总数.
*/
private final int shardingTotalCount;
/**
* 作业自定义参数.
* 可以配置多个相同的作业, 但是用不同的参数作为不同的调度实例.
*/
private final String jobParameter;
/**
* 分配于本作业实例的分片项.
*/
private final int shardingItem;
/**
* 分配于本作业实例的分片参数.
*/
private final String shardingParameter;
public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {
jobName = shardingContexts.getJobName();
taskId = shardingContexts.getTaskId();
shardingTotalCount = shardingContexts.getShardingTotalCount();
jobParameter = shardingContexts.getJobParameter();
this.shardingItem = shardingItem;
shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);
}
}
以上代码,jobParameter和shardingItem是最有用的参数,shardingItem决定switch case循环的走向,shardingParameter可以用业务的查询条件,也可以用字符串拼接的方式组装很复杂的参数用于特定的业务。
四、Elastic-Job的基本使用
1、Job配置项
public class ElasticJobConfig {
private static CoordinatorRegistryCenter createRegistryCenter() {
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181", "elastic-job");
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
regCenter.init();
return regCenter;
}
private static LiteJobConfiguration createJobConfiguration() {
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("jobdemo", "0/5 * * * * ?", 3)
.shardingItemParameters("0=A,1=A,2=B").failover(true).misfire(true).build();
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig,
MyElasticJob.class.getCanonicalName());
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true)
.build();
return simpleJobRootConfig;
}
public static void main(String[] args) {
new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
}
}
几点说明:
注册中心配置项,设置zookeeper集群地址,我这里用的本地单节点,所以只有一个,当然可以配置任务名称,命名空间(namespace,本质上会在zk里生成一个目录),超时时间,最大重试次数等等
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build()中,overwrite参数非常重要,设置这个参数为true,修改过job配置信息才会覆盖zookeeper里的数据,要不然不会生效。
2、SimpleJob的实现
public class MyElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
switch (shardingContext.getShardingItem()) {
case 0: {
System.out.println("当前分片:" + shardingContext.getShardingItem() + "=====" + "参数:"
+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
break;
}
case 1: {
System.out.println("当前分片:" + shardingContext.getShardingItem() + "=====" + "参数:"
+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
break;
}
case 2: {
System.out.println("当前分片:" + shardingContext.getShardingItem() + "=====" + "参数:"
+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
break;
}
default: {
System.out.println("当前分片:" + shardingContext.getShardingItem() + "=====" + "参数:"
+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
break;
}
}
}
}
上面设置每5秒钟执行一次,执行ElasticJobConfig的main方法,执行结果如下:
从上面的结果,可以看出,执行每个分片的任务,其实是放到一个线程池去执行的,对应的分片信息和参数信息在shardingContext可以拿到,实现业务非常方便。
最后,如果启动多个JVM,那么这些任务就分散到各个节点里,如果一个节点宕机,下次触发任务时,将把该分片任务丢到健康机器执行,这里做到了节点容错。但是某个分片的任务在执行过程中失败了,那么这里是不会重新触发改分片任务的执行的。