Flink中的数据交换基于以下设计原则
1.用于数据交换的控制流(即:为了启动交换而传递的消息)是接收者启动的,就像原始MapReduce一样
2.用于数据交换的数据流,即通过线路的实际数据传输由IntermediateResult的概念抽象,并且是可插入的。 这意味着系统可以使用相同的实现支持流数据传输和批量数据传输。
数据交换涉及许多实例,例如:
JobManager是主节点,负责调度任务,恢复和协调,并通过ExecutionGraph数据结构保存工作的全貌。
TaskManagers,工作节点。 TaskManager(TM)在线程中同时执行许多任务。 每个TM还包含一个CommunicationManager(CM - 在任务之间共享)和一个MemoryManager(MM - 也在任务之间共享)。 TM可以通过复用的TCP连接相互交换数据,这些连接是在需要时创建的。
请注意,在Flink中,通过网络交换数据的是TaskManagers,而不是任务,即,通过一个网络连接复用生活在同一TM中的任务之间的数据交换。
ExecutionGraph:执行图是一种数据结构,包含有关作业计算的“基本事实”。 它由表示计算任务的顶点(ExecutionVertex)和表示任务生成的数据的中间结果(IntermediateResultPartition)组成。 顶点链接到它们通过ExecutionEdges(EE)消耗的中间结果:
这些是JobManager中的逻辑数据结构。 它们具有运行时等效结构,负责TaskManagers中的实际数据处理。 IntermediateResultPartition的运行时等价物称为ResultPartition。
ResultPartition(RP)表示BufferWriter写入的一大块数据,即由单个任务生成的一大块数据。 RP是结果子分区(RS)的集合。 这是为了区分指向不同接收器的数据,例如,在用于reduce或join的分区shuffle的情况下。
ResultSubpartition(RS)表示由operator创建的数据的一个分区,以及将此数据转发给接收operator的逻辑。 RS的具体实现确定了实际的数据传输逻辑,这是可插拔的机制,允许系统支持各种数据传输。 例如,PipelinedSubpartition是一个支持流数据交换的流水线实现。 SpillableSubpartition是一种支持批量数据交换的阻塞实现。
InputGate:接收端RP的逻辑等效项。 它负责收集数据缓冲区并将其上传到上游。
InputChannel:接收端RS的逻辑等价物。 它负责收集特定分区的数据缓冲区。
Buffer: See https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525
序列化器和反序列化器可靠地将类型化记录转换为原始字节缓冲区,反之亦然,处理跨越多个缓冲区的记录等。
Control flow for data exchange
图片代表一个简单的map-reduce作业,具有两个并行任务。我们有两个TaskManagers,每个都有两个任务(一个map任务和一个reduce任务)在两个不同的节点中运行,一个JobManager在第三个节点中运行。我们专注于启动任务M1和R2之间的转移。使用粗箭头表示数据传输,使用细箭头表示消息。首先,M1生成ResultPartition(RP1)(箭头1)。当RP可供使用时(我们将在稍后讨论),它会通知JobManager(箭头2)。 JobManager通知该分区的预期接收者(任务R1和R2)分区已准备就绪。如果尚未安排接收器,这实际上将触发任务的部署(箭头3a,3b)。然后,接收器将从RP请求数据(箭头4a和4b)。这将在本地(情况5a)或通过TaskManagers(5b)的网络堆栈启动任务(箭头5a和5b)之间的数据传输。当RP决定通知JobManager其可用性时,该过程留下一定程度的自由度。例如,如果RP1在通知JM之前完全自行生成(并且可能写入文件),则数据交换大致对应于Hadoop中实现的批处理交换。如果RP1在产生第一条记录后立即通知JM,我们就会进行流数据交换。
Transfer of a byte buffer between two tasks
这张图片更详细地展示了数据记录从生产者发送到消费者的生命周期。最初,MapDriver生成传递给RecordWriter对象的记录(由收集器收集)。 RecordWriters包含许多序列化程序(RecordSerializer对象),每个消费者任务可能会使用这些记录。例如,在shuffle或broadcast中,将有与消费者任务数量一样多的序列化器。 ChannelSelector选择一个或多个序列化程序来放置记录。例如,如果广播记录,它们将被放置在每个序列化器中。如果记录是散列分区的,则ChannelSelector将评估记录上的哈希值并选择适当的序列化程序。
序列化程序将记录序列化为二进制表示形式,并将它们放在固定大小的缓冲区中(记录可以跨越多个缓冲区)。这些缓冲区移交给BufferWriter并写入ResultPartition(RP)。 RP由几个子分区(ResultSubpartitions-RSs)组成,为特定的消费者收集缓冲区。在图片中,缓冲区的目的地是第二个reducer(在TaskManager 2中),它被放置在RS2中。由于这是第一个缓冲区,RS2可供使用(请注意,此行为实现了流式shuffle),并通知JobManager事实。
JobManager查找RS2的使用者,并通知TaskManager 2有可用的数据块。到TM2的消息向下传播到应该接收此缓冲区的InputChannel,后者又通知RS2可以启动网络传输。然后,RS2将缓冲区移交给TM1的网络堆栈,然后TM1将其交给netty进行运输。网络连接长时间运行并存在于TaskManagers之间,而不是单个任务。
一旦TM2接收到缓冲区,它就会通过一个类似的对象层次结构,从InputChannel(接收方等效于IRPQ)开始,进入InputGate(包含几个IC),最后进入RecordDeserializer,从缓冲区生成类型化记录并将它们交给接收任务,在本例中为ReduceDriver。
原文链接:https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks