我在spark中运行reduceByKey。我的程序是spark最简单的示例:
val counts = textFile.flatMap(line => line.split(" ")).repartition(20000).
.map(word => (word, 1))
.reduceByKey(_ + _, 10000)
counts.saveAsTextFile("hdfs://...")
但是它总是耗尽内存...
我正在使用50台服务器,每台服务器35个执行程序,每台服务器140GB内存。
文件量为:
8TB文档,200亿个文档,总计1万亿个单词。
减少后的字数约为1亿。
我想知道如何设置spark的配置吗?
我想知道这些参数应该是什么值?
1. the number of the maps ? 20000 for example?
2. the number of the reduces ? 10000 for example?
3. others parameters?
最佳答案
如果您发布日志将很有帮助,但一种选择是在读取初始文本文件(例如sc.textFile(path, 200000)
)时指定更多的分区,而不是在读取后重新分区。另一个重要的事情是确保输入文件是可拆分的(某些压缩选项使其无法拆分,在这种情况下,Spark可能必须在引起OOM的单台机器上读取它)。
其他一些选项是,由于您不缓存任何数据,因此会减少Spark为缓存预留的内存量(由spark.storage.memoryFraction
控制),也因为您仅使用我建议的字符串元组使用org.apache.spark.serializer.KryoSerializer
序列化器。