Kafka使用zookeeper来维护集群成员的信息。每个broker都有一个唯一标识符,这个标识符可以在配置文件指定,也可以自动生成。

在broker停机,出现网络分区或者长时间垃圾回收停顿时,broker会从zookeeper上断开连接,此时broker在启动时创建的临时节点会自动从zookeeper上移除。监听broker列表的Kafka组件会被告知该broker已移除。

在完全关闭一个broker之后,如果使用相同的ID启动一个全新的broker,它会立即加入集群,并拥有与旧broker相同的分区和主题。

Kafka使用主题来组织数据,每个主题被分为若干个分区,每个分区有多个副本,副本被保存在broker上,每个broker都可以保存上千个属于不同主题和分区的副本。

副本有两种类型:

首领副本,每个分区都有一个首领副本,为了保证一致性,所以生产者和消费者请求都会经过这个副本。

跟随者副本,首领以外的副本都是跟随副本,跟随副本不处理来自客户端的请求,它们唯一的任务时从首领那里复制消息,保持与首领一致的状态。如果首领发生崩溃,其中一个跟随者就会被提升为首领。

首领的另一个任务时搞清楚哪个跟随者的状态与自己时一致的,如果跟随者10s内没有请求任何消息,或者10s内没有请求最新的数据,它们会被认为是不同步的,如果一个副本无法与首领保持一致,首领失效时他就不能称为新的首领,毕竟他没有包含全部的消息。

除了当前首领外,每个分区都有一个首选首领,——创建主题时选的首领就是分区的首选首领,之所以叫做首选首领,是因为创建分区时,需要在broker之间均衡首领,因此我们希望首选首领在称为真正的首领时,broker间的负载最终会得到平衡。默认情况下Kafka的auto.leader.rebalance.enable=true,它会检查首选首领是不是当前首领,如果不是,并且该副本时同步的,那就会触发首领选举,让首选首领成为当前首领。

broker的大部分工作时处理客户端,分区副本和控制器发送给分区首领的请求。Kafka提供了一个二机制协议(基于tcp),指定了请求消息的格式以及broker如何对请求作出响应——包括成功处理请求或在处理请求中遇到错误。客户端发起连接并发送请求,broker处理请求并作出响应。broker按照请求到达的顺序处理它们——这种顺序保证了Kafka具有了消息队列的特性,同时保证保存的消息时有序的。

所有的请求消息包含一个标准消息头:

1.request type

2.request version

3.correlation ID——一个具有唯一性的数字,用于标识请求消息,同时也会出现在响应消息和错误日志中。

4.client ID——用于标识发送的请求的客户端

broker会在它监听的每一个端口上运行一个acceptor,这个线程会创建一个连接,并把它交给processor线程去处理。processor线程(网络线程)的数量时可配置的,网络线程负责从客户端获取请求消息,把他们放进请求队列,然后从响应队列获取响应消息,把他们发送给客户端。

请求消息被放到请求队列后,IO线程会负责处理它们,下面是几种最常见的请求类型:

1.生产请求:生产者发送的请求,它包含客户端要写入broker的消息

2.获取请求:在消费者和跟随者副本需要从broker读取消息时发送的请求

生产请求和获取请求都必须发送给分区的首领副本,如果broker收到一个针对特定分区的请求,而该分区的首领在另一个broker上,那么发送请求的客户端会收到一个“非分区首领”的错误响应。Kafka客户端要自己负责把生产请求和获取请求发送到正确的broker上。

那么客户端怎么知道往哪里发送请求呢?

客户端使用了另一种请求类型,也就是元数据请求,这种请求包含了客户端感兴趣的主题列表。服务器端响应消息里指明了这些主题所包含的分区,每个分区都有哪些副本,以及哪个副本是首领。元数据请求可以发送给任意一个broker,因为所有broker都缓存了这些信息。

一般情况下,客户端会把这些信息缓存起来,并直接往目标broker上发送生产和获取请求。它们需要时不时地发送元数据请求来刷新这些信息(刷新时间间隔通过meta.max.age.ms配置)从而知道元数据是否发生了变更——比如在新的broker加入集群时,部分副本被移动到新broker上。另外如果客户端收到非首领错误,它会尝试重发请求之前先刷新元数据,因为这个错误说明客户端正在使用过期的元数据信息。

5.4.1生产请求

Acks配置参数——该参数指定了需要多少个broker确认才可以任务一个消息写入时成功的。不同的配置对“写入成功”的界定不同;

如果acks=1,那么只要首领收到消息就认为写入成功;

如果acks=all,那么需要所有的同步副本收到消息才算写入成功;

如果acks=0,那么生产者把消息发送之后,完全不需要等待broker的响应。

包含首领副本的broker在收到生产请求时,会对请求做一些验证。

1.发送数据的用户是否有主题写入权限?

2.请求里包含的acks值是否有效(只能出现0,1或all)

3.如果acks=all,是否有足够多同步副本保证消息已经被安全写入?(如果同步副本不足,broker可以拒绝处理新消息。)

之后消息被写入本地磁盘。在Linux系统上,消息会被写到文件系统缓存里,并不保证何时会被刷新到磁盘上,Kafka不会一直等待数据被写到磁盘上——它依赖复制功能来保证消息的持久性。

在消息被写入分区首领之后,broker开始检查acks配置参数——如果acks被设为0或1,那么broker立即返回响应;如果acks被设为all,那么请求会被保存在一个叫做炼狱的缓冲区,直到首领发现所有跟随者副本都复制了消息,响应才会返回给客户端。

5.4.2获取请求

