如何根据时间戳获取Kafka消息

如何根据时间戳获取Kafka消息

本文介绍了如何根据时间戳获取Kafka消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在开发一个应用程序,我在其中使用 kafka,技术是 scala.我的kafka消费者代码如下:

I am working on a application in which I am using kafka and tech is scala. My kafka consumer code is as follows:

val props = new Properties()
        props.put("group.id", "test")
        props.put("bootstrap.servers", "localhost:9092")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("auto.offset.reset", "earliest")
        props.put("group.id", "consumer-group")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Collections.singletonList(topic))
    val record = consumer.poll(Duration.ofMillis(500)).asScala.toList

它给了我所有的记录,但问题是我已经在 kafka 消费者中有数据,这可能导致重复数据意味着具有相同键的数据可能已经存在于主题中.有什么方法可以从特定时间检索数据.表示在轮询之前是否可以计算当前时间并仅检索该时间之后的记录.我有什么办法可以做到这一点?

It gives me all the records but the thing is I already have data in kafka consumer which may lead to duplicate data means data with same key can already be there in topic. Is there is any way by which I can retrieve data from a particular time. Means before polling if I can calculate current time and retrieve only those records which came after that time. Any way I can achieve this?

推荐答案

你可以使用 offsetsForTimes 方法 KafkaConsumer API.

You can use the offsetsForTimes method in the KafkaConsumer API.

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import collection.JavaConverters._

object OffsetsForTime extends App {

  implicit def toJavaOffsetQuery(offsetQuery: Map[TopicPartition, scala.Long]): java.util.Map[TopicPartition, java.lang.Long] =
    offsetQuery
      .map { case (tp, time) => tp -> new java.lang.Long(time) }
      .asJava

  val topic = "myInTopic"
  val timestamp: Long = 1595971151000L

  val props = new Properties()
  props.put("group.id", "group-id1337")
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("auto.offset.reset", "earliest")
  val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)

  val topicPartition = new TopicPartition(topic, 0)
  consumer.assign(java.util.Collections.singletonList(topicPartition))
  // dummy poll before calling seek
  consumer.poll(Duration.ofMillis(500))

  // get next available offset after given timestamp
  val (_, offsetAndTimestamp) = consumer.offsetsForTimes(Map(topicPartition -> timestamp)).asScala.head
  // seek to offset
  consumer.seek(topicPartition, offsetAndTimestamp.offset())

  // poll data
  val record = consumer.poll(Duration.ofMillis(500)).asScala.toList

  for (data <- record) {
    println(s"Timestamp: ${data.timestamp()}, Key: ${data.key()}, Value: ${data.value()}")
  }

}

测试

./kafka/current/bin/kafconsole-consumer.sh --bootstrap-server localhost:9092 --topic myInTopic --from-beginning --property print.value=true --property print.timestamp=true
CreateTime:1595971142560    1_old
CreateTime:1595971147697    2_old
CreateTime:1595971150136    3_old
CreateTime:1595971192649    1_new
CreateTime:1595971194489    2_new
CreateTime:1595971196416    3_new

将时间戳选择为 3_old1_new 之间的时间以仅使用new"消息.

Selecting the timestamp to a time between 3_old and 1_new to only consume the "new" messages.

Timestamp: 1595971192649, Key: null, Value: 1_new
Timestamp: 1595971194489, Key: null, Value: 2_new
Timestamp: 1595971196416, Key: null, Value: 3_new

这篇关于如何根据时间戳获取Kafka消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-06 04:18