python-kafka之理论篇

 

kafka系列文章之python-api的使用。

在使用kafka-python时候需要注意,一定要版本兼容,否则在使用生产者会报 无法更新元数据的错误。

在本片测试中java版本为如下,kafka版本为0.10.0,kafka-python版本为1.3.1,目前最新的版本为1.4.4

[root@test2 bin]# java -version
java version "1.7.0_79"
Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)

从官网下载kafka-python,源码安装即可!https://pypi.org/project/kafka-python/1.3.1/

安装完成之后一个简易的测试:

 
#一个简单的生成者
>>>: from kafka import KafkaProducer >>>: producer
= KafkaProducer(bootstrap_servers=["10.0.102.204:9092"]) >>>: producer.send("science",b"Hello world") <kafka.producer.future.FutureRecordMetadata object at 0x0000000003B28080>

#我们向science主题发送了一个“Hello world”消息。可以在控制台使用消费者查看如下
[root@test3 bin]# ./kafka-console-consumer.sh --zookeeper=10.0.102.204:2181 --topic science --from-beginning

Hello world

 

上面只是一个简单的实例,主要用来验证当前的python api是否可以使用;下面会详细说明python-kafka的使用。

kafka生成者

一个应用程序在很多情况下需要往kafka写入消息:记录用户的活动(用于审计和分析),记录度量指标,保存日志消息,与其他应用程序进行异步通信,缓冲即将写入到数据库的数据,等等。

尽管生产者API使用起来很简单,但消息的发送过程还是比较复杂,如下图(摘自kafka权威指南)

首先从创建一个ProducerRecord对象开始,ProducerRecord对象需要包含目标主题和要发送的内容。我们还可以指定键和分区。在发送ProducerRecord对象时,生产者首先要把键和值对象进行序列化,这样他们才能在网络上传输。python3.x中需要序列化为bytes类型,才能传输。

然后,数据被传给分区器。如果之前在ProducerRecord对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据ProducerRecord对象来选择一个分区。选好分区之后,生成者就指知道该往哪个主题和分区发送这条记录。紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的broker上。

服务器在收到消息之后会返回一个响应。如果消息成功写入kafka,就返回一个RecodMetaDate对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。

要往kafka写入消息,首先要创建一个生产者对象,并设置一些属性。下面介绍一些kafka的属性。

 
bootstrap.servers:该属性指定broker的地址清单,地址格式为host:port。清单里不需要包含所有的broker地址,生产者会从给定的broker中查找到其他的broker的信息。不过建议
    至少要提供两个broker信息,一旦其中一个宕机,生产者仍然能连接到集群上。 key_serializer (callable) – used to convert user
