原创文章,转载请标注。https://www.cnblogs.com/boycelee/p/14728638.html

一、Kafka二、解决问题异步处理应用解耦流量削峰三、特性读写效率网络传输并发能力持久化能力可靠性水平扩展四、基本概念消息&批次消息批次主题&分区日志Log基本概念Log保存与压缩日志保存日志压缩Broker副本生产者消费者消费者组消息传递模式Kafka架构概图五、核心特性详解消费者单消费者组多消费者组心跳机制再平衡机制再平衡触发条件避免再平衡消费者判“死”条件消费者没有定期地向Coordinator发送心跳请求规定时间内没有消费完poll方法返回的消息避免消费者被判“死”避免被“条件1”判死避免被“条件2”判死位移管理位移主题引入原因消息格式位移提交自动提交手动提交分区副本机制副本机制的优点副本定义副本角色同分区多副本,如何保证副本消息一致性?追随者副本不对外提供服务的原因同步副本(ISR)与非同步副本(OSR)同步副本的标准HWLEOLeader选举少部分副本宕机全部副本宕机为什么不少数服从多数?物理存储存储概述基本概念文件类别日志存储索引消息压缩偏移量索引六、参考七、总结

一、Kafka

Kafka是一个分布式的消息系统。

二、解决问题

消息系统通常被应用于异步处理、应用解耦、流量削峰、消息通信等场景。

异步处理

生产者将消息写入消息队列中,消费者异步拉取消息队列消息,从而提升消息处理能力。

应用解耦

Kafka作为消息传递的媒介,各子系统只需要做系统责任内的事情。生产者-消费者模式,Kafka就是消息队列。

流量削峰

正常情况下,上游服务(如报价、营销等)常年流量较大,面对大流量时能够较为从容地应对,但下游应用(如:交易、订单等)由于常年流量较小,面对大流量时会因为准备不足,而导致系统被打垮,引发雪崩。

为了应对这一问题,可以利用消息队列作为临时数据存储节点,消费者根据自身消费能力,通过拉取的方式控制消费速度,达到流量削峰的目的。

三、特性

读写效率

Kafka在面对大流量数据时,能够高效地处理消息的存储与查询。通过软件设计避免硬件读取磁盘的性能瓶颈。

网络传输

批量读取消息,对消息进行批量压缩,从而提升网络利用率。

并发能力

Kafka支持消息分区,每个分区内保证消息的顺序性,多分区之间能够支持并发操作,提升Kafka并发操作。

持久化能力

Kafka将消息持久化至硬盘。网络传输不可靠,所以需要将数据进行持久化。其中利用了零拷贝、顺序读、顺序写、页缓存等技术使Kafa具备高吞吐特性。

可靠性

支持分区多副本,Leader副本负责读写,Follow副本只负责同步Leader副本数据,实现消息冗余备份,提升Kafka容灾能力。

水平扩展

多Producer、Broker、Consumer,均为分布式,多Consumer可以加入同一Consumer Group,每个分区只能分配一个Consumer,当Kafka服务端增加分区数量进行水平扩展时,可以向Consumer Group添加Consumer,提升消费能力。当Consumer Group中有Consumer出现故障下线时,能通过再平衡(Rebalance)对分区进行再分配。

四、基本概念

消息&批次

消息

(1)消息是Kafka的基本单位;

(2)消息由key和value的byte数组构成;

(3)key能够根据策略将消息发送到指定分区。

批次

(1)为了提升效率,消息被分批写入kafka,同一组消息必须属于同一主题的同一分区;

(2)分批发送能够降低网络开销,提升传输速度。

主题&分区

主题(Topic)是用于存储消息分类关系的逻辑单元,可以看做存储消息的集合。分区(partition)是Kafka数据存储的基本单元,可以看做存储消息的集合的子集。Kafka消息通过主题进行分类,同一Topic的不同分区(partition)会分配在不用的Broker上,分区机制提供横向扩展的基础,可以通过增加并在其上分配partition来提升Kafka的消息并行处理能力。

日志

Log基本概念

