先来看数据是如何经过网络写入下游Task节点并通过算子进行处理的
,这里以OneInputStreamTask为例进行说明。
一. StreamTask核心组件与能力
如代码OneInputStreamTask.init()方法包含了初始化StreamTask主要核心组件的逻辑。
OneInputStreamTask
public void init() throws Exception {
StreamConfig configuration = getConfiguration();
int numberOfInputs = configuration.getNumberOfInputs();
if (numberOfInputs > 0) {
// 创建CheckpointedInputGate
CheckpointedInputGate inputGate = createCheckpointedInputGate();
TaskIOMetricGroup taskIOMetricGroup = getEnvironment()
.getMetricGroup().getIOMetricGroup();
taskIOMetricGroup.gauge("checkpointAlignmentTime",
inputGate::getAlignmentDurationNanos);
// 创建DataOutput组件
DataOutput<IN> output = createDataOutput();
StreamTaskInput<IN> input = createTaskInput(inputGate, output);
// 创建StreamOneInputProcessor
inputProcessor = new StreamOneInputProcessor<>(
input,
output,
getCheckpointLock(),
operatorChain);
}
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK,
this.inputWatermarkGauge);
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK,
this.inputWatermarkGauge::getValue);
小结:
接下来了解StreamTask如何利用StreamTaskInput和DataOutput完成数据元素的接收并发送到算子链中进行处理。
二. OneInputStreamTask接入网络数据并处理
StreamTask.processInput()方法定义了处理数据的主要流程。
StreamTask.processInput()
protected void processInput(MailboxDefaultAction.Controller controller)
throws Exception {
InputStatus status = inputProcessor.processInput();
// 上游如果还有数据,则继续等待执行
if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
return;
}
// 上游如果没有数据,则发送控制消息到控制器
if (status == InputStatus.END_OF_INPUT) {
controller.allActionsCompleted();
return;
}
CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
MailboxDefaultAction.Suspension suspendedDefaultAction =
controller.suspendDefaultAction();
jointFuture.thenRun(suspendedDefaultAction::resume);
}
接下来详细看StreamOneInputProcessor.processInput()
public InputStatus processInput() throws Exception {
InputStatus status = input.emitNext(output);
if (status == InputStatus.END_OF_INPUT) {
synchronized (lock) {
operatorChain.endHeadOperatorInput(1);
}
}
return status;
}
StreamTaskNetworkInput.emitNext():处理数据逻辑。
//BufferOrEvent代表数据元素可以是Buffer类型,也可以是事件类型,
//比如CheckpointBarrier、TaskEvent等事件。
public InputStatus emitNext(DataOutput<T> output) throws Exception {
while (true) {
// 从Deserializer中获取数据元素
if (currentRecordDeserializer != null) {
DeserializationResult result =
currentRecordDeserializer.getNextRecord(deserializationDelegate);
// 如果DeserializationResult对应的Buffer数据已经被消费,则回收Buffer
if (result.isBufferConsumed()) {
currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
currentRecordDeserializer = null;
}
// 如果result是完整的数据元素,则调用processElement()方法进行处理
if (result.isFullRecord()) {
processElement(deserializationDelegate.getInstance(), output);
return InputStatus.MORE_AVAILABLE;
}
}
// 从checkpointedInputGate中拉取数据
//如果bufferOrEvent为空,则判断checkpointedInputGate是否已经关闭,如果已经关闭了则直接返回END_OF_INPUT状态,否则返回NOTHING_AVAILABLE状态。
Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
// 如果有数据则调用processBufferOrEvent()方法进行处理
if (bufferOrEvent.isPresent()) {
processBufferOrEvent(bufferOrEvent.get());
} else {
// 如果checkpointedInputGate已关闭,则返回END_OF_INPUT
if (checkpointedInputGate.isFinished()) {
checkState(checkpointedInputGate.getAvailableFuture().isDone(),
"Finished BarrierHandler should be available");
if (!checkpointedInputGate.isEmpty()) {
throw new IllegalStateException(
"Trailing data in checkpoint barrier handler.");
}
return InputStatus.END_OF_INPUT;
}
return InputStatus.NOTHING_AVAILABLE;
}
}
}
三. 处理数据
1. StreamElement类别
StreamElement具体类别有StreamRecord、StreamStatus以及Watermark,其中StreamRecord就是需要处理的业务数据,Watermark则是上游传递下来的Watermark事件。
//StreamTaskNetworkInput.processElement()
private void processElement(StreamElement recordOrMark, DataOutput<T> output)
throws Exception {
// StreamRecord类型
if (recordOrMark.isRecord()){
output.emitRecord(recordOrMark.asRecord());
// Watermark类型
} else if (recordOrMark.isWatermark()) {
statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
// LatencyMarker类型
} else if (recordOrMark.isLatencyMarker()) {
output.emitLatencyMarker(recordOrMark.asLatencyMarker());
// StreamStatus类型
} else if (recordOrMark.isStreamStatus()) {
statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(),
lastChannel);
} else {
throw new UnsupportedOperationException("Unknown type of StreamElement");
}
}
2. 业务数据处理逻辑
对于业务数据,调用output.emitRecord(recordOrMark.asRecord())方法进行数据元素的输出操作,然后通过DataOutput输出到算子链
中进行处理。
如下方法调用operator处理,实际就是在创建StreamTaskNetworkOutput时指定的算子链HeaderOperator
。
OneInputStreamTask.StreamTaskNetworkOutput.emitRecord()
public void emitRecord(StreamRecord<IN> record) throws Exception {
synchronized (lock) {
//累加器计算消费数量
numRecordsIn.inc();
//通过算子链处理
operator.setKeyContextElement1(record);
operator.processElement(record);
}
}
四. 小结
Flink从InputGate中拉取数据元素并进行反序列化操作,转换成StreamElement类型后,再调用StreamTaskNetworkOutput.emitRecord()方法将数据元素推送到OperatorChain的HeaderOperator中进行处理。
《Flink设计与实现:核心原理与源码解析》 – 张利兵