-supplied keys to bytes If not None, called as f(key), should return bytes. Default: None. value_serializer (callable) – used to convert user-supplied message values to bytes. If not None, called as f(value), should return bytes. Default: None. 上面的两个数值指定了键和值怎么序列化。默认是none。
在实例化了一个生成者对象之后,实例有一个config属性,返回的是当前生产者的默认配置如下: producer.config {
'bootstrap_servers': ['10.0.102.204:9092'], 'client_id': 'kafka-python-producer-1', 'key_serializer': None, 'value_serializer': None, 'acks': 1, 'compression_type': None, 'retries': 0, 'batch_size': 16384, 'linger_ms': 0, 'partitioner': <kafka.partitioner.default.DefaultPartitioner object at 0x0000000003A037B8>, 'buffer_memory': 33554432, 'connections_max_idle_ms': 600000, 'max_block_ms': 60000, 'max_request_size': 1048576, 'metadata_max_age_ms': 300000, 'retry_backoff_ms': 100, 'request_timeout_ms': 30000, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'socket_options': [(6, 1, 1)], 'reconnect_backoff_ms': 50, 'max_in_flight_requests_per_connection': 5, 'security_protocol': 'PLAINTEXT', 'ssl_context': None, 'ssl_check_hostname': True, 'ssl_cafile': None, 'ssl_certfile': None, 'ssl_keyfile': None, 'ssl_crlfile': None, 'api_version': (0, 10), 'api_version_auto_timeout_ms': 2000, 'metric_reporters': [], 'metrics_num_samples': 2, 'metrics_sample_window_ms': 30000, 'selector': <class 'selectors.SelectSelector'>, 'sasl_mechanism': None, 'sasl_plain_username': None, 'sasl_plain_password': None} data = producer.config type(data) <class 'dict'> data是一个字典对象,然后就可以使用修改字典的方法修改对应的属性。 从上面可以看到生成者有很多可以配置的参数,他们大部分的参数都有合理的默认值,所有没有必要修改它们。不过有几个参数在内存中使用,性能和可靠性方面对生产者影响比较大,下面会介绍这些参数。 acks,这个参数指定了必须有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。 如果acks=0;生成者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中出现了问题,导致服务器没有接收到消息,那么生产者是不知道的,消息也就是丢失了。
不过,因为生成者不需要等待服务器响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。 如果acks
=1:只要集群的首领节点收到消息,生成者会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到
一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步
发送。如果让客户端等待服务器的响应(通过调用future对象的get()方法),显然会增加延迟。如果客户端使用回调,延迟问题可以得到缓解,不过吞吐量还是会受发送中
消息数量的限制。 如果acks
=all,只有当所有参与复制的节点全部收到消息时,生成者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器
发生崩溃,整个集群仍然可以运作。不过,它的延迟比acks=1时更高,因为我们要等待不只一个服务器节点接收消息。 buffer.memory: 该参数用来设置生成者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。
这个时候send()方法调用要么被阻塞,要么抛出异常,取决于如何设置block.on.buffer.full参数(
0.9.0.0之后的版本中被替换为max.block.ms,表示在抛出异常之
前可以阻塞一段时间) compression.type: 默认情况下,消息发送不会被压缩。该参数可以设置为snappy, gzip或lz4,它指定了消息被发送给broker之前使用哪一种压缩算法进行压缩。snappy压缩,
占用较少的CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。gzip压缩算法一般会占用较多的CPU,但会提供更高的压缩比,
所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销。 retries:生成者从服务器收到的错误有可能是临时性的错误。在这种情况下,retries参数的值决定了生成者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。
默认情况下,生产者会在每次重试之间等待100ms,不过可以通过retry.backoff.ms参数来个改变这个时间间隔。 batch.size:当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,安装字节数计算。当批次被填满时,批次里所
有的消息都会被发送出去。不过生产者并不一定都会等到批次填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次带下设置得很大,也不会
造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。 linger.ms :该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。kafka生产者会在批次填满或者linger.ms达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。把linger.ms设置成比0大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然会增加延迟,但是会提升吞吐量(因为一次发送更多的消息,每个消息的开销变小了) client.
id: 该参数可以是任意字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。 max.in.flight.requests.per.connection: 该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设置
为1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。 timeout.ms: 指定了broker等待同步副本返回消息确认的时间,与acks的配置相匹配
----如果在指定时间内没有收到同步副本的确认,那么broker就会返回一个错误。 request.timeout.ms: 指定了生产者在发送数据时等待服务器返回响应的时间。 metadata.fetch.timeout.ms:指定了生产者在获取元数据时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误。 max.block.ms: 该参数指定了在调用send()方法或使用partitionFor()方法获取元数据时生产者阻塞时间,当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞,
在阻塞时间达到max.block.ms,生产者会抛出异常。 max.request.size: 该参数用于控制生成者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为1MB,那么可以发送的
单个最大消息为1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了1000个消息,每个消息大小为1KB。另外,broker对可接收的消息最大值也有自己的限制,
所以两边的配置最好可以匹配,避免生产者发送的消息被broker拒绝。 receive.buffer.bytes和send.buffer.bytes:这两个参数分别指定了TCP socket接收和发送数据包的缓冲区大小。如果它们被设置为
-1.就是用操作系统的默认值。如果生产者或
消费者与broker处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
 

kafka消费者

 应用程序从kafkaconsumer向kafka订阅主题,并从订阅的主题上接收消息。从kafka读取数据不同于从其他消息系统读取数据,它涉及一些概念,需要先理解一下!

消费者和消费者群组

设想一种情况:应用程序从kafka订阅主题,读取消息,但是我们知道生产者在向主题写入消息时,可以是多个生产者并发写入的,这时候生产者向主题写入消息的速度超过了应用程序验证数据的速度,这个时候该怎么处理?如果只使用单个消费者处理消息,应用程序会远远跟不上消息的生成速度。显然,此时很有必要对消费者进行横向伸缩,就像多个生产者向相同主题写入消息一样,我们也可以使用多个消费者从同一个逐日读取消息,对消息进行分流。

