我有一个Apache Spark集群,其中有一个主节点和三个工作节点。辅助节点分别具有32个核心和124G的内存。我还拥有HDFS数据集,其中包含约6.5亿条文本记录。该数据集是一些读入的序列化RDD,如下所示:

import org.apache.spark.mllib.linalg.{Vector, Vectors, SparseVector}
val vectors = sc.objectFile[(String, SparseVector)]("hdfs://mn:8020/data/*")

我想从这些记录中提取一百万个样本来进行一些分析,所以我想尝试使用val sample = vectors.takeSample(false, 10000, 0)。但是,最终由于以下错误消息而失败:
 15/08/25 09:48:27 ERROR Utils: Uncaught exception in thread task-result-getter-3
java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:64)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:61)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:79)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:64)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
        at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:61)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:89)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$r

我知道我的堆空间用完了(我认为是在驱动程序上),这是有道理的。执行hadoop fs -du -s /path/to/data,数据集在磁盘上占用2575 GB(但大小仅为850 GB)。

因此,我的问题是,该如何提取1000000条记录的样本(我稍后计划将其序列化到磁盘)?我知道我可以使用较小的样本大小执行takeSample()并在以后进行汇总,但我认为我只是没有设置正确的配置或做错了什么,这使我无法按照自己的意愿进行操作。

最佳答案

您可以通过增加分区数量,使每个分区更小来实现。检查正在设置的执行程序的数量以及为每个执行程序保留多少内存也很重要(您未将此信息放在问题上)。

我发现this guide对调整Spark非常有用。

10-06 01:02