This article explains how to handle java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access, and should be a useful reference for anyone running into the same problem.

Problem Description

I have a Scala Spark Streaming application that receives data on the same topic from 3 different Kafka producers.

The Spark Streaming application runs on the machine with host 0.0.0.179, the Kafka server is on the machine with host 0.0.0.178, and the Kafka producers are on the machines 0.0.0.180, 0.0.0.181 and 0.0.0.182.

When I try to run the Spark Streaming application, I get the error below:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 19, localhost): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1625)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1198)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1204)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1211)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

I have read thousands of different posts, but no one seems to have found a solution to this issue.

How can I handle this in my application? Do I have to modify some parameters on Kafka (at the moment the num.partition parameter is set to 1)?

Following is the code of my application:

// Imports assumed for this snippet: Spark core/streaming, the
// spark-streaming-kafka-0-10 integration, and json4s (jackson backend assumed)
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

// Create the context with a 3 second batch interval
val sparkConf = new SparkConf().setAppName("SparkScript").set("spark.driver.allowMultipleContexts", "true").set("spark.streaming.concurrentJobs", "3").setMaster("local[4]")
val sc = new SparkContext(sparkConf)

val ssc = new StreamingContext(sc, Seconds(3))

case class Thema(name: String, metadata: String)
case class Tempo(unit: String, count: Int, metadata: String)
case class Spatio(unit: String, metadata: String)
case class Stt(spatial: Spatio, temporal: Tempo, thematic: Thema)
case class Location(latitude: Double, longitude: Double, name: String)

case class Datas1(location : Location, timestamp : String, windspeed : Double, direction: String, strenght : String)
case class Sensors1(sensor_name: String, start_date: String, end_date: String, data1: Datas1, stt: Stt)


val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "0.0.0.178:9092",
    "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
    "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
    "group.id" -> "test_luca",
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics1 = Array("topics1")

// Parse each Kafka record's JSON value into a Sensors1 instance via json4s
val s1 = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)).map { record =>
  implicit val formats = DefaultFormats
  parse(record.value).extract[Sensors1]
}
s1.print()
s1.saveAsTextFiles("results/", "")
ssc.start()
ssc.awaitTermination()
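
For context, the snippet above relies on the spark-streaming-kafka-0-10 integration (visible in the org.apache.spark.streaming.kafka010 package in the stack trace) and on json4s for the parse/extract calls. Below is a minimal sketch of the sbt dependencies this assumes; the version numbers are only guesses and should be matched to the actual Spark installation:

// build.sbt -- artifact versions are assumptions, align them with your Spark/Kafka setup
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.0.2" % "provided",
  "org.apache.spark" %% "spark-streaming"            % "2.0.2" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.2",
  "org.json4s"       %% "json4s-jackson"             % "3.2.11"
)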

Thanks

Recommended Answer

Your problem is here:

s1.print()
s1.saveAsTextFiles("results/", "")

Spark builds a graph of the data flows, and you have defined two flows here:

Read from Kafka -> Print to console
Read from Kafka -> Save to text file

Spark will attempt to run both of these graphs concurrently, since they are independent of each other. And because the Kafka integration uses a cached-consumer approach, both executions effectively end up going through the same consumer, which is what throws the ConcurrentModificationException.

What you can do is cache the DStream before running the two queries:

val dataFromKafka = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)).map(/* stuff */)

val cachedStream = dataFromKafka.cache()
cachedStream.print()
cachedStream.saveAsTextFiles("results/", "")
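
Applied to the stream definition from the question (the json4s parsing logic is unchanged; only the cache() call and the two output operations differ), a sketch of the fix would look roughly like this:

val dataFromKafka = KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)
  ).map { record =>
    implicit val formats = DefaultFormats
    parse(record.value).extract[Sensors1]
  }

// cache() materializes each batch's RDD once, so print() and saveAsTextFiles()
// no longer run two independent Kafka reads through the same cached consumer.
val cachedStream = dataFromKafka.cache()
cachedStream.print()
cachedStream.saveAsTextFiles("results/", "")

ssc.start()
ssc.awaitTermination()

As a side note, the 0-10 integration also documents a spark.streaming.kafka.consumer.cache.enabled property for turning off the per-executor consumer cache altogether; whether disabling it is an acceptable workaround depends on your Spark version and throughput, so verify it against the documentation for the release you are running.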

That concludes this look at java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access; hopefully the recommended answer above helps.