kafka消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。如果往群组里添加更多的消费者,超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息。

往群组里添加消费者是横向伸缩消费能力的主要方式。kafka消费者经常会做一些高延迟的操作,比如把数据写到数据库或HDFS,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让他们分担负载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。

kafka设计的主要目标之一,就是要让kafka主题里的数据能够满足企业各种应用场景的需求。在这些场景中,每个应用程序可以获取到所有的消息,而不只是其中的一部分。只要保证每个应用程序有自己的消费者群组,就可以让他们获取到主题的所有的消息。不同于传统的消息系统,横向伸缩kafka消费者和消费者群组并不会对性能造成负面影响。【每个消费者群组得道的是所有的消息,而不是部分的消息】

消费者群组与分区再均衡

主要说一些概念性的东西

一个新的悄费者加入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生崩愤时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。在主题发生变化时, 比如管理员添加了新的分区,会发生分区重分配。

分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。再均衡非常重要, 它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除梢费者),不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。我们将在本章讨论如何进行安全的再均衡,以及如何避免不必要的再均衡。

消费者通过向被指派为群组协调器的broker(不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。如果一个消费者发生崩愤,并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。

在0. 10.1 版本里, Kafka 社区引入了一个独立的心跳线程,可以在轮均消息的空档发送心跳。这样一来,发送心跳的频率(也就是消费者群纽用于检测发生崩溃的消费者或不再发送心跳的消费者的时间)与消息轮询的频率(由处理消息所花费的时间未确定)之间就是相互独立的。在新版本的Kafka 里,可以指定消费者在离开群纽并触发再均衡之前可以有多长时间不进行消息轮询,这样可以避免出现活锁(livel ock ) ,比如有时候应用程序并没有崩溃,只是由于某些原因导致无法正常运行。这个配直与
session.timeout.ms 是相互独立的,后者用于控制检测消费者发生崩溃的时间和停止发送心跳的时间。

当消费者要加入群组时,它会向群组协调器发送一个JoinGroup 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个悄费者分配分区。它使用一个实现了partitionAssignor接口的类来决定哪些分区应该被分配给哪个消费者。

上面介绍了消费者和消费者群组的一些理论性东西,下面来简单创建一个消费者!

 
from kafka import KafkaConsumer

consumer = KafkaConsumer("science", bootstrap_servers=["10.0.102.204:9092"], auto_offset_reset='earliest')
for i in consumer:
    print(i)

#这里KafkaConsumer接收了三个参数,主题,borker的服务器地址和端口,以及消费者从哪里开始读取消息
#上面的代码若是consumer有数据则会打印出数据,若是主题中没有消息,则会一致阻塞等待
 

上面print打印出的一个数值如下:

ConsumerRecord(topic='science', partition=0, offset=0, timestamp=1545358023897, timestamp_type=0, key=None, value=b'test message', checksum=1777691423, serialized_key_size=-1, serialized_value_size=12)

#返回的是一个consumer对象,包含了一些元数据信息。

消息轮询是消费者API的核心,通过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调,分区再均衡,发送心跳和获取数据,开发者只需要使用一组简单的API来处理从分区返回的数据。上面的代码是一个简单的利用for循环的轮询。

消费者的参数配置

 
1. fetch.min.bytes:该属性指定了消费者从服务器获取记录的最小字节数。broker在收到消费者的数据请求时,如果可用的数据量小于fetch.min.bytes 指定的大小,那么它会等到
有足够的可用数据时才把它返回给消费者。这样可以降低消费者和broker的工作负载,因为它们在主题不是很活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。如果没有
很多可用数据,但消费者的CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低broker 的工作负载。
2. fetch.max.wait.ms:我们通过fetch.min.bytes告诉Kafka ,等到有足够的数据时才把它返回给消费者。而fetch.max.wait.ms 则用于指定broker 的等待时间,默认是500ms
,如果没有足够的数据流入Kafka ,消费者获取最小数据量的要求就得不到满足,最终导致500ms 的延迟。如果要降低潜在的延迟(为了满足SLA ),可以把该参数值设置得小一些。
如果fetch.max.
wait.ms被设为lOOms ,并且fetch.min.bytes被设为1MB ,那么Kafka 在收到消费者的请求后,要么返回1MB 数据,要么在1OOms 后返回所有可用的数据,
就看哪个条件先得到满足。
3. max.partition.fetch.bytes:该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是1MB,也就是说kafkaconsumer.poll() 方法从每个分区里返回的记录最
多不超过max.partition.fetch.bytes指定的字节。如果一个主题有20 个分区和5 个消费者,那么每个消费者需要至少4MB 的可用内存来接收记录。在为消费者分配内存时,
可以给它们多分配一些,因为如果群组里有消费者发生崩愤,剩下的消费者需要处理更多的分区。max.partition.fetch.bytes的值必须比broker 能够接收的最大消息的字节
数(通过max.message.size属性配置)大, 否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。
消费者需要频繁调用poll()方法来避免会话过期和发生分区再均衡,如果单次调用poll()返回的数据太多,消费者需要更多的时间来处理,可能无怯及时进行下一个轮询来避免会
话过期。如果出现这种情况, 可以把max.partition.fetch.bytes值改小,或者延长会i舌过期时间。
4. session.timeout.ms:该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是3s 。如果消费者没有在session.timeout.ms 指定的时间内发送心跳给群组协调
器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。该属性与heartbeat.interval.ms紧密相关。heartbeat.interval.ms指定了poll()
方法向协调器发送心跳的频率, session.timeout.ms 则指定了消费者可以多久不发送心跳。所以,一般需要同时修改这两个属性,heartbeat.interval.ms必须比
session.timeout.ms 小, 一般是session.timeout.ms 的三分之一。如果session.timeout.ms 是3s ,那么heartbeat.interval.ms应该是1s 。
把session.timeout.ms 值设得比默认值小,可以更快地检测和恢复崩愤的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设 置得大一些,可以减少意外的再均衡,不过检测节点崩愤-需要更长的时间。
5.auto.offset.reset: 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时井被删除)该作何处理。它的默认
值是latest , 意思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是earlist,意思是说,在偏移量无效的情况下,
消费者将从起始位置读取分区的记录。
6.enable.auto.commit: 该属性指定了消费者是否自动提交偏移量,默认值是true。为了尽量避免出现重复数据和数据丢失,可以把它设为false ,由自己控制何时提交偏移量。如果把
它设为true ,还可以通过配置auto.commit.interval.ms属性来控制提交的频率。
7.partition.assignment.strategy: 我们知道,分区会被分配给群组里的消费者。PartitionAssignor根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。
Kafka 有两个默认的分配策略。

