Question
I am working on an application that uses Kafka, written in Scala. My Kafka consumer code is as follows:
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")
props.put("group.id", "consumer-group")
val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
consumer.subscribe(java.util.Collections.singletonList(topic))
val record = consumer.poll(Duration.ofMillis(500)).asScala.toList
It gives me all the records, but the problem is that data already exists in the topic, which can lead to duplicates: records with the same key may already be there. Is there any way to retrieve data only from a particular time onward? That is, before polling, can I capture the current time and retrieve only the records that arrived after it?
Answer
You can use the offsetsForTimes method of the KafkaConsumer API.
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._
object OffsetsForTime extends App {

  // Convert a Scala Map[TopicPartition, Long] into the
  // java.util.Map[TopicPartition, java.lang.Long] expected by offsetsForTimes.
  implicit def toJavaOffsetQuery(offsetQuery: Map[TopicPartition, scala.Long]): java.util.Map[TopicPartition, java.lang.Long] =
    offsetQuery
      .map { case (tp, time) => tp -> java.lang.Long.valueOf(time) }
      .asJava

  val topic = "myInTopic"
  val timestamp: Long = 1595971151000L

  val props = new Properties()
  props.put("group.id", "group-id1337")
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("auto.offset.reset", "earliest")

  val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
  val topicPartition = new TopicPartition(topic, 0)
  consumer.assign(java.util.Collections.singletonList(topicPartition))

  // dummy poll before calling seek
  consumer.poll(Duration.ofMillis(500))

  // get the next available offset at or after the given timestamp
  val (_, offsetAndTimestamp) = consumer.offsetsForTimes(Map(topicPartition -> timestamp)).asScala.head

  // seek to that offset, then poll only records from the timestamp onward
  consumer.seek(topicPartition, offsetAndTimestamp.offset())
  val records = consumer.poll(Duration.ofMillis(500)).asScala.toList

  for (data <- records) {
    println(s"Timestamp: ${data.timestamp()}, Key: ${data.key()}, Value: ${data.value()}")
  }
}
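Applied to the original question ("capture the current time before polling"), a minimal sketch could query offsetsForTimes with System.currentTimeMillis(). This is an illustration under the same broker and topic assumptions as above, not part of the original answer. One caveat worth noting: per the KafkaConsumer javadoc, offsetsForTimes maps a partition to null when no record has a timestamp at or after the queried one, so that case needs a guard.

```scala
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

object SeekToNow extends App {
  val topic = "myInTopic" // same example topic as above

  val props = new Properties()
  props.put("group.id", "group-id1337")
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  val tp = new TopicPartition(topic, 0)
  consumer.assign(java.util.Collections.singletonList(tp))

  // Capture "now" before polling, as asked in the question.
  val now: java.lang.Long = java.lang.Long.valueOf(System.currentTimeMillis())
  val query = Map(tp -> now).asJava

  // offsetsForTimes returns null for a partition if no record has a
  // timestamp at or after the query; only seek when an offset was found.
  Option(consumer.offsetsForTimes(query).get(tp))
    .foreach(ot => consumer.seek(tp, ot.offset()))

  val records = consumer.poll(Duration.ofMillis(500)).asScala.toList
  records.foreach(r => println(s"Timestamp: ${r.timestamp()}, Value: ${r.value()}"))
}
```

This requires a running broker at localhost:9092, so it is a sketch rather than a tested program.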
Test
./kafka/current/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myInTopic --from-beginning --property print.value=true --property print.timestamp=true
CreateTime:1595971142560 1_old
CreateTime:1595971147697 2_old
CreateTime:1595971150136 3_old
CreateTime:1595971192649 1_new
CreateTime:1595971194489 2_new
CreateTime:1595971196416 3_new
The timestamp is chosen between 3_old and 1_new so that only the "new" messages are consumed.
Timestamp: 1595971192649, Key: null, Value: 1_new
Timestamp: 1595971194489, Key: null, Value: 2_new
Timestamp: 1595971196416, Key: null, Value: 3_new
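If the goal is simply to skip everything already in the topic rather than to replay from an arbitrary timestamp, the standard seekToEnd method of KafkaConsumer does this without any timestamp lookup. A hedged sketch under the same broker and topic assumptions as above:

```scala
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

object SkipExisting extends App {
  val topic = "myInTopic" // same example topic as above

  val props = new Properties()
  props.put("group.id", "group-id1337")
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  val tp = new TopicPartition(topic, 0)
  consumer.assign(java.util.Collections.singletonList(tp))

  // Move the position to the current end of the partition; only records
  // produced after this point will be returned by subsequent polls.
  consumer.seekToEnd(java.util.Collections.singletonList(tp))

  val records = consumer.poll(Duration.ofMillis(500)).asScala.toList
  records.foreach(r => println(s"Timestamp: ${r.timestamp()}, Value: ${r.value()}"))
}
```

The trade-off is that seekToEnd gives no control over the cutoff point, whereas offsetsForTimes lets you pick any timestamp.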