本文介绍了Kafka的Spark DStream总是从头开始的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

看看我对解决方案可接受答案的最后评论

我像这样配置 DStream :

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "kafka1.example.com:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[KafkaAvroDeserializer],
    "group.id" -> "mygroup",
    "specific.avro.reader" -> true,
    "schema.registry.url" -> "http://schema.example.com:8081"
  )

  val stream = KafkaUtils.createDirectStream(
    ssc,
    PreferConsistent,
    Subscribe[String, DataFile](topics, kafkaParams)
  )

虽然可以正常工作,并且可以按预期方式获得 DataFile ,但是当我停止并重新运行该作业时,它总是从主题的开头开始.我如何才能实现它从上次故障开始的地方继续?

While this works and I get the DataFiles as expected, when I stop and re-run the job, it always starts at the beginning of the topic. How can I achieve that it continues where it last went off?

就像Bhima Rao Gogineni的回答一样,我这样更改了配置:

As in the answer by Bhima Rao Gogineni, I changed my configuration like this:

val consumerParams =
  Map("bootstrap.servers" -> bootstrapServerString,
      "schema.registry.url" -> schemaRegistryUri.toString,
      "specific.avro.reader" -> "true",
      "group.id" -> "measuring-data-files",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[KafkaAvroDeserializer],
      "enable.auto.commit" -> (false: JavaBool),
      "auto.offset.reset" -> "earliest")

然后我设置了流:

val stream = KafkaUtils.
  createDirectStream(ssc,
                     LocationStrategies.PreferConsistent,
                     ConsumerStrategies.Subscribe[String, DataFile](List(inTopic), consumerParams))

然后我处理它:

stream.
  foreachRDD { rdd =>
    ... // Do stuff with the RDD - transform, produce to other topic etc.
    // Commit the offsets
    log.info("Committing the offsets")
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }

但是重新运行时,它始终始终从头开始.

But it still always starts from the beginning when re-running.

这是我的卡夫卡日志的摘录:

Here is an excerpt from my Kafka log:

运行:

[2018-07-04 07:47:31,593] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 22 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:47:31,594] INFO [GroupCoordinator 0]: Stabilized group measuring-data-files generation 23 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:47:31,599] INFO [GroupCoordinator 0]: Assignment received from leader for group measuring-data-files for generation 23 (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:06,690] INFO [ProducerStateManager partition=data-0] Writing producer snapshot at offset 131488999 (kafka.log.ProducerStateManager)
[2018-07-04 07:48:06,690] INFO [Log partition=data-0, dir=E:\confluent-4.1.1\data\kafka] Rolled new log segment at offset 131488999 in 1 ms. (kafka.log.Log)
[2018-07-04 07:48:10,788] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-07-04 07:48:30,074] INFO [GroupCoordinator 0]: Member consumer-1-262ece09-93c4-483e-b488-87057578dabc in group measuring-data-files has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:30,074] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 23 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:30,074] INFO [GroupCoordinator 0]: Group measuring-data-files with generation 24 is now empty (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:45,761] INFO [ProducerStateManager partition=data-0] Writing producer snapshot at offset 153680971 (kafka.log.ProducerStateManager)
[2018-07-04 07:48:45,763] INFO [Log partition=data-0, dir=E:\confluent-4.1.1\data\kafka] Rolled new log segment at offset 153680971 in 3 ms. (kafka.log.Log)
[2018-07-04 07:49:24,819] INFO [ProducerStateManager partition=data-0] Writing producer snapshot at offset 175872864 (kafka.log.ProducerStateManager)
[2018-07-04 07:49:24,820] INFO [Log partition=data-0, dir=E:\confluent-4.1.1\data\kafka] Rolled new log segment at offset 175872864 in 1 ms. (kafka.log.Log)

下次运行:

[2018-07-04 07:50:13,748] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 24 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:13,749] INFO [GroupCoordinator 0]: Stabilized group measuring-data-files generation 25 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:13,754] INFO [GroupCoordinator 0]: Assignment received from leader for group measuring-data-files for generation 25 (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:43,758] INFO [GroupCoordinator 0]: Member consumer-1-906c2eaa-f012-4283-96fc-c34582de33fb in group measuring-data-files has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:43,758] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 25 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:43,758] INFO [GroupCoordinator 0]: Group measuring-data-files with generation 26 is now empty (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)

跟进2

我这样更详细地保存了偏移量:

Follow up 2

I made saving the offsets more verbose like this:

    // Commit the offsets
    log.info("Committing the offsets")
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    if(offsetRanges.isEmpty) {
      log.info("Offset ranges is empty...")
    } else {
      log.info("# offset ranges: %d" format offsetRanges.length)
    }
    object cb extends OffsetCommitCallback {

      def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata],
                     exception: Exception): Unit =
        if(exception != null) {
          log.info("Commit FAILED")
          log.error(exception.getMessage, exception)
        } else {
          log.info("Commit SUCCEEDED - count: %d" format offsets.size())
          offsets.
            asScala.
            foreach {
              case (p, omd) =>
                log.info("partition = %d; topic = %s; offset = %d; metadata = %s".
                  format(p.partition(), p.topic(), omd.offset(), omd.metadata()))
            }
        }
    }
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, cb)

我得到这个异常:

2018-07-04 10:14:00 ERROR DumpTask$:136 - Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:600)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:541)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:163)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:182)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:209)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

我该如何解决?

推荐答案

使用新的Spark Kafka Connect API,我们可以尝试异步提交.

With new Spark Kafka Connect API, We can try async commits.

读取偏移量,并在完成该过程后提交.

Read the offsets and commit once done with the process.

Kafka配置相同:

Kafka config for the same:

enable.auto.commit = false

auto.offset.reset =最早 auto.offset.reset = latest ->如果Kafka主题中没有最后提交的偏移量,则此配置生效然后它将根据此配置从开始或结束读取偏移量.

auto.offset.reset=earliest or auto.offset.reset=latest --> this config takes effect if there is no last committed offset available in Kafka topic then it will read the offsets from start or end based this config.

 stream.foreachRDD { rdd =>
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

   // some time later, after outputs have completed
   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

以下是来源: https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

这篇关于Kafka的Spark DStream总是从头开始的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-20 13:49