Range: 该策略会把主题的若干个连续的分区分配给消费者。假设悄费者c1 和消费者c2 同时订阅了主题t1 和主题t2 ,井且每个主题有3 个分区。那么消费者c1有可能分配到这两个主题的分区0

和分区1 ,而消费者C2 分配到这两个主题的分区2。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了Range

策略,而且分区数量无怯被消费者数量整除,就会出现这种情况。 RoundRobin: 该策略把主题的所有分区逐个分配给消费者。如果使用RoundRobin 策略来给消费者c1和消费者c2 分配分区,那么消费者c1 将分到主题T1的分区0 和分区2 以及主题t2的分区1 ,消费

者C2 将分配到主题t1 的分区1 以及主题t2的分区0 和分区2 。一般来说,如果所有消费者都订阅相同的主题(这种情况很常见) , RoundRobin 策略会给所有消费者分配相同数量的分区(或最多就差一个分区)。

可以通过设置partition.assignment.strategy来选择分区策略。默认使用的是org.apache.kafka.clients.consumer.RangeAssignor,这个类实现了Range策略,不过也可以把

它改为org.apache.kafka.clients.consumer.RoundRobinAssignor。还可以自定义策略,在这种情况下,partition.assignment,strategy属性的值就是自定义类的名字。 8.client.id:该属性可以是任意字符串, broker 用它来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额里。

9.max.poll.records:该属性用于控制单次调用call() 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。 10. receive.buffer.bytes和send.buffer.bytes: socket 在读写数据时用到的TCP 缓冲区也可以设置大小。如果它们被设为-1 ,就使用操作系统的默认值。如果生产者或消费者

与broker 处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

 
01-04 04:06