broker处理获取请求的方式与处理生产请求相似。客户端发送请求,想broker请求主题分区具有特定偏移量的消息。客户端可以指定broker最多从一个分区返回多少数据。这个限定非常重要,因为客户端需要为broker返回的数据分配足够的内存。如果没有这个限制,broker返回的大量数据可能耗尽客户端内存。如果请求的偏移量存在,broker将按照客户端指定的数量上限从分区读取消息,再把消息返回客户端。Kafka使用零复制技术想客户端发送消息——也就是说Kafka直接把消息发送到网络通道,不需要经过任何中间缓冲区。这种技术避免了字节复制,也不需要管理内存缓存区,从而获得更好性能。

客户端除了可以设置返回数据的上限,也可以设置下限。在主题消息流量不是很大的情况下,这样可以减少CPU和网络开销。客户端发送一个请求,broker有足够数据才把它们返回客户端。当然客户端可以设定一个超时时间,当到达时间后,即便broker没有足够数据,也会发送到消费者。

并不时保存在分区首领上的数据都能被客户端读取,大部分客户端只能读取已经被写入所有同步副本的消息,分区首领知道每个消息会被复制到哪个副本上,在消息被写入所有同步副本之前,是不会发给消费者的——尝试获取这些消息的请求会得到空的响应而不是错误。

这意味着如果broker间的消息复制变慢,那么消息到达消费者的时间也会变长,延迟时间可以通过replica.lag.time.max.ms来配置,它指定来副本在复制消息时可被允许的最大延迟时间。

之前的Kafka消费者使用zookeeper跟踪偏移量,之后决定把偏移量保存在特定的Kafka主题上。为了达到这个目的,我们不得不往协议里增加几种请求类型在:offsetCommitRequest,offsetFetchRequest和listOffsetRequest,现在应用程序调用commitOffset方法时客户端不再把偏移量写入zookeeper,而是向Kafka发送offsetCommitRequest请求。

主题的创建仍然需要通过命令行来完成,命令行工具会直接更新zookeeper里的主题列表,broker监听这些主题列表,在有新主题加入时,它们会收到通知。我们正在改进Kafka,增加createTopicRequest,这样客户端就可直接向broker请求创建新主题了。

我们在0.10.0版本增加了APIVersionRequest,客户端可以循环broker支持哪些版本的请求,然后使用正确的版本与broker通信。

5.5物理存储

Kafka的基本存储单元时分区,分区无法在多个broker间在细分。

在创建主题时,Kafka首选会决定如何在broker间分配分区,我们要达到如下目标:

1.在broker间平均分配分区副本,

2.确保每个分区的每个副本分布在不同的broker上

3.如果broker指定了机架信息,那么尽可能把每个分区分配到不同机架的broker上。

为分区和副本选好合适的broker之后,接下来要觉得这些分区应该使用哪个目录。我们单独为每个分区分配目录,规则:计算每个目录的分区数量,新的分区总是被添加到数量最小的目录里。

5.5.2文件管理

保留数据时Kafka的一个基本特性,Kafka不会一直保留数据,也不会等到所有消费者都读取了消息之后才能删除消息。相反,Kafka管理员为每个主题配置了数据保留期限,规定了数据被删除之前可以保留多长时间,或者清理数据之前可以保留的数据量大小。

因为在一个大文件里查找和删除消息时很费时的,也容易出错,所以把分区分成若干个片段,默认情况下,每个片段包含1G或一周的数据,比较小的为准,在broker往分区写入数据时,如果达到片段上限,就关闭当前文件,并打开一个新文件。

当前正在写入数据的片段叫做活跃片段,活跃片段永远不会被删除。

5.5.3文件格式

我们把Kafka消息和偏移量保存在文件里,保存在磁盘上的数据格式与从生产者发送过来货发送给消费者的消息格式时一样的。因为使用了相同的消息格式进行磁盘存储和网络传输,Kafka可以使用零复制技术给消费者发送消息,同时避免了对生产者压缩过的消息进行解压和再压缩。

除了键值和偏移量外,消息还包含了消息大小,校验和,消息格式版本号,压缩算法和时间戳。

时间戳可以时生产者发送消息的时间,也可以是消息到达broker的时间,这个可以配置。

Kafka附带了一个叫做DumpLogSegment的工具,可以查看片段内容,显示每个消息的偏移量,校验和,魔术数字节,消息大小和压缩算法。

5.5.4索引

为了帮助broker更快定位到指定偏移量,Kafka为每个分区维护了一个索引。索引把偏移量映射到片段文件和偏移量在文件的位置。

5.5.5清理

Kafka通过改变主题的保留策略来满足这些场景(只关心最新的数据),早于保留时间的旧事件会被删除,为每个键保留最新值,只有当应用程序的事件里包含键值对时,为这些主题设置compact策略才有意义。

如果Kafka启动时启动了清理功能(通过设置log.cleaner.enabled)每个broker会启动一个清理管理器线程和多个清理线程,它们负责执行清理任务,这些线程会选择污浊率较高的分区进行清理。

5.5.7被删除的事件

如果为了把一个键从系统删除,应用程序必须发送一个包含该键且值为null的消息。清理线程发现该消息时,会先进行常规清理,只保留值为null的消息,如果消费者往数据库复制Kafka数据,当看到这个消息时,就知道要把相关信息从数据库里删除。过一段时间后,清理线程就会移除这个消息。键也会从Kafka消失。

5.5.8何时会清理主题

就像delete策略不删除当前活跃片段一样,compact策略也不会对当前片段进行清理,只有旧片段的消息才会被清理。Kafka会在包含脏记录的主题达到50%时进行清理。

05-24 08:40