源码解析
基本调用类分析
任务启动由python脚本新建进程进行任务执行,后续执行由Java进行,以下将对java部分进行分
其中的调用原理机制。
Engine
首先入口类为com.alibaba.datax.core.Engine
的main
方法,其中通过调用其本身的静态方法entry
,该方法主要针对输入参入进行格式化以及校验:
BasicParser parser = new BasicParser();
CommandLine cl = parser.parse(options, args);
其中需要注意运行模式是通过RUNTIME_MODE = cl.getOptionValue("mode")
代码直接赋值给静态变量的,针对命令行参数采用了org.apache.commons
的BasicParser
解析,针对任务的配置文件则通过其本身的ConfigParser
进行解析(可以支持本地和网络文件)。
完成配置初始化后该方法将实例化本身并调用其start
方法,将初始化好的配置对象传入其中。该方法首先将类型转换进行初始化,以保证在后续数据导入导出中不兼容类型可以进行顺利的转换工作,具体通过ColumnCast.bind(configuration)
方法进行绑定,其中主要针对三种类型进行初始化工作:
StringCast.init(configuration);
DateCast.init(configuration);
BytesCast.init(configuration);
接着就是利用LoadUtil.Bind(pluginCoonfigs)
保存插件,便于后续读取插件相关配置信息内容。剩下就是该函数的核心流程,即判断当前的任务运行模式,是TaskGroup
还是Job
模式。但通过实际分析来看基本都是Job
模式,所以后续我们主要以JobContainer
为切入点,另一个则为TaskGroupContainer
。两者均继承自AbstractContainer
基类,并通过调用他们的start
方法进行启动。
对于非Standlone
模式还支持记录任务进度情况,进行汇报的功能。具体由最后实例化的PerfTrace
类实现。
JobContainer
首先该类的构造函数中仅初始化了ErrorRecordChecker
类用于检查任务是否到达错误记录限制。而主要的运行则落在了start
方法中,细心读者可以发现其中读取了job.setting.dryRun
配置参数,判断是否需要执行预检查(preCheck)。正常工作流程则如下所示:
- preHandle
Job前置操作,即初始化preHandler插件并执行其preHandler
;
AbstractJobPlugin handler = LoadUtil.loadJobPlugin(
handlerPluginType, handlerPluginName);
// todo...
handler.preHandler(configuration);
- init
初始化reader和writer,实际方法中根据读写插件各自执行了对应的初始化方法,具体代码如下所示。
this.jobReader = this.initJobReader(jobPluginCollector);
this.jobWriter = this.initJobWriter(jobPluginCollector);
其中各方法均类似,就是读取对应加载对应的插件对象并调用插件对应的方法进行相关配置的设置以及对应方法的初始化,选取initJobReader
方法中的部分代码片段如下:
Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
PluginType.READER, this.readerPluginName);
// todo...
jobReader.setJobPluginCollector(jobPluginCollector);
jobReader.init();
- prepare
全局准备工作,比如odpswriter清空目标表。由于读写插件的特殊性质,其方法内部主要也是执行了各类型插件的方法来实现准备工作。
this.prepareJobReader();
this.prepareJobWriter();
其中各自方法的差异性较小,主要就是实例化插件然后直接调用其对应的prepare
即可。
- split
拆分Task
,参数adviceNumber
为建议的拆分数。除此之外我们还可以通过字节和事务的限速来进行控制,从而决定Channel的数量。具体配置参数如下:
job.setting.speed.byte
:总BPS限速,如果存在值则单个Channel的BPS不能为空,通过总限速除以单个Channel限速得出Channel的需求数量;core.transport.channel.speed.byte
:单个Channel的BPS限速;job.setting.speed.record
:总TPS限速,如果存在则单个Channel的TPS不能为空,通过总限速除以单个Channel限速得出Channel的需求数量;core.transport.channel.speed.record
:单个Channel的TPS限速;
如果两个限速均存在则取值最少的那一个,如果两者都没有设置则通过job.setting.speed.channel
参数获取,最终决定needChannelNumber
参数。根据得出的参数进行Reader与Writer的拆分。
List<Configuration> readerTaskConfigs = this
.doReaderSplit(this.needChannelNumber);
int taskNumber = readerTaskConfigs.size();
List<Configuration> writerTaskConfigs = this
.doWriterSplit(taskNumber);
以上这两种方法大同小异,只是内部读取的插件不同,这里我们就以Reader为例进行说明。内部实例化好对应插件后,通过插件Job的split方法进行实际切分。
List<Configuration> readerSlicesConfigs =
this.jobReader.split(adviceNumber);
而实际的切分则需要由插件开发人员通过实现Job的split
方法来满足,该方法将返回Configuration
列表,最终将会把reader和writer以及配置项job.content[0].transformer
重新整合成contentconfig
并作为变量configuration
中Key为job.content
的值,从而便于将其传递至各Task中。
List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(
readerTaskConfigs, writerTaskConfigs, transformerList);
- schedule
完成任务的切换后将开始执行任务。由于实际任务是由TaskGroupContainer
执行,为此我们还需要划分对应TaskGroup
需要运行的Task,该参数通过core.container.taskGroup.channel
进行配置,默认为5。决定每个Group运行那些Task的则由以下方法进行决定,将直接返回对应任务组的配置参数。
List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
this.needChannelNumber, channelsPerTaskGroup);
关于该方法的具体剖析可以跳转到本篇幅
完成任务分配后我们就需要根据运行模式决定调度器,通过这里的源码可以明显看出其DataX 3.0
是经过了阉割,仅保留了单机运行模式。
executeMode = ExecuteMode.STANDALONE;
scheduler = initStandaloneScheduler(this.configuration);
故后续我们仅能描述单机模式下关于任务调度的工作原理。首先是调度器初始化的核心方法initStandaloneScheduler
,其方法主要是初始化了StandAloneJobContainerCommunicator
类用于通信(其中collect由ProcessInnerCollector提供,reporter由ProcessInnerReporter提供),StandAloneScheduler
则为实际调度器。具体的说明请跳转到本篇幅。最终将对应的配置信息传入调度器中进行执行就完成了。
scheduler.schedule(taskGroupConfigs);
ProcessInnerCollector
在AbstractScheduler
的schedule
中通过StandAloneJobContainerCommunicator
类调用了其collect
方法,而其方法的背后则是其他类对应的方法。
public Communication collect() {
return super.getCollector().collectFromTaskGroup();
}
该类为ProcessInnerCollector
类,其对应的方法依然是LocalTGCommunicationManager
静态类其中一个静态方法。
public Communication collectFromTaskGroup() {
return LocalTGCommunicationManager.getJobCommunication();
}
其内部也是将之前每个TaskGroup所创建的Communication
维护了一个静态字典并在需要的时候进行合并。
public static Communication getJobCommunication() {
Communication communication = new Communication();
communication.setState(State.SUCCEEDED);
for (Communication taskGroupCommunication :
taskGroupCommunicationMap.values()) {
communication.mergeFrom(taskGroupCommunication);
}
return communication;
}
ProcessInnerReporter
在AbstractScheduler
的schedule
中通过StandAloneJobContainerCommunicator
类调用了其report
方法,而其方法的背后则是其他类对应的方法。
public void report(Communication communication) {
super.getReporter().reportJobCommunication(super.getJobId(), communication);
LOG.info(CommunicationTool.Stringify.getSnapshot(communication));
reportVmInfo();
}
而Reporter对象则为ProcessInnerReporter
类,对应的方法则是该类的reportJobCommunication
方法,其本身也是调用了其他静态类的静态方法进行实现。
public void reportJobCommunication(Long jobId, Communication communication) {
// do nothing
}
可以看到当前源码并没有是实现输出Job的统计信息。
JobAssignUtil.assignFairly
该方法首先通过Channel数量除以每个TaskGroup可以处理的Channel数量从而得出TaskGroup数量。在实际切分中考虑到Shuffle的成本,插件开发者可以通过reader.parameter.loadBalanceResourceMark
与writer.parameter.loadBalanceResourceMark
来划定每个Task的标识,从而便于在分配任务时将对应标识的Reader与Writer分配到同一个TaskGroup中,如果不存在则会自动设置一个默认的标识。
String readerResourceMark = aTaskConfig.getString(CoreConstant.JOB_READER_PARAMETER + "." +
CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
String writerResourceMark = aTaskConfig.getString(CoreConstant.JOB_WRITER_PARAMETER + "." +
CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
boolean hasLoadBalanceResourceMark = StringUtils.isNotBlank(readerResourceMark) ||
StringUtils.isNotBlank(writerResourceMark);
if (!hasLoadBalanceResourceMark) {
for (Configuration conf : contentConfig) {
conf.set(CoreConstant.JOB_READER_PARAMETER + "." +
CommonConstant.LOAD_BALANCE_RESOURCE_MARK, "aFakeResourceMarkForLoadBalance");
}
Collections.shuffle(contentConfig, new Random(System.currentTimeMillis()));
}
根据资源标识将开始将根据资源标识将对应的Task进行切换,其主要由parseAndGetResourceMarkAndTaskIdMap
方法进行分配,其内部就是根据资源标识维护一个字典,如果是默认标识则字典仅有一个对象,所有的Task都归属其中。
LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap = parseAndGetResourceMarkAndTaskIdMap(contentConfig);
根据标识完成分组后就需要将Task配置按照TaskGroup进行分配,以满足调用的需要,这里通过调用doAssign
方法来满足。其方法主要先获取到按照标识分组后其中最大组的成员数量mapValueMaxLength
,并与标识数采用进行2层循环将各个task配置存储到对应分组编号的数据中。
for (int i = 0; i < mapValueMaxLength; i++) {
for (String resourceMark : resourceMarks) {
if (resourceMarkAndTaskIdMap.get(resourceMark).size() > 0) {
int taskId = resourceMarkAndTaskIdMap.get(resourceMark).get(0);
taskGroupConfigList.get(taskGroupIndex % taskGroupNumber).add(contentConfig.get(taskId));
taskGroupIndex++;
resourceMarkAndTaskIdMap.get(resourceMark).remove(0);
}
}
}
以上进行以数组的形式进行分配,而实际需要使用Configuration
对象,为此我们还需要将以上信息重新组织存储到对应的配置对象中,具体结构可以参考如下源码:
for (int i = 0; i < taskGroupNumber; i++) {
tempTaskGroupConfig = taskGroupTemplate.clone();
tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i));
tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i);
result.add(tempTaskGroupConfig);
}
上述方法虽然完成了最终的任务,但是实际每个TaskGroup所分配到的Task并不是平均的,这就导致对应的TaskGroup的Channel也是不均衡的,为了便于后期的优化,我们还需要将对应TaskGroup所需的Channel数量存入到core.container.taskGroup.channel
配置项中。
StandAloneScheduler
该类本身并没有太多实质性的内容,具体的功能内容更多的在其父类ProcessInnerScheduler
与AbstractScheduler
中,关于该两个类的说明将直接在本篇幅中进行概述,不新起篇章。
我们以schedule
的调用顺苏为例进行说明,首先获取用于汇报的时间间隔,分别为core.container.job.reportInterval
与core.container.job.sleepInterval
参数,前者为每次汇报的时间间隔,默认为30秒,后者为每次睡眠时间,即每次汇总采集的间隔时间。
由于任务的运行无法避免错误的出现,为了保障任务的成功运行,在每次汇报的同时还增加了额外的错误检查机制,通过脏数据出现的次数与比率进行判断,从而中止任务的继续。
errorLimit = new ErrorRecordChecker(configurations.get(0));
// to do...
errorLimit.checkRecordLimit(nowJobContainerCommunication);
其通过ErrorRecordChecker
类提供,该类通过recordLimit
检查条数与percentageLimit
百分比检查任务是否到达错误记录的限制,对应的限制通过读取配置中的job.setting.errorLimit.record
与job.setting.errorLimit.percentage
参数。对于任务的执行最核心的当然是startAllTaskGroup
方法了,该方法位于ProcessInnerScheduler
类中。
该方法直接利用Java本身的Executors.newFixedThreadPool
方法创建了分组数的线程池资源,然后通过将TaskGroupContainer
对象包装到TaskGroupContainerRunner
对象中来进行运行。
TaskGroupContainer taskGroupContainer = new TaskGroupContainer(configuration);
return new TaskGroupContainerRunner(taskGroupContainer);
其TaskGroupContainerRunner
内部的run
实际依然是调用了对应TaskGroupContainer
对象的start
方法。而关于该类的说明将会另启篇幅进行具体说明。