Problem Description
I'm trying to obtain and store the offset of a specific message in Kafka using Spark Direct Stream. Looking at the Spark documentation, it is simple to obtain the range of offsets for each partition, but what I need is to store the start offset of each message in a topic after a full scan of the queue.
Recommended Answer
Yes, you can use the MessageAndMetadata version of createDirectStream, which allows you to access the message metadata.

You can find an example below, which returns a DStream of Tuple3.
// sparkConf, kafkaBroker, kafkaTopic, kafkaTopic1, inputOffset and inputOffset1 are assumed to be defined elsewhere
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafkaBroker)
// Start offsets for each topic/partition the direct stream should begin reading from
var fromOffsets = Map[TopicAndPartition, Long]()
val topicAndPartition = TopicAndPartition(kafkaTopic.trim, 0)
val topicAndPartition1 = TopicAndPartition(kafkaTopic1.trim, 0)
fromOffsets += (topicAndPartition -> inputOffset)
fromOffsets += (topicAndPartition1 -> inputOffset1)
// The message handler turns every record into (topic, offset, message)
val messagesDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple3[String, Long, String]](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.offset, mmd.message()))
In the example above, tuple3._1 holds the topic, tuple3._2 holds the offset, and tuple3._3 holds the message.
Hope this helps!