今天又有小伙伴在群里问 slot 和 kafka topic 分区(以下topic,默认为 kafka 的 topic )的关系,大概回答了一下,这里整理一份
首先必须明确的是,Flink Task Manager 的 slot 数 和 topic 的分区数是没有直接关系的,而这个问题其实是问的是: 最大并发数与 slot 数的关系
最大并发数 = slot 数
每个算子的不同并行不能在同一slot,而不同的算子可以共享 slot ,所以最大并行度 就等于 slot 数。
这样就有了slot数和topic 分区数的间接关系在:我们可能会根据 kafka 的分区数配置我们 source (和后续的其他算子)算子的 并行度,而算子的 最大并行度决定 slot 数据(TM 的数量由 slot 的数量反向计算)
看一张官网的图:
说明:
第一个图:3 个 Task Manager,每个 3 个 slot,总共 9 个 slot
第二个图:Example 1 ,wordcount 案例,1 个并发,算子chain 在一起,只占一个 slot
第三个图:Examlple 2,wordcount 案例,2 个并发,占2 slot 。三种设置并行度的方式:
flink-conf.yaml 参数 parallelism.default: 2 flink -p 2 # 启动时加 -p 参数指定 env.setParallelism(2)
第四个图:Example 3,wordcount 案例,9 个并发,占 9 slot
第五个图:Example 3,wordcount 案例,source 9 个并发,sink 1 个并发,占 9 个slot(sink 和其中一个 source chain 在一起了)
注:算子 chain 在一起的条件,见文末
看一个具体的任务:
我们要读的 topic 有 2 个 partition,我们设置 source 算子的并行度为 2,那我们最小就需要 4 个 slot,Task Manager 配置的 slot 数为2, 那最少就需要 2 个 TM 任务才能正常运行(不考虑其他算子)。
关键代码:
env.setParallelism(2) env.addSource(source).addSink(sink)
提交到yarn 上
上面说明了算子的并发度与TM 的 slot 数的关系。
下面看下,kafka 分区数与 source 算子的并行度关系。
在不修改 kafka consumer 的分区分配策略的情况下,soure 的并行度与 topic 分区数在不同情况下,会有不同的表现,如下:
1、source 并行度 = topic 分区数,正好的情况,一个 并行度,读一个分区的数据
2、source 并行读 < topic 分区数, 会出现部分 并行度读多个 分区的情况,具体可见:flink 读取kafka 数据,partition分配
3、source 并行度 > topic 分区数,会出现部分并行度没有数据的情况
总结下问题:slot 数和 topic 的分区数并没有直接关系,以kafka 做 source 的情况最多,而 kafka topic 的分区数一般又是 Flink source 的并行度,又是 Flink 任务的最大并发度,一般情况下又是 slot 的数量,所以会有一种 slot 数 和 topic 分区数 有直接关系的假象。
注:Task Manager 的 slot 数在 flink-conf.yaml 中配置 参数:
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 2 # 默认值为1
官网 slot 配置说明:https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#configuring-taskmanager-processing-slots (slot 数量推荐是在只有一个任务的情况下,具体配置要看实际情况)
算子可以chain 在一起的条件: --- 没找到,找到之后补上