(1)分区逻辑上对应一个Log,生产者将消息写入分区实际是写入分区对应的Log;

(2)Log可以对应磁盘上的文件夹,其由多个Segment组成,每个Segment对应一个日志文件和索引文件;

(3)当Segment大小超出限制时,就会创建新的Segment;

(4)Kafka采用顺序I/O,所以只会向最新的Segment追加数据;

(5)索引采用稀疏索引,运行时将其映射至内存中,提升索引速度。

Log保存与压缩

日志保存

(1)时间限制

根据保留时间,当消息在kafka中保存的时间超过指定时间,就会被删除。

(2)大小限制

根据Topic存储大小,当Topic所占日志的大小大于一个阈值,则可以开始删除最旧的消息。Kafka会启动一个新的线程,定期检查是否存在可以删除的消息。

日志压缩

很多场景中,Kafka消息的key与value值会不断变化,就像数据库中的数据会不断被修改,消费者只会关心最新的key对应的value。如果开启日志压缩功能,Kafka会开启线程,定时对相同key的消息进行合并,并保留最新的value值。

Broker

独立的Kafka服务就是一个broker,broker主要的工作就是接受生产者发送来的消息,分配offset并保存到磁盘中。Broker除了接受生产者发送的消息,还处理消费者、其他Broker的请求,根据请求类型进行相应处理行和响应返回。正常情况下一台机器对应一个broker。

副本

所谓副本就是对消息进程冗余备份,分布式系统在不同机器上相互保存对方数据。在Kafka中,每个分区(partition)可以有多个副本,每个副本中的消息是一样的(在同一时刻,多台机器之间的消息并不完全一致)。

生产者

生产者(Producer)的主要工作是生成消息。将消息发布根据规则推送到Topic的对应分区中。例如:(1)对key进行hash;(2)轮询;(3)自定义。

消费者

消费者(Consumer)的主要工作消费消息。从对应分区中拉取Topic的消息进行消费。消费者需要通过offset记录自己的消费位置。

消费者组

多个消费者(Consumer)构成消费者组(Consumer Group)。消费者组(Consumer Group)订阅的主题(Topic)的每个分区只能被分配给,在同一个消费者组中的一个消费者处理。但一个消费者可以消费同一主题(Topic)的多个分区。

消息传递模式

​ kafka没有消息推送,只有消息拉取。但消费者可以通过轮询拉取的方式实现消息推送功能。

Kafka架构概图

五、核心特性详解

消费者

(1)消费者从订阅的主题消费消息的偏移量保存至名字为"__consumer_offsets"的主题中;

(2)推荐使用Kafka来存储消费者偏移量,zookeeper不适合高并发。

单消费者组

多个消费同一主题的消费者只要将group_id设置相同,就可以组成消费者组。

情况一:一个消费者组中,只有一个消费者。

情况二:消费者组中有多个消费者。

情况三:分区数与消费者组数相同。

情况四:消费者组中消费者数量大于分区数。闲置的消费者不会接收消息。

多消费者组

一个主题对应多个消费者组,每个消费者组都能够消费该主题的所有消息。

心跳机制

Kafka的心跳机制保证Consumer和Broker之间的健康,当Broker Coordinator正常时,Consumer才会发送心跳。

再平衡机制

再平衡是规定消费者组下消费者与主题的分区之间发生变化时如何分配的协议。

再平衡触发条件

(1)消费组内消费者发生变化。(消费组数量变化,例如消费组宕机退出消费组)

(2)主题对应分区数发生变化。(kafka只支持增加分区)

(3)订阅主题发生变化。(消费组使用正则表达式订阅主题,此时恰好新建了对应主题)

情况一:正常情况,每个分区只能分配给一个消费者。

情况二:消费者机器宕机,消费者退出消费组,触发再平衡,重新给消费者组中的消费者分配分区。

情况三:Broker机器宕机,导致分区3无法提供服务。如果分区有副本则触发再平衡,如果没有副本则消费者3闲置。

情况四:使用正则表达式订阅主题,当新增主题时,主题对应的分区会分配给当前消费者,会触发再平衡。

避免再平衡

