一、生产者客户端的整体架构

深入理解kafka核心设计跟实践原理之生产者-LMLPHP

整个生产者生产者客户端由2个线程协调运行,这2个线程分别为主线程跟Sender线程(发送线程)。在主线程中由kafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator),也称为消息收集器中,Sender线程负责从RecordAccumulator中获取消息并将其发送至kafka中。

接下来,说几个其中重要的参数配置:

1、buffer.memory:RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。此参数便是RecordAccumulator缓存的大小,默认33554432B,即32MB。

2、max.block.mx:如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候kafkaProducer的send方法调用要么被阻塞,要么抛出异常,此参数为最大阻塞时间,默认值60000,即60秒

3、batch.size:消息在网络中都是以字节的形式进行传输的,在发送之前需要创建一块内存区域来保存对应的消息,为了避免java.io.ByteBuffer频繁的创建与释放耗费资源,在RecordAccumulator内部还有个BufferPool,实现byteBuffer的复用,默认16384B,即16KB,可以适当调整,缓存更多消息。流程如下:当一条消息流入RecordAccumulator时,会先寻找与消息分区对应的双端队列(没有则新建),在从这个双端队列的尾部获取一个ProducerBatch(没有则新建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则新建一个ProducerBatch。在新建时评估这条消息的大小是否超过batch.size参数的值,如果不超过,那么以batch.size参数的大小创建ProducerBatch,这样使用完这个ProducerBatch之后,还可以复用;如果超过,则以消息的大小创建ProducerBatch,这段内存区域则不会复用

4、max.in.flight.requests.per.connection:请求在从Sender线程发往kafka之前还会保存到InFlightRequests中,InFlightRequests存对象的具体形式为Map<NodeId,Deque<Request>>,它的主要作用是缓存了已经发出去但还没有收到响应的请求。此参数即为:每个连接(也就是客户端与node之间的连接)最多缓存的请求数,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数字之后,会阻塞到收到响应,直到请求超时。

参考:《深入理解kafka核心设计与实践原理》

03-28 04:57