Problem Description
For the solution, see my last comment on the accepted answer.
I configured a DStream like so:
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "kafka1.example.com:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[KafkaAvroDeserializer],
  "group.id" -> "mygroup",
  "specific.avro.reader" -> true,
  "schema.registry.url" -> "http://schema.example.com:8081"
)

val stream = KafkaUtils.createDirectStream(
  ssc,
  PreferConsistent,
  Subscribe[String, DataFile](topics, kafkaParams)
)
While this works and I get the DataFiles as expected, when I stop and re-run the job it always starts at the beginning of the topic. How can I make it continue from where it last left off?
As in the answer by Bhima Rao Gogineni, I changed my configuration like this:
val consumerParams =
  Map("bootstrap.servers" -> bootstrapServerString,
      "schema.registry.url" -> schemaRegistryUri.toString,
      "specific.avro.reader" -> "true",
      "group.id" -> "measuring-data-files",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[KafkaAvroDeserializer],
      "enable.auto.commit" -> (false: JavaBool),
      "auto.offset.reset" -> "earliest")
Then I set up the stream:
val stream = KafkaUtils.
  createDirectStream(ssc,
                     LocationStrategies.PreferConsistent,
                     ConsumerStrategies.Subscribe[String, DataFile](List(inTopic), consumerParams))
And then I process it:
stream.
  foreachRDD { rdd =>
    ... // Do stuff with the RDD - transform, produce to other topic etc.

    // Commit the offsets
    log.info("Committing the offsets")
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
But it still always starts from the beginning when re-running.
Here is an excerpt from my Kafka log:
Run:
[2018-07-04 07:47:31,593] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 22 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:47:31,594] INFO [GroupCoordinator 0]: Stabilized group measuring-data-files generation 23 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:47:31,599] INFO [GroupCoordinator 0]: Assignment received from leader for group measuring-data-files for generation 23 (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:06,690] INFO [ProducerStateManager partition=data-0] Writing producer snapshot at offset 131488999 (kafka.log.ProducerStateManager)
[2018-07-04 07:48:06,690] INFO [Log partition=data-0, dir=E:\confluent-4.1.1\data\kafka] Rolled new log segment at offset 131488999 in 1 ms. (kafka.log.Log)
[2018-07-04 07:48:10,788] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-07-04 07:48:30,074] INFO [GroupCoordinator 0]: Member consumer-1-262ece09-93c4-483e-b488-87057578dabc in group measuring-data-files has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:30,074] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 23 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:30,074] INFO [GroupCoordinator 0]: Group measuring-data-files with generation 24 is now empty (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:45,761] INFO [ProducerStateManager partition=data-0] Writing producer snapshot at offset 153680971 (kafka.log.ProducerStateManager)
[2018-07-04 07:48:45,763] INFO [Log partition=data-0, dir=E:\confluent-4.1.1\data\kafka] Rolled new log segment at offset 153680971 in 3 ms. (kafka.log.Log)
[2018-07-04 07:49:24,819] INFO [ProducerStateManager partition=data-0] Writing producer snapshot at offset 175872864 (kafka.log.ProducerStateManager)
[2018-07-04 07:49:24,820] INFO [Log partition=data-0, dir=E:\confluent-4.1.1\data\kafka] Rolled new log segment at offset 175872864 in 1 ms. (kafka.log.Log)
Next run:
[2018-07-04 07:50:13,748] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 24 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:13,749] INFO [GroupCoordinator 0]: Stabilized group measuring-data-files generation 25 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:13,754] INFO [GroupCoordinator 0]: Assignment received from leader for group measuring-data-files for generation 25 (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:43,758] INFO [GroupCoordinator 0]: Member consumer-1-906c2eaa-f012-4283-96fc-c34582de33fb in group measuring-data-files has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:43,758] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 25 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:43,758] INFO [GroupCoordinator 0]: Group measuring-data-files with generation 26 is now empty (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
Follow up 2
I made saving the offsets more verbose, like this:
// Commit the offsets
log.info("Committing the offsets")
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
if (offsetRanges.isEmpty) {
  log.info("Offset ranges is empty...")
} else {
  log.info("# offset ranges: %d" format offsetRanges.length)
}

object cb extends OffsetCommitCallback {
  def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata],
                 exception: Exception): Unit =
    if (exception != null) {
      log.info("Commit FAILED")
      log.error(exception.getMessage, exception)
    } else {
      log.info("Commit SUCCEEDED - count: %d" format offsets.size())
      offsets.
        asScala.
        foreach {
          case (p, omd) =>
            log.info("partition = %d; topic = %s; offset = %d; metadata = %s".
              format(p.partition(), p.topic(), omd.offset(), omd.metadata()))
        }
    }
}

stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, cb)
And I get this exception:
2018-07-04 10:14:00 ERROR DumpTask$:136 - Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:600)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:541)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:163)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:182)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:209)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
How should I solve this?
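The exception message itself suggests two knobs: increasing the session timeout or reducing how many records a single poll() returns. As a sketch only (the name tunedConsumerParams and the numeric values are illustrative placeholders, not tested recommendations), these settings could be added to the existing consumerParams map:

// Sketch: tunedConsumerParams is an illustrative name; the values below are placeholders to be tuned.
val tunedConsumerParams = consumerParams ++ Map(
  "session.timeout.ms" -> "60000", // give the poll loop more time before the coordinator evicts the consumer
  "max.poll.records"   -> "500")   // return fewer records per poll() so each batch is processed sooner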
Recommended Answer
With the new Spark Kafka connector API, we can try async commits.
Read the offsets and commit them once the processing is done.
The Kafka config for this:
enable.auto.commit=false
auto.offset.reset=earliest
or auto.offset.reset=latest
--> auto.offset.reset only takes effect when there is no committed offset available for the group in Kafka; in that case the consumer starts reading from the beginning or the end of the topic, depending on this setting.
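For reference, a minimal sketch of how these two properties map onto the Scala consumer parameters passed to the direct stream (offsetParams is an illustrative name, only the offset-related entries are shown, and the java.lang.Boolean wrapping mirrors the question's JavaBool usage):

// Sketch: offsetParams is illustrative; merge these entries into the full consumer parameter map.
val offsetParams = Map[String, Object](
  "enable.auto.commit" -> (false: java.lang.Boolean), // commit manually from the application instead
  "auto.offset.reset"  -> "earliest")                 // used only when the group has no committed offset yet

Then, after the outputs of each batch have completed, the offsets are committed back to Kafka explicitly: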
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
Source: https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html