我有一个脚本可以处理数百GB的数据,当我尝试处理500GB以上的数据时遇到了麻烦,在此之下一切正常。
首先调试应用程序,我收到关于超过spark.driver.maxResultSize值限制的错误,所以我将此值增加到4g,而现在失败的任务正在运行,但是,当我尝试时,我遇到了另一个问题要将结果保存到实木复合地板文件中,任务将失败并抛出此错误
17/01/27 06:35:27 INFO DAGScheduler: Job 7 failed: parquet at NativeMethodAccessorImpl.java:-2, took 12.106390 s
17/01/27 06:35:27 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 146:0 was 765207245 bytes, which exceeds max allowed: spark.akka.frameSize (134217728 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize
所以,似乎我需要增加spark.akka.frameSize值
我的问题是,我已经在使用函数sparkConf()。set增加maxResultSize,但是我不知道如何(或在语法上)增加sparkConf()。set中的两个值。
这是我在这些部分中的代码的样子:
conf = (SparkConf().set("spark.driver.maxResultSize", "4g"))
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
而失败的任务是:
sqlContext.sql(sql).coalesce(5).write.parquet(sys.argv[3], mode='overwrite')
只是一件事,我无法修改spark集群中的conf文件,而且,我们使用luigi将任务提交给spark,因此在执行脚本时,我无法修改spark-submit字符串(这就是为什么我直接从脚本修改参数的原因)
任何指导,不胜感激。
最佳答案
RTFM-直接来自Spark 1.6.3 Python API documentation ...
pyspark.SparkConf类(...)
此类中的所有setter方法都支持链接。对于
例如,您可以编写conf.setMaster"local").setAppName("My app")