roman_日积跬步-终至千里

roman_日积跬步-终至千里

先来看数据是如何经过网络写入下游Task节点并通过算子进行处理的,这里以OneInputStreamTask为例进行说明。

一. StreamTask核心组件与能力

如代码OneInputStreamTask.init()方法包含了初始化StreamTask主要核心组件的逻辑。

OneInputStreamTask
public void init() throws Exception {
   StreamConfig configuration = getConfiguration();
   int numberOfInputs = configuration.getNumberOfInputs();
   if (numberOfInputs > 0) {
      // 创建CheckpointedInputGate
      CheckpointedInputGate inputGate = createCheckpointedInputGate();
      TaskIOMetricGroup taskIOMetricGroup = getEnvironment()
          .getMetricGroup().getIOMetricGroup();
      taskIOMetricGroup.gauge("checkpointAlignmentTime", 
                              inputGate::getAlignmentDurationNanos);
      // 创建DataOutput组件
      DataOutput<IN> output = createDataOutput();
      StreamTaskInput<IN> input = createTaskInput(inputGate, output);
      // 创建StreamOneInputProcessor
      inputProcessor = new StreamOneInputProcessor<>(
         input,
         output,
         getCheckpointLock(),
         operatorChain);
   }
   headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, 
                                       this.inputWatermarkGauge);
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, 
                                         this.inputWatermarkGauge::getValue);
                                     

小结:

 
 
接下来了解StreamTask如何利用StreamTaskInput和DataOutput完成数据元素的接收并发送到算子链中进行处理。

二. OneInputStreamTask接入网络数据并处理

StreamTask.processInput()方法定义了处理数据的主要流程。

StreamTask.processInput()
protected void processInput(MailboxDefaultAction.Controller controller) 
    throws Exception {
   InputStatus status = inputProcessor.processInput();
   // 上游如果还有数据,则继续等待执行
   if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
      return;
   }
   // 上游如果没有数据,则发送控制消息到控制器
   if (status == InputStatus.END_OF_INPUT) {
      controller.allActionsCompleted();
      return;
   }
   CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
   MailboxDefaultAction.Suspension suspendedDefaultAction = 
      controller.suspendDefaultAction();
   jointFuture.thenRun(suspendedDefaultAction::resume);
}

 
接下来详细看StreamOneInputProcessor.processInput()

public InputStatus processInput() throws Exception {
   InputStatus status = input.emitNext(output);
   if (status == InputStatus.END_OF_INPUT) {
      synchronized (lock) {
         operatorChain.endHeadOperatorInput(1);
      }
   }
   return status;
}

StreamTaskNetworkInput.emitNext():处理数据逻辑。


//BufferOrEvent代表数据元素可以是Buffer类型,也可以是事件类型,
//比如CheckpointBarrier、TaskEvent等事件。

public InputStatus emitNext(DataOutput<T> output) throws Exception {
   while (true) {
      // 从Deserializer中获取数据元素
      if (currentRecordDeserializer != null) {
         DeserializationResult result = 
             currentRecordDeserializer.getNextRecord(deserializationDelegate);
         // 如果DeserializationResult对应的Buffer数据已经被消费,则回收Buffer 
         if (result.isBufferConsumed()) {
            currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
            currentRecordDeserializer = null;
         }
         // 如果result是完整的数据元素,则调用processElement()方法进行处理
         if (result.isFullRecord()) {
            processElement(deserializationDelegate.getInstance(), output);
            return InputStatus.MORE_AVAILABLE;
         }
      }
      // 从checkpointedInputGate中拉取数据
      //如果bufferOrEvent为空,则判断checkpointedInputGate是否已经关闭,如果已经关闭了则直接返回END_OF_INPUT状态,否则返回NOTHING_AVAILABLE状态。
      Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
      // 如果有数据则调用processBufferOrEvent()方法进行处理
      if (bufferOrEvent.isPresent()) {
         processBufferOrEvent(bufferOrEvent.get());
      } else {
         // 如果checkpointedInputGate已关闭,则返回END_OF_INPUT
         if (checkpointedInputGate.isFinished()) {
            checkState(checkpointedInputGate.getAvailableFuture().isDone(), 
                       "Finished BarrierHandler should be available");
            if (!checkpointedInputGate.isEmpty()) {
               throw new IllegalStateException(
                   "Trailing data in checkpoint barrier handler.");
            }
            return InputStatus.END_OF_INPUT;
         }
         return InputStatus.NOTHING_AVAILABLE;
      }
   }
}

 

三. 处理数据

1. StreamElement类别

StreamElement具体类别有StreamRecord、StreamStatus以及Watermark,其中StreamRecord就是需要处理的业务数据,Watermark则是上游传递下来的Watermark事件。

//StreamTaskNetworkInput.processElement()
private void processElement(StreamElement recordOrMark, DataOutput<T> output) 
   throws Exception {
   // StreamRecord类型
   if (recordOrMark.isRecord()){
      output.emitRecord(recordOrMark.asRecord());
   // Watermark类型
   } else if (recordOrMark.isWatermark()) {
      statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
   // LatencyMarker类型
   } else if (recordOrMark.isLatencyMarker()) {
      output.emitLatencyMarker(recordOrMark.asLatencyMarker());
   // StreamStatus类型
   } else if (recordOrMark.isStreamStatus()) {
      statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), 
         lastChannel);
   } else {
      throw new UnsupportedOperationException("Unknown type of StreamElement");
   }
}

 

2. 业务数据处理逻辑

对于业务数据,调用output.emitRecord(recordOrMark.asRecord())方法进行数据元素的输出操作,然后通过DataOutput输出到算子链中进行处理。

如下方法调用operator处理,实际就是在创建StreamTaskNetworkOutput时指定的算子链HeaderOperator

OneInputStreamTask.StreamTaskNetworkOutput.emitRecord()
public void emitRecord(StreamRecord<IN> record) throws Exception {
   synchronized (lock) {
      //累加器计算消费数量
      numRecordsIn.inc();
      //通过算子链处理
      operator.setKeyContextElement1(record);
      operator.processElement(record);
   }
}

 

四. 小结

Flink从InputGate中拉取数据元素并进行反序列化操作,转换成StreamElement类型后,再调用StreamTaskNetworkOutput.emitRecord()方法将数据元素推送到OperatorChain的HeaderOperator中进行处理。

 
 
《Flink设计与实现:核心原理与源码解析》 – 张利兵

03-06 06:45