1.OperatorChain的设计与实现
OperatorChain的大致逻辑
OperatorChain的Output组件:将数据发送到下游
OperatorChain的collect():收集处理完的数据
OperatorChain的Output接口:也能输出Watermark和LatencyMarker等事件
OperatorChain内部定义了不同的WatermarkGaugeExposingOutput接口实现类。
例子:收集数据并通过Output发送数据到下游
2.OperatorChain的创建和初始化
接下来我们看OperatorChain的初始化过程。如下代码所示,OperatorChain的构造器包含以下逻辑。
/**
 * Builds the operator chain for the given task.
 *
 * <p>Creates one RecordWriterOutput per outgoing edge, wires the chained operators
 * together via createOutputCollector, and instantiates the head operator when the
 * task's StreamConfig provides an operator factory. If anything fails before
 * construction completes, the RecordWriterOutputs created so far are closed.
 *
 * @param containingTask the StreamTask that owns this chain
 * @param recordWriterDelegate supplies the RecordWriter for the i-th outgoing edge
 */
public OperatorChain(
StreamTask<OUT, OP> containingTask,
RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>>
recordWriterDelegate
) {
// User-code class loader of the containing StreamTask; used for all config lookups below.
final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
// StreamConfig of the containing task.
final StreamConfig configuration = containingTask.getConfiguration();
// Factory for the head operator; may be null (handled further down).
StreamOperatorFactory<OUT> operatorFactory =
configuration.getStreamOperatorFactory(userCodeClassloader);
// Configs of all transitively chained operators, including this task's own config.
// Keyed by id: looked up below with StreamEdge#getSourceId().
Map<Integer, StreamConfig> chainedConfigs =
configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);
// Outgoing edges of the chain; one RecordWriterOutput is created per edge.
List<StreamEdge> outEdgesInOrder =
configuration.getOutEdgesInOrder(userCodeClassloader);
Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap =
new HashMap<>(outEdgesInOrder.size());
this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
// Set to true only after every step succeeded; checked in the finally block.
boolean success = false;
try {
for (int i = 0; i < outEdgesInOrder.size(); i++) {
StreamEdge outEdge = outEdgesInOrder.get(i);
// Create the RecordWriterOutput for the i-th edge, using the i-th RecordWriter
// and the config of the operator that produces this edge.
RecordWriterOutput<?> streamOutput = createStreamOutput(
recordWriterDelegate.getRecordWriter(i),
outEdge,
chainedConfigs.get(outEdge.getSourceId()),
containingTask.getEnvironment());
this.streamOutputs[i] = streamOutput;
streamOutputMap.put(outEdge, streamOutput);
}
// Wire up the connections between the operators inside this chain.
// NOTE(review): allOps is passed in so createOutputCollector can fill it
// (presumably with the chained, non-head operators) — confirm against that method.
List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
this.chainEntryPoint = createOutputCollector(
containingTask,
configuration,
chainedConfigs,
userCodeClassloader,
streamOutputMap,
allOps,
containingTask.getMailboxExecutorFactory());
if (operatorFactory != null) {
WatermarkGaugeExposingOutput<StreamRecord<OUT>> output =
getChainEntryPoint();
// Create the head operator, emitting into the chain entry point.
headOperator = StreamOperatorFactoryUtil.createOperator(
operatorFactory,
containingTask,
configuration,
output);
// Expose the entry point's watermark gauge as the head operator's current-output-watermark metric.
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
output.getWatermarkGauge());
} else {
headOperator = null;
}
// The head operator is appended last. Note that it is appended even when it is null.
allOps.add(headOperator);
this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);
success = true;
}
finally {
// On failure, close every RecordWriterOutput that was already created so its
// resources are not leaked. Slots not yet filled are still null and are skipped.
if (!success) {
for (RecordWriterOutput<?> output : this.streamOutputs) {
if (output != null) {
output.close();
}
}
}
}
}
OperatorChain作用小结
3.创建RecordWriterOutput
RecordWriterOutput用于将数据输出到网络指定位置。
OperatorChain.createStreamOutput()逻辑如下:
/**
 * Creates the RecordWriterOutput that writes records for one outgoing edge.
 *
 * <p>If the edge carries an OutputTag, the edge belongs to a side output, and the
 * side-output serializer registered for that tag is used. Otherwise the producer's
 * main output serializer is used.
 *
 * @param recordWriter the writer that receives the serialized records for this edge
 * @param edge the outgoing edge being served
 * @param upStreamConfig config of the operator producing records on this edge
 * @param taskEnvironment task environment, used for its user class loader
 * @return a RecordWriterOutput bound to this edge's OutputTag (null for the main output)
 */
private RecordWriterOutput<OUT> createStreamOutput(
        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
        StreamEdge edge,
        StreamConfig upStreamConfig,
        Environment taskEnvironment) {
    final OutputTag sideOutputTag = edge.getOutputTag();
    final ClassLoader userClassLoader = taskEnvironment.getUserClassLoader();

    final TypeSerializer outSerializer;
    if (sideOutputTag == null) {
        // Main output: serializer of the producer's regular output type.
        outSerializer = upStreamConfig.getTypeSerializerOut(userClassLoader);
    } else {
        // Side output: serializer registered for this specific tag.
        outSerializer = upStreamConfig.getTypeSerializerSideOut(sideOutputTag, userClassLoader);
    }

    return new RecordWriterOutput<>(recordWriter, outSerializer, sideOutputTag, this);
}
RecordWriterOutput.collect():将StreamRecord输出的逻辑
/**
 * Forwards a main-output record to the RecordWriter.
 *
 * <p>Records are forwarded only when this output has no OutputTag, i.e. when it is
 * not a side output. Otherwise the record is ignored here.
 *
 * @param record the record to emit
 */
public void collect(StreamRecord<OUT> record) {
    final boolean isMainOutput = this.outputTag == null;
    if (isMainOutput) {
        pushToRecordWriter(record);
    }
}
pushToRecordWriter():通过RecordWriter发送数据
// RecordWriterOutput.pushToRecordWriter()
/**
 * Places the record into the writer's SerializationDelegate field and emits it
 * through the RecordWriter.
 *
 * <p>Any exception thrown by emit() is rethrown as an unchecked RuntimeException.
 * The original exception's message is reused and the exception is kept as the cause.
 *
 * @param record the record to emit
 */
private <X> void pushToRecordWriter(StreamRecord<X> record) {
    this.serializationDelegate.setInstance(record);
    try {
        this.recordWriter.emit(this.serializationDelegate);
    } catch (Exception cause) {
        final String message = cause.getMessage();
        throw new RuntimeException(message, cause);
    }
}