Is it possible to obtain a specific message offset in Kafka + Spark Streaming?

Problem Description


I'm trying to obtain and store the offset for a specific message in Kafka by using Spark Direct Stream. Looking at the Spark documentation, it is simple to obtain the range offsets for each partition, but what I need is to store the start offset for each message of a topic after a full scan of the queue.
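As background, the per-partition range offsets mentioned here can be read from each batch of a direct stream through the HasOffsetRanges interface. A minimal sketch, assuming `messagesDStream` is the DStream returned by createDirectStream:

```scala
import org.apache.spark.streaming.kafka.HasOffsetRanges

// Each RDD produced by the direct stream implements HasOffsetRanges,
// exposing the [fromOffset, untilOffset) range consumed per partition.
messagesDStream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    println(s"topic=${r.topic} partition=${r.partition} " +
      s"from=${r.fromOffset} until=${r.untilOffset}")
  }
}
```

This gives offsets per partition and batch, not per message, which is why the answer below uses the message handler instead.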

Recommended Answer


Yes, you can use the MessageAndMetadata version of createDirectStream, which allows you to access message metadata.


You can find an example here which returns a DStream of Tuple3.

// Imports for the Kafka 0.8 direct stream API (spark-streaming-kafka artifact):
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sparkConf, Seconds(10))

val kafkaParams = Map[String, String]("metadata.broker.list" -> kafkaBroker)

// Start each topic/partition from an explicit offset.
var fromOffsets = Map[TopicAndPartition, Long]()
val topicAndPartition = new TopicAndPartition(kafkaTopic.trim, 0)
val topicAndPartition1 = new TopicAndPartition(kafkaTopic1.trim, 0)
fromOffsets += (topicAndPartition -> inputOffset)
fromOffsets += (topicAndPartition1 -> inputOffset1)

// The message handler maps every MessageAndMetadata record to a
// (topic, offset, message) tuple, so the offset travels with each message.
val messagesDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Long, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.offset, mmd.message())
)


In the above example, tuple3._1 will contain the topic, tuple3._2 the offset, and tuple3._3 the message.
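Once the offset travels with each record, storing it is a matter of iterating over the stream. A minimal sketch; `saveOffset` is a hypothetical persistence function you would supply (e.g. a write to ZooKeeper, HBase, or a database):

```scala
// Persist (topic, offset) for every message in each micro-batch.
messagesDStream.foreachRDD { rdd =>
  rdd.foreach { case (topic, offset, message) =>
    // saveOffset is a hypothetical placeholder for your storage call.
    saveOffset(topic, offset)
  }
}
```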

Hope this helps!

