Is it possible to obtain the offset of a specific message in Kafka + Spark Streaming?

Question

I'm trying to obtain and store the offset of a specific message in Kafka using the Spark Direct Stream. From the Spark documentation it is simple to obtain the offset ranges for each partition, but what I need is to store the start offset of each message in a topic after a full scan of the queue.

Recommended answer

Yes. Use the variant of createDirectStream that takes a MessageAndMetadata message handler. The handler is called for every record and gives you access to its metadata, including the offset.

Here is an example that returns a DStream of Tuple3:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// sparkConf, kafkaBroker, kafkaTopic, kafkaTopic1, inputOffset and inputOffset1 are defined elsewhere
val ssc = new StreamingContext(sparkConf, Seconds(10))

val kafkaParams = Map[String, String]("metadata.broker.list" -> kafkaBroker)

// Explicit starting offset for partition 0 of each topic
val fromOffsets = Map[TopicAndPartition, Long](
  TopicAndPartition(kafkaTopic.trim, 0) -> inputOffset,
  TopicAndPartition(kafkaTopic1.trim, 0) -> inputOffset1)

// The handler runs once per record; MessageAndMetadata exposes topic, partition, offset, key() and message()
val messagesDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Long, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.offset, mmd.message()))

In the example above, tuple3._1 holds the topic, tuple3._2 holds the offset, and tuple3._3 holds the message.
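To connect this to the question's goal of storing an offset per message after a full scan, here is a minimal pure-Scala sketch that works directly on (topic, offset, message) tuples of the shape produced by the message handler, with no Spark or Kafka dependency. The names OffsetIndex and earliestOffsets are hypothetical. It assumes, as in the example, that each topic is read from a single partition (Kafka offsets are only ordered within a partition), and that a message is identified by its content, so a repeated payload keeps only its earliest offset. In a real job the tuples would come from the DStream, for example inside foreachRDD.

```scala
// Hypothetical helper: Record mirrors the (topic, offset, message) Tuple3
// emitted by the message handler; no Spark or Kafka classes are used here.
object OffsetIndex {
  type Record = (String, Long, String) // (topic, offset, message)

  /** For each (topic, message) pair, returns the smallest offset at which it appeared.
    * Assumes all records of a topic come from one partition, so offsets are comparable.
    */
  def earliestOffsets(records: Seq[Record]): Map[(String, String), Long] =
    records
      .groupBy { case (topic, _, message) => (topic, message) }
      .map { case (key, recs) => key -> recs.map(_._2).min }
}
```

For example, the tuples ("orders", 5, "a"), ("orders", 7, "b") and ("orders", 9, "a") yield 5 for "a" and 7 for "b". Keying by content is only a choice made for this sketch; if messages carry a unique ID, keying by that ID (and adding the partition to the tuple) would be more robust.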

Hope this helps!

09-23 17:30