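I. How data is transferred between TaskManagers
A Flink job is eventually converted into an ExecutionGraph and broken down into Tasks, which are scheduled and executed on TaskManagers. Task instances exchange data across TaskManager nodes, especially when a DataStream API job uses physical partitioning operations.
The ResultPartition component stores intermediate results until downstream nodes consume them:
InputChannel reads the upstream data.
Creation of the ResultPartition (stores the intermediate result set) and InputGate (reads the intermediate result set) components
ShuffleMaster manages ResultPartition and InputGate.
So before introducing ResultPartition and InputGate, let us first look at the main roles of ShuffleMaster and ShuffleEnvironment and how they are created.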
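II. Design and implementation of the ShuffleService
Creating the ShuffleMaster and ShuffleEnvironment components relies mainly on an implementation of ShuffleServiceFactory. To keep the ShuffleService pluggable, the ShuffleServiceFactory implementation class is loaded into the ClassLoader in Java SPI fashion: ShuffleServiceLoader loads the ShuffleServiceFactory implementation class configured for the system from the configuration file, so users can also supply their own Shuffle service.
Loading ShuffleServiceFactory via SPI
The ShuffleEnvironment component provides the methods that create the ResultPartition and InputGate components of a Task instance; Flink ships NettyShuffleEnvironment as the default implementation.
The ShuffleMaster component implements registration of ResultPartition and InputGate.
ShuffleService UML relationship diagram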
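To make the pluggable-factory idea concrete, here is a minimal sketch of loading a factory class named in the configuration. Everything in it is hypothetical (PluggableShuffleSketch, DemoShuffleServiceFactory, the configuration key); it shows one common way to instantiate a configured implementation by reflection and is not Flink's actual ShuffleServiceLoader code.

```java
import java.util.Map;

// Hypothetical stand-ins for illustration only; none of these are Flink classes.
final class PluggableShuffleSketch {

    /** Simplified counterpart of a shuffle service factory. */
    interface DemoShuffleServiceFactory {
        String createShuffleMaster();
        String createShuffleEnvironment();
    }

    /** Default implementation, playing the role of the built-in Netty-based factory. */
    public static final class DemoNettyFactory implements DemoShuffleServiceFactory {
        public DemoNettyFactory() {}
        @Override public String createShuffleMaster() { return "netty-like master"; }
        @Override public String createShuffleEnvironment() { return "netty-like environment"; }
    }

    /** A user-supplied implementation that replaces the default one. */
    public static final class CustomDemoFactory implements DemoShuffleServiceFactory {
        public CustomDemoFactory() {}
        @Override public String createShuffleMaster() { return "custom master"; }
        @Override public String createShuffleEnvironment() { return "custom environment"; }
    }

    // Assumed configuration key, chosen only for this sketch.
    static final String FACTORY_KEY = "demo.shuffle-service-factory.class";

    /** Instantiates the configured factory class, falling back to the default. */
    static DemoShuffleServiceFactory loadFactory(Map<String, String> config) {
        String className = config.getOrDefault(FACTORY_KEY, DemoNettyFactory.class.getName());
        try {
            return Class.forName(className)
                    .asSubclass(DemoShuffleServiceFactory.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot load shuffle factory " + className, e);
        }
    }
}
```

With an empty configuration the default factory is returned; putting the class name of CustomDemoFactory under FACTORY_KEY swaps the implementation without touching the calling code.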
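III. Creating the ShuffleMaster in the JobMaster
Overview of how ShuffleMaster and ShuffleEnvironment are created
Slot resources are allocated, and the partition information is registered with the ShuffleMaster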
CompletableFuture<Execution> allocateResourcesForExecution(
        SlotProviderStrategy slotProviderStrategy,
        LocationPreferenceConstraint locationPreferenceConstraint,
        @Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {
    return allocateAndAssignSlotForExecution(
            slotProviderStrategy,
            locationPreferenceConstraint,
            allPreviousExecutionGraphAllocationIds)
        .thenCompose(slot -> registerProducedPartitions(slot.getTaskManagerLocation()));
}
The logic of Execution.registerProducedPartitions() is as follows.
static CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>>
        registerProducedPartitions(
            ExecutionVertex vertex,
            TaskManagerLocation location,
            ExecutionAttemptID attemptId,
            boolean sendScheduleOrUpdateConsumersMessage) {
    // Create the ProducerDescriptor
    ProducerDescriptor producerDescriptor =
        ProducerDescriptor.create(location, attemptId);
    // Get the partitions produced by the current vertex
    Collection<IntermediateResultPartition> partitions =
        vertex.getProducedPartitions().values();
    Collection<CompletableFuture<ResultPartitionDeploymentDescriptor>> partitionRegistrations =
        new ArrayList<>(partitions.size());
    // Register each partition with the ShuffleMaster
    for (IntermediateResultPartition partition : partitions) {
        PartitionDescriptor partitionDescriptor = PartitionDescriptor.from(partition);
        int maxParallelism = getPartitionMaxParallelism(partition);
        // Ask the ShuffleMaster to register the partitionDescriptor and producerDescriptor
        CompletableFuture<? extends ShuffleDescriptor> shuffleDescriptorFuture = vertex
            .getExecutionGraph()
            .getShuffleMaster()
            .registerPartitionWithProducer(partitionDescriptor, producerDescriptor);
        Preconditions.checkState(shuffleDescriptorFuture.isDone(),
            "ShuffleDescriptor future is incomplete.");
        // Create the ResultPartitionDeploymentDescriptor instance
        CompletableFuture<ResultPartitionDeploymentDescriptor> partitionRegistration =
            shuffleDescriptorFuture
                .thenApply(shuffleDescriptor -> new ResultPartitionDeploymentDescriptor(
                    partitionDescriptor,
                    shuffleDescriptor,
                    maxParallelism,
                    sendScheduleOrUpdateConsumersMessage));
        // Add it to the partitionRegistrations collection
        partitionRegistrations.add(partitionRegistration);
    }
    // Convert the results into a map keyed by partition ID
    return FutureUtils.combineAll(partitionRegistrations).thenApply(rpdds -> {
        Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor> producedPartitions =
            new LinkedHashMap<>(partitions.size());
        rpdds.forEach(rpdd -> producedPartitions.put(rpdd.getPartitionId(), rpdd));
        return producedPartitions;
    });
}
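The register-then-combine pattern used above can be tried out without Flink. The following sketch uses only java.util.concurrent; the class PartitionRegistrationSketch, the method registerWithShuffleMaster and the string "descriptors" are made up for illustration, and CompletableFuture.allOf stands in for Flink's FutureUtils.combineAll.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of "register every partition, then collect the results".
final class PartitionRegistrationSketch {

    /** Stand-in for a shuffle master that acknowledges a registration immediately. */
    static CompletableFuture<String> registerWithShuffleMaster(String partitionId, String producer) {
        return CompletableFuture.completedFuture("descriptor(" + partitionId + "@" + producer + ")");
    }

    /** Registers all partitions and returns a map that preserves registration order. */
    static CompletableFuture<Map<String, String>> registerAll(List<String> partitionIds, String producer) {
        List<CompletableFuture<String>> registrations = new ArrayList<>(partitionIds.size());
        for (String partitionId : partitionIds) {
            registrations.add(registerWithShuffleMaster(partitionId, producer));
        }
        // allOf completes once every single registration has completed.
        CompletableFuture<Void> all =
                CompletableFuture.allOf(registrations.toArray(new CompletableFuture<?>[0]));
        return all.thenApply(ignored -> {
            Map<String, String> produced = new LinkedHashMap<>(partitionIds.size());
            for (int i = 0; i < partitionIds.size(); i++) {
                // join() does not block here because the future is already complete.
                produced.put(partitionIds.get(i), registrations.get(i).join());
            }
            return produced;
        });
    }
}
```

A LinkedHashMap is used for the same reason as in the excerpt: the resulting map keeps the order in which the partitions were registered.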
IV. Creating the ShuffleEnvironment in the TaskManager
fromConfiguration creates and starts the shuffleEnvironment
public static TaskManagerServices fromConfiguration(...)
        throws Exception {
    ...
    // Call createShuffleEnvironment to create the ShuffleEnvironment
    final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
        taskManagerServicesConfiguration,
        taskEventDispatcher,
        taskManagerMetricGroup);
    // Start the shuffleEnvironment
    final int dataPort = shuffleEnvironment.start();
    ...
}
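How NettyShuffleEnvironment is created, and what it provides:
- Read the Netty-related settings from the NettyShuffleEnvironmentConfiguration parameter, such as TransportType, InetAddress, serverPort and numberOfSlots.
- Create the ResultPartitionManager instance, which registers and manages the ResultPartition information in the TaskManager and provides the method for creating a ResultSubpartitionView, used specifically to consume the Buffer data in a ResultSubpartition.
- Create the FileChannelManager instance, pointing it at the temporary directories from the configuration; it then creates and hands out the FileChannel of each file. Batch jobs write data to the file system and then process the files, an approach similar to MapReduce (to be explored further).
- Create the ConnectionManager instance, used mainly by the InputChannel component. An InputChannel creates a PartitionRequestClient through the ConnectionManager to establish the network connection to a ResultPartition. Depending on whether NettyConfig is null, either a NettyConnectionManager or a LocalConnectionManager is created.
- Create the NetworkBufferPool component, which provides Buffer memory to the ResultPartition and InputGate components; in practice it allocates and manages MemorySegment blocks.
- Register ShuffleMetrics with the system to track monitoring information about the shuffle process.
- Create the ResultPartitionFactory, used to create ResultPartition.
- Create the SingleInputGateFactory, used to create SingleInputGate.
Finally, the components and services created above are passed as arguments to create the NettyShuffleEnvironment.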
NettyShuffleServiceFactory.createNettyShuffleEnvironment()
static NettyShuffleEnvironment createNettyShuffleEnvironment(
        NettyShuffleEnvironmentConfiguration config,
        ResourceID taskExecutorResourceId,
        TaskEventPublisher taskEventPublisher,
        MetricGroup metricGroup) {
    // Check that none of the arguments is null
    ...
    // Get the Netty-related configuration
    NettyConfig nettyConfig = config.nettyConfig();
    // Create the ResultPartitionManager instance
    ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
    // Create the FileChannelManager instance
    FileChannelManager fileChannelManager =
        new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
    // Create the ConnectionManager instance
    ConnectionManager connectionManager =
        nettyConfig != null
            ? new NettyConnectionManager(resultPartitionManager, taskEventPublisher, nettyConfig)
            : new LocalConnectionManager();
    // Create the NetworkBufferPool instance
    NetworkBufferPool networkBufferPool = new NetworkBufferPool(
        config.numNetworkBuffers(),
        config.networkBufferSize(),
        config.networkBuffersPerChannel(),
        config.getRequestSegmentsTimeout());
    // Register the ShuffleMetrics
    registerShuffleMetrics(metricGroup, networkBufferPool);
    // Create the ResultPartitionFactory instance
    ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(
        resultPartitionManager,
        fileChannelManager,
        networkBufferPool,
        config.getBlockingSubpartitionType(),
        config.networkBuffersPerChannel(),
        config.floatingNetworkBuffersPerGate(),
        config.networkBufferSize(),
        config.isForcePartitionReleaseOnConsumption(),
        config.isBlockingShuffleCompressionEnabled(),
        config.getCompressionCodec());
    // Create the SingleInputGateFactory instance
    SingleInputGateFactory singleInputGateFactory = new SingleInputGateFactory(
        taskExecutorResourceId,
        config,
        connectionManager,
        resultPartitionManager,
        taskEventPublisher,
        networkBufferPool);
    // Finally, return the NettyShuffleEnvironment
    return new NettyShuffleEnvironment(
        taskExecutorResourceId,
        config,
        networkBufferPool,
        connectionManager,
        resultPartitionManager,
        fileChannelManager,
        resultPartitionFactory,
        singleInputGateFactory);
}
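To picture what "allocating and managing memory blocks" means for a component like NetworkBufferPool, here is a deliberately small sketch of a fixed-size pool. FixedBufferPoolSketch and its methods are hypothetical, plain byte arrays stand in for MemorySegment, and the real pool is considerably more involved (timeouts, per-channel and floating buffers).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical fixed-size pool; byte[] stands in for a memory segment.
final class FixedBufferPoolSketch {
    private final Deque<byte[]> available = new ArrayDeque<>();
    private final int segmentSize;

    FixedBufferPoolSketch(int numberOfSegments, int segmentSize) {
        this.segmentSize = segmentSize;
        // All memory is allocated up front and only recycled afterwards.
        for (int i = 0; i < numberOfSegments; i++) {
            available.add(new byte[segmentSize]);
        }
    }

    /** Returns a free segment, or null when the pool is exhausted. */
    synchronized byte[] requestSegment() {
        return available.poll();
    }

    /** Hands a segment back so that other producers or consumers can reuse it. */
    synchronized void recycle(byte[] segment) {
        if (segment.length != segmentSize) {
            throw new IllegalArgumentException("Segment does not belong to this pool");
        }
        available.add(segment);
    }

    synchronized int availableSegments() {
        return available.size();
    }
}
```

In this sketch an exhausted pool simply returns null; a caller that gets null has to wait until some segment is recycled, which is the basic idea behind bounding network memory.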
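At this point the creation of the NettyShuffleEnvironment is essentially complete. Next, the TaskManager accepts Task submissions from the JobMaster (open question: is this a passive process, meant to open an endpoint for receiving data from other tasks?) and then uses the ShuffleEnvironment to create the ResultPartition and InputGate components for each Task instance. The information needed to create them comes from what was registered with the ShuffleMaster, such as ResultPartition and ExecutionEdge information.
Next, let us look in detail at how the two key components, ResultPartition and InputGate, are created through the ShuffleEnvironment.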
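V. Creating ResultPartition through the ShuffleEnvironment
1. ResultPartition is created when the task starts
The ResultPartition is created in the Task constructor, as soon as the task starts.
Back-pressure control: dynamically regulates the data output to downstream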
org.apache.flink.runtime.taskmanager.Task
public Task(...) {
    final ShuffleIOOwnerContext taskShuffleContext = shuffleEnvironment
        .createShuffleIOOwnerContext(taskNameWithSubtaskAndId, executionId,
            metrics.getIOMetricGroup());
    // Create the ResultPartitionWriters
    final ResultPartitionWriter[] resultPartitionWriters =
        shuffleEnvironment.createResultPartitionWriters(
            taskShuffleContext,
            resultPartitionDeploymentDescriptors).toArray(new ResultPartitionWriter[] {});
    // Decorate the ResultPartitionWriters
    this.consumableNotifyingPartitionWriters =
        ConsumableNotifyingResultPartitionWriterDecorator.decorate(
            resultPartitionDeploymentDescriptors,
            resultPartitionWriters,
            this,
            jobId,
            resultPartitionConsumableNotifier);
}
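2. Creating ResultPartition and how it handles data
Next, the main logic for creating ResultPartition, as the code shows:
- Initialize a ResultPartition array sized by resultPartitionDeploymentDescriptors.
- Create each ResultPartition through the resultPartitionFactory.
- Call registerOutputMetrics() to register the metrics for the resultPartitions.
- Return the ResultPartition array that was created.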
NettyShuffleEnvironment.createResultPartitionWriters()
public Collection<ResultPartition> createResultPartitionWriters(
        ShuffleIOOwnerContext ownerContext,
        Collection<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors) {
    synchronized (lock) {
        Preconditions.checkState(!isClosed,
            "The NettyShuffleEnvironment has already been shut down.");
        // Create the ResultPartition array sized by resultPartitionDeploymentDescriptors
        ResultPartition[] resultPartitions =
            new ResultPartition[resultPartitionDeploymentDescriptors.size()];
        int counter = 0;
        // Create one ResultPartition per ResultPartitionDeploymentDescriptor
        for (ResultPartitionDeploymentDescriptor rpdd : resultPartitionDeploymentDescriptors) {
            resultPartitions[counter++] =
                resultPartitionFactory.create(ownerContext.getOwnerName(), rpdd);
        }
        registerOutputMetrics(config.isNetworkDetailedMetrics(),
            ownerContext.getOutputGroup(), resultPartitions);
        return Arrays.asList(resultPartitions);
    }
}
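Continuing with how a ResultPartition is created:
- If the ResultPartitionType is blocking and blocking-shuffle compression is enabled, create a BufferCompressor to compress Buffer data; that is, batch processing can compress its Buffer data through the BufferCompressor.
- Create a ResultSubpartition array sized by numberOfSubpartitions to hold the ResultSubpartitions of the current ResultPartition.
- Create the ResultPartition according to the ResultPartitionType. If the type is not blocking (that is, pipelined, the streaming case), or forcePartitionReleaseOnConsumption is set, a ReleaseOnConsumptionResultPartition is created, which releases the ResultPartition as soon as its data has been consumed. Otherwise a plain ResultPartition is created, which is not released automatically once its data has been consumed.
- Call createSubpartitions() to create the ResultSubpartitions. Each ResultSubpartition is identified by an ID and corresponds one-to-one to an InputChannel in an InputGate.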
// ResultPartitionFactory.create()
public ResultPartition create(
        String taskNameWithSubtaskAndId,
        ResultPartitionID id,
        ResultPartitionType type,
        int numberOfSubpartitions,
        int maxParallelism,
        FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
    BufferCompressor bufferCompressor = null;
    // For a blocking ResultPartitionType with compression enabled, create a BufferCompressor
    if (type.isBlocking() && blockingShuffleCompressionEnabled) {
        bufferCompressor = new BufferCompressor(networkBufferSize, compressionCodec);
    }
    // Create the ResultSubpartition array
    ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];
    // Pick the ResultPartition implementation: release on consumption for
    // non-blocking types or when forced, a plain ResultPartition otherwise
    ResultPartition partition = forcePartitionReleaseOnConsumption || !type.isBlocking()
        ? new ReleaseOnConsumptionResultPartition(
            taskNameWithSubtaskAndId,
            id,
            type,
            subpartitions,
            maxParallelism,
            partitionManager,
            bufferCompressor,
            bufferPoolFactory)
        : new ResultPartition(
            taskNameWithSubtaskAndId,
            id,
            type,
            subpartitions,
            maxParallelism,
            partitionManager,
            bufferCompressor,
            bufferPoolFactory);
    // Create the subpartitions
    createSubpartitions(partition, type, blockingSubpartitionType, subpartitions);
    LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this);
    return partition;
}
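The two conditions in create() are easy to misread, so the sketch below isolates them. PartitionKindSketch and DemoPartitionType are hypothetical; only the boolean expressions mirror the excerpt above, and the returned strings merely name the class that the excerpt would instantiate.

```java
// Hypothetical reduction of the two decisions made in ResultPartitionFactory.create().
final class PartitionKindSketch {

    /** Minimal stand-in for the partition type; only the blocking flag matters here. */
    enum DemoPartitionType {
        PIPELINED(false),
        BLOCKING(true);

        private final boolean blocking;

        DemoPartitionType(boolean blocking) {
            this.blocking = blocking;
        }

        boolean isBlocking() {
            return blocking;
        }
    }

    /** Same condition as in the excerpt: forced release, or any non-blocking type. */
    static String chooseImplementation(DemoPartitionType type, boolean forceReleaseOnConsumption) {
        return (forceReleaseOnConsumption || !type.isBlocking())
                ? "ReleaseOnConsumptionResultPartition"
                : "ResultPartition";
    }

    /** A compressor is only created for blocking partitions with compression enabled. */
    static boolean needsCompressor(DemoPartitionType type, boolean compressionEnabled) {
        return type.isBlocking() && compressionEnabled;
    }
}
```

So a pipelined partition always gets the release-on-consumption variant, while a blocking partition gets it only when release on consumption is forced.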
3. Creating ResultSubpartitions for streaming or batch scenarios
private void createSubpartitions(
        ResultPartition partition,
        ResultPartitionType type,
        BoundedBlockingSubpartitionType blockingSubpartitionType,
        ResultSubpartition[] subpartitions) {
    // Create the ResultSubpartitions
    if (type.isBlocking()) {
        // Batch case: bounded blocking subpartitions backed by the channelManager
        initializeBoundedBlockingPartitions(
            subpartitions,
            partition,
            blockingSubpartitionType,
            networkBufferSize,
            channelManager);
    } else {
        // Streaming case: one PipelinedSubpartition per index
        for (int i = 0; i < subpartitions.length; i++) {
            subpartitions[i] = new PipelinedSubpartition(i, partition);
        }
    }
}
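As a rough mental model of the two branches, the sketch below contrasts a pipelined subpartition, whose data can be read while it is still being produced, with a blocking one, whose data becomes readable only after the producer has finished. All names are hypothetical and the storage is simplified to in-memory collections; this sketch does not model the file-backed storage that the channelManager argument suggests for bounded blocking subpartitions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, in-memory model of pipelined versus blocking subpartitions.
final class SubpartitionSketch {

    interface DemoSubpartition {
        void add(String buffer);
        void finish();
        /** Returns the next buffer, or null if nothing can be read right now. */
        String poll();
    }

    /** Streaming style: buffers are readable as soon as they are added. */
    static final class PipelinedDemo implements DemoSubpartition {
        private final Deque<String> queue = new ArrayDeque<>();
        @Override public void add(String buffer) { queue.add(buffer); }
        @Override public void finish() { }
        @Override public String poll() { return queue.poll(); }
    }

    /** Batch style: nothing is readable until the producer has finished. */
    static final class BlockingDemo implements DemoSubpartition {
        private final List<String> stored = new ArrayList<>();
        private boolean finished;
        private int readIndex;

        @Override public void add(String buffer) {
            if (finished) {
                throw new IllegalStateException("Already finished");
            }
            stored.add(buffer);
        }

        @Override public void finish() { finished = true; }

        @Override public String poll() {
            if (!finished || readIndex >= stored.size()) {
                return null;
            }
            return stored.get(readIndex++);
        }
    }

    /** Mirrors the branch in createSubpartitions(): one kind for the whole array. */
    static DemoSubpartition[] createSubpartitions(boolean blocking, int count) {
        DemoSubpartition[] subpartitions = new DemoSubpartition[count];
        for (int i = 0; i < count; i++) {
            subpartitions[i] = blocking ? new BlockingDemo() : new PipelinedDemo();
        }
        return subpartitions;
    }
}
```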
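VI. Creating InputGate through the ShuffleEnvironment
1. Where the InputGate is created
As with ResultPartition, InputGates are also created while the Task is initialized. As the code shows, the Task constructor contains the InputGate creation logic.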
final InputGate[] gates = shuffleEnvironment.createInputGates(
    taskShuffleContext,
    this,
    inputGateDeploymentDescriptors).toArray(new InputGate[] {});
this.inputGates = new InputGate[gates.length];
int counter = 0;
for (InputGate gate : gates) {
    inputGates[counter++] = new InputGateWithMetrics(
        gate, metrics.getIOMetricGroup().getNumBytesInCounter());
}
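Next, the logic of NettyShuffleEnvironment.createInputGates() in detail:
- Get the networkInputGroup, which is used to create InputChannelMetrics.
- Create a SingleInputGate array sized by the inputGateDeploymentDescriptors collection (handed over by the ShuffleMaster; how this count is determined is still an open question) to hold the SingleInputGate components.
- Create a SingleInputGate from each InputGateDeploymentDescriptor.
- Register the InputGate metrics and return the SingleInputGate collection.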
public Collection<SingleInputGate> createInputGates(
        ShuffleIOOwnerContext ownerContext,
        PartitionProducerStateProvider partitionProducerStateProvider,
        Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors) {
    synchronized (lock) {
        Preconditions.checkState(!isClosed,
            "The NettyShuffleEnvironment has already been shut down.");
        MetricGroup networkInputGroup = ownerContext.getInputGroup();
        @SuppressWarnings("deprecation")
        InputChannelMetrics inputChannelMetrics =
            new InputChannelMetrics(networkInputGroup, ownerContext.getParentGroup());
        SingleInputGate[] inputGates =
            new SingleInputGate[inputGateDeploymentDescriptors.size()];
        int counter = 0;
        // Create one inputGate per descriptor through the singleInputGateFactory
        for (InputGateDeploymentDescriptor igdd : inputGateDeploymentDescriptors) {
            SingleInputGate inputGate = singleInputGateFactory.create(
                ownerContext.getOwnerName(),
                igdd,
                partitionProducerStateProvider,
                inputChannelMetrics);
            InputGateID id = new InputGateID(
                igdd.getConsumedResultId(), ownerContext.getExecutionAttemptID());
            // Track the gate by ID and drop it from the map once it is closed
            inputGatesById.put(id, inputGate);
            inputGate.getCloseFuture().thenRun(() -> inputGatesById.remove(id));
            inputGates[counter++] = inputGate;
        }
        // Register the metrics
        registerInputMetrics(config.isNetworkDetailedMetrics(), networkInputGroup, inputGates);
        return Arrays.asList(inputGates);
    }
}
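One detail of createInputGates() worth isolating is the bookkeeping around inputGatesById: a gate is registered when it is created and removed through its close future. The sketch below reproduces just that idiom with hypothetical classes (GateRegistrySketch, DemoGate) and plain string IDs.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry that forgets a gate automatically once the gate is closed.
final class GateRegistrySketch {

    static final class DemoGate {
        private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        CompletableFuture<Void> getCloseFuture() {
            return closeFuture;
        }

        void close() {
            // Completing the future triggers every callback registered via thenRun.
            closeFuture.complete(null);
        }
    }

    private final Map<String, DemoGate> gatesById = new ConcurrentHashMap<>();

    DemoGate createAndRegister(String gateId) {
        DemoGate gate = new DemoGate();
        gatesById.put(gateId, gate);
        // Same idiom as in the excerpt: unregister when the gate's close future completes.
        gate.getCloseFuture().thenRun(() -> gatesById.remove(gateId));
        return gate;
    }

    int registeredGates() {
        return gatesById.size();
    }
}
```

Tying the cleanup to the close future means the registry never needs an explicit "unregister" call from the code that closes the gate.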
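2. Creating SingleInputGate and what it provides
2.1. Creating SingleInputGate
Next, how SingleInputGateFactory creates a SingleInputGate, as the code shows:
- Call createBufferPoolFactory() to obtain the bufferPoolFactory used to create the LocalBufferPool. The LocalBufferPool gives the InputGate storage for Buffer data, so that the binary data arriving at the InputGate is buffered locally.
- Decide, from the result partition type and whether compression is enabled, whether to create a BufferDecompressor. It is the counterpart of the BufferCompressor in ResultPartition: the BufferDecompressor decompresses Buffer data that the BufferCompressor compressed.
- Create the SingleInputGate object from the parameters in the InputGateDeploymentDescriptor together with the BufferDecompressor and the bufferPoolFactory.
- Call createInputChannels() to create the InputChannels of the SingleInputGate.
- Return the finished inputGate to the Task instance.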
public SingleInputGate create(
        @Nonnull String owningTaskName,
        @Nonnull InputGateDeploymentDescriptor igdd,
        @Nonnull PartitionProducerStateProvider partitionProducerStateProvider,
        @Nonnull InputChannelMetrics metrics) {
    // Factory for the LocalBufferPool that backs this gate
    SupplierWithException<BufferPool, IOException> bufferPoolFactory =
        createBufferPoolFactory(
            networkBufferPool,
            networkBuffersPerChannel,
            floatingNetworkBuffersPerGate,
            igdd.getShuffleDescriptors().length,
            igdd.getConsumedPartitionType());
    // A decompressor is only needed for blocking partitions with compression enabled
    BufferDecompressor bufferDecompressor = null;
    if (igdd.getConsumedPartitionType().isBlocking()
            && blockingShuffleCompressionEnabled) {
        bufferDecompressor = new BufferDecompressor(networkBufferSize, compressionCodec);
    }
    SingleInputGate inputGate = new SingleInputGate(
        owningTaskName,
        igdd.getConsumedResultId(),
        igdd.getConsumedPartitionType(),
        igdd.getConsumedSubpartitionIndex(),
        igdd.getShuffleDescriptors().length,
        partitionProducerStateProvider,
        bufferPoolFactory,
        bufferDecompressor);
    // Create the InputChannels of the SingleInputGate
    createInputChannels(owningTaskName, igdd, inputGate, metrics);
    return inputGate;
}
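SingleInputGateFactory.createInputChannels() creates the set of InputChannels that belong to a given SingleInputGate.
- Get the list of ShuffleDescriptors. A ShuffleDescriptor is created by the ShuffleMaster and describes the data producer, the ResultPartition and related information.
- Create the InputChannel array and store each channel in the inputGate.
As the code shows, each resultPartitionID corresponds to one InputChannel.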
private void createInputChannels(
        String owningTaskName,
        InputGateDeploymentDescriptor inputGateDeploymentDescriptor,
        SingleInputGate inputGate,
        InputChannelMetrics metrics) {
    ShuffleDescriptor[] shuffleDescriptors =
        inputGateDeploymentDescriptor.getShuffleDescriptors();
    // Create one InputChannel per ShuffleDescriptor
    InputChannel[] inputChannels = new InputChannel[shuffleDescriptors.length];
    ChannelStatistics channelStatistics = new ChannelStatistics();
    for (int i = 0; i < inputChannels.length; i++) {
        inputChannels[i] = createInputChannel(
            inputGate,
            i,
            shuffleDescriptors[i],
            channelStatistics,
            metrics);
        // Store the channel in the gate, keyed by its partition ID
        ResultPartitionID resultPartitionID = inputChannels[i].getPartitionId();
        inputGate.setInputChannel(resultPartitionID.getPartitionId(), inputChannels[i]);
    }
    LOG.debug("{}: Created {} input channels ({}).",
        owningTaskName,
        inputChannels.length,
        channelStatistics);
}
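2.2. Creating InputChannels, and handling data from the same TaskManager or across TaskManagers
Overview
The focus here is on how LocalInputChannel and RemoteInputChannel are created.
The main logic for creating the built-in InputChannels: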
private InputChannel createKnownInputChannel(
        SingleInputGate inputGate,
        int index,
        NettyShuffleDescriptor inputChannelDescriptor,
        ChannelStatistics channelStatistics,
        InputChannelMetrics metrics) {
    ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID();
    if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
        // Producer and consumer Task instances run in the same TaskManager
        channelStatistics.numLocalChannels++;
        return new LocalInputChannel(
            inputGate,
            index,
            partitionId,
            partitionManager,
            taskEventPublisher,
            partitionRequestInitialBackoff,
            partitionRequestMaxBackoff,
            metrics);
    } else {
        // Producer and consumer Task instances run in different TaskManagers
        channelStatistics.numRemoteChannels++;
        return new RemoteInputChannel(
            inputGate,
            index,
            partitionId,
            inputChannelDescriptor.getConnectionId(),
            connectionManager,
            partitionRequestInitialBackoff,
            partitionRequestMaxBackoff,
            metrics,
            networkBufferPool);
    }
}
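Taken together, createInputChannels() and createKnownInputChannel() amount to: one channel per shuffle descriptor, local if the producer lives on the same TaskManager, remote otherwise. The sketch below models only that decision. InputChannelSketch and DemoShuffleDescriptor are hypothetical, resource IDs are plain strings, and the map values simply name the channel class that would be chosen.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of choosing a local or remote channel per shuffle descriptor.
final class InputChannelSketch {

    /** Stand-in for a shuffle descriptor: which partition, produced on which TaskManager. */
    static final class DemoShuffleDescriptor {
        final String partitionId;
        final String producerResourceId;

        DemoShuffleDescriptor(String partitionId, String producerResourceId) {
            this.partitionId = partitionId;
            this.producerResourceId = producerResourceId;
        }

        /** True if the producer runs on the consumer's own TaskManager. */
        boolean isLocalTo(String consumerResourceId) {
            return producerResourceId.equals(consumerResourceId);
        }
    }

    /** Returns, per partition ID, the kind of channel the consumer would create. */
    static Map<String, String> createChannels(
            String localResourceId, List<DemoShuffleDescriptor> descriptors) {
        Map<String, String> channelsByPartition = new LinkedHashMap<>();
        for (DemoShuffleDescriptor descriptor : descriptors) {
            String kind = descriptor.isLocalTo(localResourceId)
                    ? "LocalInputChannel"   // same TaskManager: no network connection needed
                    : "RemoteInputChannel"; // other TaskManager: data arrives over the network
            channelsByPartition.put(descriptor.partitionId, kind);
        }
        return channelsByPartition;
    }
}
```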
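At this point the ResultPartition and InputGate components have all been created. The Task instance wraps them in its environment information and passes that to the StreamTask. The StreamTask obtains the ResultPartition and InputGate and uses them to create the StreamTaskNetworkInput and RecordWriter components, which handle the Task's data input and output.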