订阅主题数和主题分区数发生变化,一般情况下是运维主动触发,正常情况下不需要避免再平衡。所以我们可以重点关注由消费者组消费者数量变化而引发的重平衡。

在再平衡完成后,每个消费者实例会定时向Coodinator发送心跳请求。

消费者判“死”条件
消费者没有定期地向Coordinator发送心跳请求

(1)session.timeout.ms参数标识判定消费者死亡的时间阈值。参数默认值为10秒,即如果10秒内没有收到Group下的某Consumer实例的心跳请求,则被判定该Consumer实例“死亡”,移出Group。

(2)heartbeat.interval.ms参数标识心跳请求发送的频率。值越小,Consumer实例发送心跳请求的频率就越高。

规定时间内没有消费完poll方法返回的消息

(1)max.poll.interval.ms参数标识Consumer实例调用poll方法的最大时间间隔。默认值是5分钟,表示Comsumer如果在5分钟内无法消费完poll方法返回的消息,则会被移出Group。

避免消费者被判“死”
避免被“条件1”判死

session.timeout.ms >= 3 * heartbeat.interval.ms。保证Consumer被判死前至少经过3轮心跳请求。

例如:设置 session.timeout.ms = 6s;设置 heartbeat.interval.ms = 2s。

避免被“条件2”判死

尽可能将max.poll.interval.ms时间设置大一些。可以将消费者实例中的最长耗时作为依据,再此基础之上扩大1-1.5倍。为业务处理留下充足的处理时间,避免由于消息消费时间过长而导致再平衡。

位移管理

位移主题

Kafka中消费者根据消息的位移顺序消费消息,消费者的位移由消费者管理,可以存储在zookeeper中,可以存储于Kafka主题__consumer_offse中hjmgbknjk.n,jvgnvmnn/.vt。sconsumer_offsets就是位移主题。

引入原因

(1)老版本的位移管理依托Zookeeper,会自动或手动的方式将位移数据提交至Zookeeper进行保存。当Consumer重启后,它就能自动从Zookeeper中读取位移数据,从上次截止消费的地方继续消费。这种设计是的Kafka Broker不需要保存位移数据。

(2)但Zookeeper不适合高频写操作,所以在0.8.2.x版本后新版本的Consumer推出了全新的位移管理机制。将Consumer的位移数据作为一条普通的Kafka消息,提交到__consumer_offsets。

(3)正情况下不需要修改它,也不可以随意地向该主题写消息,因为这会导致Kafka无法正常解析。

消息格式

(1)Key中包含GroupID、主题名、分区号;

(2)Value中包含位移值。

位移提交

(1)Consumer需要向Kafka记录自己的位移数据,这个汇报过程称为 提交位移(Committing Offsets)

(2)Consumer 需要为分配给它的每个分区提交各自的位移数据

(3)位移提交的由Consumer端负责的,Kafka只负责保管。__consumer_offsets

(4)位移提交分为自动提交和手动提交

(5)位移提交分为同步提交和异步提交

自动提交

(1)设置enable.auto.commit值为true;

(2)通过auto.commit.interval.ms,可以设置自动提交的时间间隔,默认值为5秒;

(3)Kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息的位移信息。poll方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况;

(4)自动提交会出现消息重复消费。例:Consumer每 5s提交offset,当提交位移信息后3秒发生了再平衡,所有Consumer都会从上次提交的offset开始消费,但此时获取的offset已经是3秒前的offset了,所以我们又会重新消费再平衡前3秒的所有数据。我们只能够缩小提交offset的时间窗口,但无法避免重复消费。

手动提交

1、同步提交

(1)使用 KafkaConsumer#commitSync():会提交 KafkaConsumer#poll() 返回的最新offset;

(2)该方法为同步操作,等待直到 offset 被成功提交才返回;

