大数据学习与分享

大数据学习与分享

通过之前的文章《Kafka分区分配策略》《Kafka高性能揭秘》,我们了解到:Kafka高吞吐量的原因之一就是通过partition将topic中的消息保存到Kafka集群中不同的broker中。无论是Kafka的producer,还是consumer都可以并发操作topic中的partition,因此partition是Kafka并行度调优的最小单元。

理论上说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。

但是,实际生产中Kafka topic的分区数真的配置越多越好吗?很显然不是!

分区数过多会有以下弊端:

一、客户端/服务器端需要使用的内存就越多

Kafka0.8.2之后,在客户端producer有个参数batch.size,默认是16KB。它会为每个分区缓存消息,在数据积累到一定大小或者足够的时间时,积累的消息将会从缓存中移除并发往broker节点。这个功能是为了提高性能而设计,但是随着分区数增多,这部分缓存所需的内存占用也会更多。

与此同时,consumer端在消费消息时的内存占用、以及为达到更高的吞吐性能开启的consumer线程数也会随着分区数增加而增加。比如有10000个分区,同时consumer线程数要匹配分区数(大部分情况下是最佳的消费吞吐量配置)的话,那么在consumer client就要创建10000个线程,那么在consumer client就要创建10000个线程,也需要创建大约10000个Socket去获取分区数据。线程的开销成本很显然是不容小觑的!

此外,服务器端的开销也不小,如果阅读Kafka源码的话可以发现,服务器端的很多组件都在内存中维护了分区级别的缓存,比如controller,FetcherManager等,因此分区数越多,这种缓存的成本就越大。

二、文件句柄的开销

在Kafka的broker中,每个partition都会对应磁盘文件系统的一个目录。在Kafka的数据日志文件目录中,每个日志数据段都会分配两个文件,一个索引文件和一个数据文件。当前版本的kafka,每个broker会为每个日志段文件打开一个index文件句柄和一个数据文件句柄。因此,随着partition的增多,所需要保持打开状态的文件句柄数也就越多,最终可能超过底层操作系统配置的文件句柄数量限制。

三、越多的分区可能增加端对端的延迟

Kafka端对端延迟定义为producer端发布消息到consumer端接收消息所需要的时间,即consumer接收消息的时间减去producer发布消息的时间。

Kafka只有在消息提交之后,才会将消息暴露给消费者。例如,消息在所有in-sync副本列表同步复制完成之后才暴露。因此,in-sync副本复制所花时间将是kafka端对端延迟的最主要部分。在默认情况下,每个broker从其他broker节点进行数据副本复制时,该broker节点只会为此工作分配一个线程,该线程需要完成该broker所有partition数据的复制。

注意,上述问题可以通过增大kafka集群来进行缓解。例如,将1000个分区leader放到一个broker节点和放到10个broker节点,他们之间的延迟是存在差异的。在10个broker节点的集群中,每个broker节点平均需要处理100个分区的数据复制。此时,端对端的延迟将会从原来的数十毫秒变为仅仅需要几毫秒。

根据经验,如果你十分关心消息延迟问题,限制每个broker节点的partition数量是一个很好的主意:对于b个broker节点和复制因子为r的kafka集群,整个kafka集群的partition数量最好不超过100*b*r个,即单个partition的leader数量不超过100。

四、降低高可用性

Kafka通过多副本复制技术,实现Kafka集群的高可用和稳定性。每个partition都会有多个数据副本,每个副本分别存在于不同的broker。所有的数据副本中,有一个数据副本为leader,其他的数据副本为follower。

在Kafka集群内部,所有的数据副本皆采用自动化的方式进行管理,并且确保所有的数据副本的数据皆保持同步状态。不论是producer端还是consumer端发往partition的请求,都通过leader数据副本所在的broker进行处理。当broker发生故障时,对于leader数据副本在该broker的所有partition将会变得暂时不可用。Kafka将会自动在其它数据副本中选择出一个leader,用于接收客户端的请求。这个过程由Kafka controller节点broker自动完成,主要是从Zookeeper读取和修改受影响partition的一些元数据信息。

在通常情况下,当一个broker有计划地停止服务时,那么controller会在服务停止之前,将该broker上的所有leader一个个地移走。由于单个leader的移动时间大约只需要花费几毫秒,因此从客户层面看,有计划的服务停机只会导致系统在很小时间窗口中不可用。(注:在有计划地停机时,系统每一个时间窗口只会转移一个leader,其他leader皆处于可用状态。)

然而,当broker非计划地停止服务时(例如,kill -9方式),系统的不可用时间窗口将会与受影响的partition数量有关。假如,一个2节点的kafka集群中存在2000个partition,每个partition拥有2个数据副本。当其中一个broker非计划地宕机,所有1000个partition同时变得不可用。假设每一个partition恢复时间是5ms,那么1000个partition的恢复时间将会花费5秒钟。因此,在这种情况下,用户将会观察到系统存在5秒钟的不可用时间窗口。

而如果发生宕机的broker恰好是controller节点时:在这种情况下,新leader节点的选举过程在controller节点恢复到新的broker之前不会启动。controller节点的错误恢复将会自动地进行,但是新的controller节点需要从zookeeper中读取每一个partition的元数据信息用于初始化数据。例如,假设一个Kafka集群存在10000个partition,从zookeeper中恢复元数据时每个partition大约花费2ms,则controller的恢复将会增加约20秒的不可用时间窗口。

总而言之,通常情况下Kafka集群中越多的partition会带来越高的吞吐量。但是,如果Kafka集群中partition总量过大或者单个broker节点partition过多,都可能会对系统的可用性和消息延迟带来潜在的负面影响,需要引起我们的重视。

那么如何确定合理的分区数量呢?

在partition级别上达到均衡负载是实现吞吐量的关键,合适的partition数量可以达到高度并行读写和负载均衡的目的,需要根据每个分区的生产者和消费者的目标吞吐量进行估计。

可以遵循一定的步骤来确定分区数:根据某个topic日常"接收"的数据量等经验确定分区的初始值,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么numPartitions = Tt / max(Tp, Tc)

说明:Tp表示producer的吞吐量。测试producer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用消费消息后进行什么处理的关系更大,相对复杂一些。

推荐文章:

Kafka作为消息系统的系统补充

分布式流平台Kafka

Hadoop支持的压缩格式对比和应用场景以及Hadoop native库

Spark之离线统计热点城市信息

Spark SQL中Not in Subquery为何低效以及如何规避

面试技术点记录


关注微信公众号:大数据学习与分享,获取更对技术干货

02-03 20:28