"SparkContext was shut down" while running Spark on a large dataset

This article covers how to deal with a "SparkContext was shut down" error when running Spark on a large dataset. It should be a useful reference for anyone hitting the same problem; follow along below.

Problem Description

When running a Spark job on a cluster past a certain data size (~2.5 GB), I get either "Job cancelled because SparkContext was shut down" or "executor lost" errors. When I look at the YARN GUI, I see that the job that got killed is reported as successful. There are no problems when running on 500 MB of data. Looking for a solution, I found one hint: it seems YARN kills some of the executors because they request more memory than expected.

Any suggestions on how to debug this?

The command I submit my Spark job with:

/opt/spark-1.5.0-bin-hadoop2.4/bin/spark-submit --driver-memory 22g --driver-cores 4 --num-executors 15 --executor-memory 6g --executor-cores 6 --class sparkTesting.Runner --master yarn-client myJar.jar jarArguments

And the SparkContext settings:

val sparkConf = new SparkConf()
  .set("spark.driver.maxResultSize", "21g")
  .set("spark.akka.frameSize", "2011")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", configVar.sparkLogDir)
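As an aside on the "executors request more memory than expected" hypothesis above: on Spark 1.5 on YARN, the off-heap headroom each executor container gets on top of --executor-memory is controlled by spark.yarn.executor.memoryOverhead. A minimal sketch of raising it (the value is illustrative, not from the original post):

// Hypothetical tweak, not part of the original job: give each executor
// container extra off-heap headroom (in MB) so the YARN NodeManager is
// less likely to kill it for exceeding its memory limit.
val tunedConf = sparkConf.set("spark.yarn.executor.memoryOverhead", "2048")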

The simplified code that fails looks like this:

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val broadcastParser = sc.broadcast(new Parser())

val featuresRdd = hc.sql("select " + configVar.columnName + " from " + configVar.Table + " ORDER BY RAND() LIMIT " + configVar.Articles)
val myRdd: org.apache.spark.rdd.RDD[String] = featuresRdd.map(doSomething(_, broadcastParser))

val allWords = featuresRdd
  .flatMap(line => line.split(" "))
  .count

val wordQuantiles = featuresRdd
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .map(pair => (pair._2, pair._2))
  .reduceByKey(_ + _)
  .sortBy(_._1)
  .collect
  .scanLeft((0, 0.0))((res, add) => (add._1, res._2 + add._2))
  .map(entry => (entry._1, entry._2 / allWords))

val dictionary = featuresRdd
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _) // here I have an RDD of (word, count) tuples
  .filter(_._2 >= moreThan)
  .filter(_._2 <= lessThan)
  .filter(_._1.trim != "")
  .map(_._1)
  .zipWithIndex
  .collect
  .toMap
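Worth noting in passing (an observation, not something from the original post): the .collect calls pull the full histogram and dictionary onto the driver, which is presumably why spark.driver.maxResultSize is set to 21g. If the dictionary is used on the executors afterwards, broadcasting it once, as the code already does for Parser, avoids re-shipping it inside every task closure. A minimal sketch (indexedWords is a hypothetical name):

// Hypothetical follow-up: ship the collected dictionary to the executors
// once as a broadcast variable instead of capturing it in task closures.
val dictBroadcast = sc.broadcast(dictionary)
val indexedWords = featuresRdd
  .flatMap(line => line.split(" "))
  .flatMap(word => dictBroadcast.value.get(word)) // keep only dictionary words, as their indices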

And the error stack:

Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1511)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1435)
at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1715)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1714)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:146)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
at org.apache.spark.rdd.RDD.count(RDD.scala:1121)
at sparkTesting.InputGenerationAndDictionaryComputations$.createDictionary(InputGenerationAndDictionaryComputations.scala:50)
at sparkTesting.Runner$.main(Runner.scala:133)
at sparkTesting.Runner.main(Runner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Solution

Found the answer.

My table was saved as a 20 GB Avro file. When the executors tried to open it, each of them had to load the 20 GB into memory. Switching from Avro to CSV solved the problem.
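For anyone who needs to make the same switch, one way to do it (a sketch only; the table names are hypothetical, and the original post does not say how the conversion was performed) is a Hive CTAS from the same HiveContext, which re-materializes the data as line-delimited text that HDFS can split across executors:

// Hypothetical conversion, assuming an Avro-backed source table named
// articles_avro: rewrite it as a delimited text table so each executor
// reads only its own splits instead of the whole 20 GB file.
hc.sql("""
  CREATE TABLE articles_csv
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  STORED AS TEXTFILE
  AS SELECT * FROM articles_avro
""")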

This concludes this article on "SparkContext was shut down" while running Spark on a large dataset. We hope the answer above helps, and thank you for your support!