1while (true) {
2            ConsumerRecords<String, String> records =
3                        consumer.poll(Duration.ofSeconds(1));
4            process(records); // 处理消息
5            try {
6                        consumer.commitSync();
7            } catch (CommitFailedException e) {
8                        handle(e); // 处理提交失败异常
9            }

(3)同步提交会使Consumer处于阻塞状态;

(4)同步提交在出现异常时会自动重试。

2、异步提交

(1)使用异步提交规避Consumer阻塞;

(2)异常(GC、网络抖动)时使用同步提交进行重试。

 1try {
2     while(true) {
3          ConsumerRecords<String, String> records = 
4                                    consumer.poll(Duration.ofSeconds(1));
5          process(records); // 处理消息
6          commitAysnc(); // 使用异步提交规避阻塞
7     }
8catch(Exception e) {
9            handle(e); // 处理异常
10finally {
11    try {
12        consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
13    } finally {
14       consumer.close();
15    }

分区

分区(Partition)是Kafka数据的基本单元。同一个主题(topic)数据会被分散存储到多个partion中,这些分区可以被分配到同一台机器或不同机器上。优点是有利于水平扩展,避免单台机器在磁盘空间和性能上的限制,同时可以通过复制来增加数据冗余,从而提升容灾能力。为了做到均匀分布,一般partition的数量一般是Broker Server数量的整数倍。

副本机制

副本机制的优点

分区拥有多个副本,提供冗余数据,有利于确保kafka的高可用性。

副本定义

(1)每个主题可以分为多个分区,每个分区下配置多个副本;

(2)副本的本质是一个只能追加写消息的提交日志;

(3)同一分区下的所有副本保存相同的消息序列;

(4)分区写的不同副本分散保存在不同Broker上,应对Broker宕机时分区数据不可用的情况。

副本角色

同分区多副本,如何保证副本消息一致性?

最常见的解决方案是基于领导者(Leader-based)的副本机制。

1、副本分为两类

(1)领导者副本;

(2)追随者副本。

2、在Kafka的副本机制与其他分布式系统不同

(1)在kafka中,追随者副本不对外提供服务。所有请求都必须由领导者副本来处理;

(2)追随者副本的唯一任务就是从领导者副本异步拉取消息,并写入自己的提交日志中,从而实现与领导者副本的同步。

3、领导者副本所处Broker宕机

(1)Kafka依托Zookeeper提供的监控功能能够感知Broker宕机,并开启一轮新的选举;

(2)老Leader副本重启后,只能作为追随者副本加入集群中。

追随者副本不对外提供服务的原因

1、方便实现“Read-your-writes”

(1)生产者使用API想Kafka写入消息成功后,能够立马使用消费者API查看到刚才生产的信息。

(2)如果允许追随者副本对外提供服务,由于追随者副本是异步的,因此就可能出现追随者副本没有从领导者副本拉取到最新消息的情况,就会出现无法立刻读到最新写入的消息。

2、方便实现单调读(Monotonic Reads)

(1)什么是单调读?对于消息消费者而言,消息不会时有时无。

(2)如果允许追随者副本对外提供服务,由于追随者副本是异步的,多个副本从领导者副本拉取的消息不一定同步,就会出现多次请求读取不同的追随者副本的情况,数据读取时有时无。如果读取全由领导者副本来处理,那么Kafka就很实现单调读一致性。

同步副本(ISR)与非同步副本(OSR)

由于追随者副本需要异步去拉取领导者副本,那么我们就需要确定再怎么样才算与领导者副本同步。

Kafka引入了In-Sync Replicas,也就是ISR(同步)副本集合,该副本在Zookeeper上维护。如果存在于ISR中则意味着与领导者副本同步,相反则为非同步副本(OSR)

同步副本的标准

(1)replica.lag.time.max.ms参数值标识Follower副本能够慢于Leader副本的最长时间间隔,默认值为10秒。

(2)若Follower副本落后于Leader副本的最长连续时间间隔不超过该replica.lag.time.max.ms参数值设定的大小,则认定该Follower副本与Leader副本是同步的,否则认定为非同步,会将副本从ISR副本集合中移出(Follower副本的拉取速度慢于Leader副本写入消息的速度,且时间间隔超过设定阈值)。

(3)ISR是动态调整集合,非静态不变的。当Follower副本追上进度时,就会重新被添加会ISR集合。

HW

高水位(HW是High Watermark的缩写),表示一个特定消息的偏移量,消费者只能拉取到这个offset之前的数据。

LEO

LEO是Log End Offset的缩写,表示当前日志文件下一条待写入消息的offset

Leader选举

少部分副本宕机

(1)当Leader副本对应的broker宕机后,就会从Follower副本中选择一个副本作为Leader;

(2)当宕机的broker恢复后就会重新从leader中pull数据。

全部副本宕机

unclean.leader.election.enable 控制是否允许 Unclean 领导者选举。

(1)不开启Unclean。等待ISR中的一个恢复,并选择其当leader;(等待时间较长,可用性降低)

(2)开启Unclean。选择第一个恢复的副本作为新的leader,无论是否是ISR副本。(开启会造成数据丢失)

(3)正常情况下建议不开启,虽然牺牲了高可用性,但维护了数据一致性,避免消息丢失。

为什么不少数服从多数?

选择Leader副本时如果需要超过半数的同步副本同意,算法所需的冗余同步副本较多。(一台机器失败,就需要3个同步副本)

物理存储

存储概述

基本概念

(1)Kafka使用日志文件保存生产者发送的消息;

(2)每条消息都有一个offset值表示它在分区中的偏移量;

(3)offset值是逻辑值并不是真实存在的物理地址。其类似于数据库中的主键,唯一标识了数据库表中的一条数据。而offset在Kafka中的某个分区唯一标识一条消息。

(4)Log与分区一一对应,Log并不是一个文件而是一个文件夹;

(5)文件夹以topicName_pratiitonID命名,分区消息全部都存储在次文件夹下的日志文件中;

(6)Kafka通过分段的方式将Log分为多个LogSegment,LogSegment是逻辑概念,对应磁盘上的Log目录下的一个日志文件和索引文件;

(7)日志文件的命名规则是[baseOffset].log,baseOffset是日志文件中第一条消息的offset;

(8)Kafka日志是顺序追加的;

(9)每个日志文件都对应一个索引文件,索引文件使用稀疏索引的方式为文件日志中部分消息建立索引。

(10)日志文件结构图

(11)Log示例

创建了一个tp_demo_01的主题,其中存在6个partition,对应的每个partition下存在一个Topic-partition命名的消息日志。

(12)LogSegment示例

文件类别

日志存储

索引

为了提升消息查找的速度,Kafka从0.8版本开始,为每个日志文件添加对应的索引文件。IndexFile与MassageSet File共同组成了LogSegment。

偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。时间戳索引文件则根据时间戳查找对应的偏移量。

索引文件中的索引项的格式:每个索引项为8字节,分为两部分,第一部分是相对offset(4字节),即相对于baseOffset的偏移量(baseOffset就是基准偏移量,日志文件的命名以基准偏移量来命名)。第二部分是物理地址(4字节),即其索引消息在日志文件中对应的position位置。通过这两部分就能实现offset与物理地址之间的映射。

消息压缩
偏移量索引

举例

假设需要查找startOffset为1067。需要将offset=1067转换成对应的物理地址,其过程是怎样的?

(1)将绝对offset转化为相对offset,绝对offset减去baseOffset,得到相对offset=67;

(2)通过相对offset查找索引文件,得到(58,1632)索引项(通过跳表的方式定位到某一个index文件,再通过二分法找到不大于相对offset的最大索引项);

(3)从position为1632处开始顺序查找,找到绝对offset=1067的消息;

(4)最终会得到offset为1070的位置消息。(因为消息被压缩,offset=1067这条消息被压缩后一起构成offset=1070这条外层消息)。

六、参考

《Apache Kafka源码剖析》

《极客时间-Kafka核心技术与实战》

《拉钩Java-Kafka》

七、总结

本文从场景、特性、基本概念、核心特性等多个维度较为详细地阐述了Kafka的相关知识。关于kafka稳定性与具体源码实现会在进阶篇中阐述。

懂得不多,做得太少。欢迎批评与指正!

原创文章,转载请标注。https://www.cnblogs.com/boycelee/p/14728638.html

05-04 08:23