This article covers the question "Can a Kafka consumer (0.8.2.2) read messages in batch?" and its recommended answer, which may be a useful reference for anyone facing the same problem.

Problem description

As per my understanding, a Kafka consumer reads messages from an assigned partition sequentially...

We are planning to have multiple Kafka consumers (Java) with the same group id. If each one reads sequentially from its assigned partition, how can we achieve high throughput? For example, the producer publishes around 40 messages per second while a consumer processes only 1 message per second. We can have multiple consumers, but surely not 40, right? Correct me if I'm wrong...

And in our case the consumer has to commit the offset only after the message is processed successfully, otherwise the message will be reprocessed... Is there any better solution?

Recommended answer

In response to the clarification in your question:

A Kafka consumer can read multiple messages at a time. But a Kafka consumer doesn't really read messages; it's more correct to say that a consumer reads a certain number of bytes, and the size of the individual messages then determines how many messages are read. Reading through the Kafka consumer configs, you're not allowed to specify how many messages to fetch; you specify a max/min data size that a consumer can fetch, and however many messages fit inside that range is how many you will get. As you have pointed out, you will always get messages sequentially.

Relevant consumer configs (0.9.0.0 and up):

  • fetch.min.bytes
  • max.partition.fetch.bytes
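
For concreteness, here is a minimal sketch of setting those two properties on the 0.9 Java consumer. The broker address, group id, and topic are placeholder values, and the byte sizes shown are the documented defaults, not anything taken from the question:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("group.id", "my-group");                // placeholder group id
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    // The broker waits until at least this many bytes are available
    // (or the fetch timeout expires) before answering a fetch request.
    props.put("fetch.min.bytes", "1");
    // Upper bound on bytes returned per partition per fetch; how many
    // whole messages fit under this cap is how many you get in a batch.
    props.put("max.partition.fetch.bytes", "1048576");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);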

Update

Using your example from the comments, "my understanding is if I specify in config to read 10 bytes and if each message is 2 bytes the consumer reads 5 messages at a time" — that is true. Your next statement, "that means the offsets of these 5 messages were random within the partition", is false. Reading sequentially doesn't mean one by one; it just means the messages remain ordered. You are able to batch items and still have them remain sequential/ordered. Take the following examples.

In a Kafka log, suppose there are 10 messages (each 2 bytes) with the following offsets: [0,1,2,3,4,5,6,7,8,9].

If you read 10 bytes, you'll get a batch containing the messages at offsets [0,1,2,3,4].

If you read 6 bytes, you'll get a batch containing the messages at offsets [0,1,2].

If you read 6 bytes, then another 6 bytes, you'll get two batches containing the messages [0,1,2] and [3,4,5].

If you read 8 bytes, then 4 bytes, you'll get two batches containing the messages [0,1,2,3] and [4,5].
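
The arithmetic behind these examples can be mirrored in a few lines of plain Java — this is simulation code, not Kafka API calls, just to show that byte-budgeted batches still walk the offsets in order:

    public class BatchBoundaries {
        public static void main(String[] args) {
            int messageSize = 2;     // every message is 2 bytes, as above
            int nextOffset = 0;      // next offset the consumer will read
            int[] fetches = {8, 4};  // byte budget of each fetch (last example)
            for (int fetchBytes : fetches) {
                int count = fetchBytes / messageSize; // whole messages that fit
                StringBuilder batch = new StringBuilder("[");
                for (int i = 0; i < count; i++) {
                    batch.append(nextOffset++).append(i < count - 1 ? "," : "");
                }
                System.out.println("batch: " + batch + "]");
                // prints: batch: [0,1,2,3]  then  batch: [4,5]
            }
        }
    }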

Update: clarifying commits

I'm not 100% sure how committing works; I've mainly worked with Kafka from a Storm environment, where the provided KafkaSpout automatically commits Kafka messages.

But looking through the 0.9.0.1 Consumer APIs (which I would recommend you do), there seem to be three methods in particular that are relevant to this discussion:

  • poll(long timeout)
  • commitSync()
  • commitSync(java.util.Map offsets)

The poll method retrieves messages; it could be only 1, it could be 20. For your example, let's say 3 messages were returned: [0,1,2]. You now have those three messages, and it's up to you to determine how to process them. You could process them 0 => 1 => 2, or 1 => 0 => 2, or 2 => 0 => 1; it just depends. However you process them, afterwards you'll want to commit, which tells the Kafka server you're done with those messages.
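
As a sketch of that loop against the 0.9 consumer API (reusing the consumer configured earlier; the topic name is a placeholder):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    consumer.subscribe(Collections.singletonList("my-topic")); // placeholder
    while (true) {
        // poll() returns whatever batch is available within the timeout;
        // per partition, the records always arrive in offset order.
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset=%d value=%s%n",
                record.offset(), record.value());
            // ... process the message here ...
        }
    }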

Using commitSync() commits everything returned by the last poll; in this case it would commit offsets [0,1,2].
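
A sketch of that style, where the whole batch is processed before one blanket commit (process(...) is a hypothetical handler standing in for your business logic, not a Kafka API):

    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical handler
    }
    // Commits the offsets of everything the last poll() returned, in one
    // call. If the process crashes before this line, the whole batch is
    // redelivered and reprocessed.
    consumer.commitSync();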

On the other hand, if you choose to use commitSync(java.util.Map offsets), you can manually specify which offsets to commit. If you're processing them in order, you can process offset 0 then commit it, process offset 1 then commit it, and finally process offset 2 and commit it.
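
A sketch of that per-message style (process(...) is again a hypothetical handler); note that the committed value is, by convention, the offset of the *next* message to consume, hence the +1:

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical handler
        // Commit just this record: the committed offset is the next one
        // to read, so it is record.offset() + 1.
        consumer.commitSync(Collections.singletonMap(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)));
    }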

All in all, Kafka gives you the freedom to process messages however you desire; you can choose to process them sequentially or in an entirely different order of your choosing.

