kafka数据可靠传输
再说复制
Kafka 的复制机制和分区的多副本架构是Kafka 可靠性保证的核心。把消息写入多个副本可以使Kafka 在发生崩愤时仍能保证消息的持久性。
Kafka 的主题被分为多个分区,分区是基本的数据块。分区存储在单个磁盘上,Kafka 可以保证分区里的事件是有序的,分区可以在线(可用),也可以离线(不可用) 。每个分区可以有多个副本,其中一个副本是首领。所有的事件都直接发送给首领副本,或者直接从首领副本读取事件。其他副本只需要与首领保持同步,并及时复制最新的事件。当首领副本不可用时,其中一个同步副本将成为新首领。
分区首领是同步副本,而对于跟随者副本来说,它需要满足以下条件才能被认为是同步的。
- 与Zookeeper 之间有一个活跃的会话,也就是说,它在过去的6s(可配置)内向Zookeeper 发送过心跳。
- 在过去的10s 内(可配置)从首领那里获取过消息。
- 在过去的10s 内从首领那里获取过最新的消息。光从首领那里获取消息是不够的,它还必须是儿乎零延迟的。
如果跟随者副本不能满足以上任何一点,比如与Zookeeper 断开连接,或者不再获取新消息,或者获取消息滞后了10s 以上,那么它就被认为是不同步的。一个不同步的副本通过与Zookeeper 重新建立连接,并从首领那里获取最新消息,可以重新变成同步的。这个过程在网络出现临时问题并很快得到修复的情况下会很快完成,但如果broker 发生崩愤就需要较长的时间。
broker的配置
broker 有3 个配置参数会影响Kafka 消息存储的可靠性。与其他配置参数一样,它们可以应用在broker 级别,用于控制所有主题的行为,也可以应用在主题级别,用于控制个别主题的行为。在主题级别控制可靠性,意味着Kafka 集群可以同时拥有可靠的主题和非可靠的主题。例如,在银行里,管理员可能把整个集群设置为可靠的,但把其中的一个主题设置为非可靠的,用于保存来自客户的投诉,因为这些消息是允许丢失的。
复制系数
主题级别的配置参数是replication.factor,而在broker 级别则可以通过default.replication.factor来配置自动创建的主题。
如果复制系数为N,那么在凡l 个broker 失效的情况下,仍然能够从主题读取数据或向主题写入数据。所以,更高的复制系数会带来更高的可用性、可靠性和更少的故障。另一方面,复制系数N 需要至少N 个broker ,而且会有N 个数据副本,也就是说它们会占用N倍的磁盘空间。我们一般会在可用性和存储硬件之间作出权衡。
建议在可用性的场景下,把复制系数设置为3.
不完全的首领选举
之前提到过,当分区首领不可用时,一个同步副本会被选举为新首领。如果在选举过程中没有丢失数据,也就是说提交的数据同时存在于所有的同步副本上,那么这个选举就是“完全的”
但如果在首领不可用时其他副本都是不同步的,我们该怎么办呢?
#以下两种情况:
第一种: 分区有3 个副本,其中的两个跟随者副本不可用(比如有两个broker 发生崩愤)。这个时候,如果生产者继续往首领写入数据,所有消息都会得到确认井被提交(因为此时首
领是唯一的同步副本)。现在我们假设首领也不可用了(又一个broker 发生崩愤),这个时候,如果之前的一个跟随者重新启动,它就成为了分区的唯一不同步副本。
第二种: 分区有3 个副本,因为网络问题导致两个跟随者副本复制消息滞后,所以尽管它复制消息,但已经不同步了。首领作为唯一的同步副本继续接收消息。这个时候,如果
首领变为不可用,另外两个副本就再也无法变成同步的了。
#上面这两种情况该如何解决?
第一种:要么等待原首领复活,但是等待过程中服务是宕的,有可能这是一个很长的时间段;要么使用新的首领,但是肯定丢失了数据。
第二种:因为两个副本已经滞后了,所以若首领不可用,那么滞后的同步副本被选为新首领,就会造成数据丢失的问题(数据不一致)。
简而言之,如果我们允许不同步的副本成为首领,那么就要承担丢失数据和出现数据不一致的风险。如果不允许它们成为首领,那么就要接受较低的可用性,因为我们必须等待原先的首领恢复到可用状态。
如果把unclean.leader.election.enable 设为true ,就是允许不同步的副本成为首领(也就是“ 不完全的选举"),那么我们将面临丢失消息的风险。如果把这个参数设为false ,
就要等待原先的首领重新上线,从而降低了可用性。我们经常看到一些对数据质量和数据一致性要求较高的系统会禁用这种不完全的首领选举( 把这个参数设为false ) 。银行系统是这方面最好的例子,大部分银行系统宁愿选择在几分钟甚至几个小时内不处理信用卡支付事务,也不会冒险处理错误的消息。不过在对可用性要求较高的系统里,比如实时点击流分析系统, 一般会启用不完全的首领选举。
最少同步副本
在主题级别和broker 级别上,这个参数都叫min.insync.replicas 。我们知道,尽管为一个主题配置了3 个副本,还是会出现只有一个同步副本的情况。如果这个同步副本变为不可用,我们必须在可用性和一致性之间作出选择一一这是一个两难的选择。根据Kafka 对可靠性保证的定义,消息只有在被写入到所有同步副本之后才被认为
是已提交的。但如果这里的“所有副本”只包含一个同步副本,那么在这个副本变为不可用时,数据就会丢失。
如果要确保已提交的数据被写入不止一个副本,就需要把最少同步副本数量设置为大一点的值。对于一个包含3 个副本的主题,如果min.insync.replicas被设为2 ,那么至少要存在两个同步副本才能向分区写入数据。
如果3 个副本都是同步的,或者其中一个副本变为不可用,都不会有什么问题。不过,如果有两个副本变为不可用,那么broker 就会停止接受生产者的请求。尝试发送数据的生产者会收到NotEnoughReplicasException 异常。消费者仍然可以继续读取已有的数据。实际上,如果使用这样的配置,那么当只剩下一个同步副本时,它就变成只读了,这是为了避免在发生不完全选举时数据的写入和读取出现非预期的行为。为了从只读状态中恢复,必须让两个不可用分区中的一个重新变为可用的(比如重启broker),并等待它变为同步的。
配置重试参数
如下的一个实例:
为broker 配置了3 个副本,并且禁用了不完全首领选举。 把生产者的acks 设为all 。假设现在往Kafka 发送消息,分区的首领刚好崩愤,新的首领正在选举当中, Kafka 会向生产者
返回“首领不可用”的响应。在这个时候,如果生产者没能正确处理这个错误,也没有重试发送消息直到发送成功,那么消息也有可能丢失。这算不上是broker 的可靠性问题,因为broker
并没有收到这个消息。这也不是一致性问题,因为消费者井没有读到这个消息。问题在于如果生产者没能正确处理这些错误,弄丢消息的是它们自己。
#解决这个问题,生产者再发一次消息就可以了!
生产者需要处理的错误包括两部分: 一部分是生产者可以自动处理的错误,还有一部分是需要开发者手动处理的错误。
如果broker 返回的错误可以通过重试来解决,那么生产者会自动处理这些错误。生产者向broker 发送消息时, broker 可以返回一个成功晌应码或者一个错误响应码。错民晌应码可以分为两种, 一种是在重试之后可以解决的,还有一种是无法通过重试解决的。例如,如果broker 返回的是LEADER_NOT_AVAILABLE 错误,生产者可以尝试重新发送消息。也许在这个时候一个新的首领被选举出来了,那么这次发送就会成功。也就是说, LEADER_NOT_AVAILABLE是一个可重试错误。另一方面,如果broker 返回的是INVALID_CONFIG 错误,即使通过重试也无能改变配置选项,所以这样的重试是没有意义的。这种错误是不可重试错误。
上面提到的是生成者的一些配置,下面我们来说明消费者的一些配置。
消费者读取数据
下面这段着重理解一下:
只有那些被提交到Kafka 的数据,也就是那些已经被写入所有同步副本的数据,对消费者是可用的,这意味着消费者得到的消息已经具备了一致性。消费者唯一要做的是跟踪哪些消息是已经读取过的,哪些是还没有读取过的。这是在读取消息时不丢失悄息的关键。
在从分区读取数据时,消费者会获取一批事件,检查这批事件里最大的偏移量,然后从这个偏移量开始读取另外一批事件。这样可以保证消费者总能以正确的顺序获取新数据, 不会错过任何事件。
如果一个悄费者退出,另一个消费者需要知道从什么地方开始继续处理,它需要知道前一个消费者在退出前处理的最后一个偏移量是多少。所谓的“另一个”消费者,也可能就是它自己重启之后重新回来工作。这也就是为什么消费者要“提交”它们的偏移量。它们把当前读取的偏移量保存起来,在退出之后,同一个群组里的其他消费者就可以接孚它们的工作。如果消费者提交了偏移量却未能处理完消息,那么就有可能造成消息丢失,这也是消费者丢失消息的主要原因。在这种情况下,如果其他消费者接手了工作,那些没有被处理完的消息就会被忽略,永远得不到处理。这就是为什么我们为什么非常重视偏移量提交的时间点和提交的方式。
#已提交的消息与已提交的偏移量
此处的己提交消息与之前讨论过的已提交消息是不一样的,它是指已经被写入所有同步副本并且对消费者可见的消息,而己提交偏移量是指消费者发送给Kafka 的偏移量,
用于确认它已经收到并处理好的消息位置。
消费者的配置
这里仅说明四个比较重要参数的配置
- group.id:如果两个消费者具有相同的group.id,井且订阅了同一个主题,那么每个消费者会分到主题分区的一个子集, 也就是说它们只能读到所有消息的一个子集(不过群组会读取主题所有的消息)。如果你希望消费者可以看到主题的所有消息,那么需要为它们设置唯一的group.id 。
- auto.offset.reset:这个参数指定了在没有偏移量可提交时(比如消费者第l 次启动时)或者请求的偏移量在broker 上不存在时),消费者会做些什么。这个参数有两种配置。一种是earliest ,如果选择了这种配置,消费者会从分区的开始位置读取数据,不管偏移量是否有效,这样会导致消费者读取大量的重复数据,但可以保证最少的数据丢失。一种是latest,如果选择了这种配置, 消费者会从分区的末尾开始读取数据,这样可以减少重复处理消息,但很有可能会错过一些消息。
- enable.auto.commit:这是一个非常重要的配置参数,你可以让消费者基于任务调度自动提交偏移量,也可以在代码里手动提交偏移量。自动提交的一个最大好处是,在实现消费者逻辑时可以少考虑一些问题。如果你在消费者轮询操作里处理所有的数据,那么自动提交可以保证只提交已经处理过的偏移量。自动提交的主要缺点是,无法控制重复处理消息(比如消费者在自动提交偏移量之前停止处理悄息),而且如果把消息交给另外一个后台线程去处理,自动提交机制可能会在消息还没有处理完毕就提交偏移量。
- auto.commit.interval.ms: 与第3个参数直接联系。如果选择了自动提交偏移量,可以通过该参数配置提交的频度,默认值是每5s提交一次。一般来说,频繁提交会增加额外的开销,但也会降低重复处理消息的频率。