我有一个脚本可以处理数百GB的数据,当我尝试处理500GB以上的数据时遇到了麻烦,在此之下一切正常。
首先调试应用程序,我收到关于超过spark.driver.maxResultSize值限制的错误,所以我将此值增加到4g,而现在失败的任务正在运行,但是,当我尝试时,我遇到了另一个问题要将结果保存到实木复合地板文件中,任务将失败并抛出此错误

17/01/27 06:35:27 INFO DAGScheduler: Job 7 failed: parquet at NativeMethodAccessorImpl.java:-2, took 12.106390 s
17/01/27 06:35:27 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 146:0 was 765207245 bytes, which exceeds max allowed: spark.akka.frameSize (134217728 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize


所以,似乎我需要增加spark.akka.frameSize值

我的问题是,我已经在使用函数sparkConf()。set增加maxResultSize,但是我不知道如何(或在语法上)增加sparkConf()。set中的两个值。

这是我在这些部分中的代码的样子:

conf = (SparkConf().set("spark.driver.maxResultSize", "4g"))
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)


而失败的任务是:

sqlContext.sql(sql).coalesce(5).write.parquet(sys.argv[3], mode='overwrite')


只是一件事,我无法修改spark集群中的conf文件,而且,我们使用luigi将任务提交给spark,因此在执行脚本时,我无法修改spark-submit字符串(这就是为什么我直接从脚本修改参数的原因)

任何指导,不胜感激。

最佳答案

RTFM-直接来自Spark 1.6.3 Python API documentation ...


  pyspark.SparkConf类(...)
  此类中的所有setter方法都支持链接。对于
  例如,您可以编写conf.setMaster"local").setAppName("My app")

07-26 02:50