我构建了一个 Spark cluster
。
worker :2
核心数:12
内存:总计 32.0 GB,已使用 20.0 GB
每个 worker 获得 1 个 CPU、6 个内核和 10.0 GB 内存
我的程序从 MongoDB cluster
获取数据源。 Spark
和 MongoDB cluster
在同一个 LAN
(1000Mbps) 中。MongoDB document
格式:{name:string, value:double, time:ISODate}
大约有 1300 万份文件。
我想从包含 60 个文档的特殊小时中获取特殊 name
的平均值。
这是我的关键功能
/*
*rdd=sc.newAPIHadoopRDD(configOriginal, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
Apache-Spark-1.3.1 scala doc: SparkContext.newAPIHadoopFile[K, V, F <: InputFormat[K, V]](path: String, fClass: Class[F], kClass: Class[K], vClass: Class[V], conf: Configuration = hadoopConfiguration): RDD[(K, V)]
*/
def findValueByNameAndRange(rdd:RDD[(Object,BSONObject)],name:String,time:Date): RDD[BasicBSONObject]={
val nameRdd = rdd.map(arg=>arg._2).filter(_.get("name").equals(name))
val timeRangeRdd1 = nameRdd.map(tuple=>(tuple, tuple.get("time").asInstanceOf[Date]))
val timeRangeRdd2 = timeRangeRdd1.map(tuple=>(tuple._1,duringTime(tuple._2,time,getHourAgo(time,1))))
val timeRangeRdd3 = timeRangeRdd2.filter(_._2).map(_._1)
val timeRangeRdd4 = timeRangeRdd3.map(x => (x.get("name").toString, x.get("value").toString.toDouble)).reduceByKey(_ + _)
if(timeRangeRdd4.isEmpty()){
return basicBSONRDD(name, time)
}
else{
return timeRangeRdd4.map(tuple => {
val bson = new BasicBSONObject()
bson.put("name", tuple._1)
bson.put("value", tuple._2/60)
bson.put("time", time)
bson })
}
}
这是
Job
信息的一部分我的程序运行得很慢。是因为
isEmpty
和 reduceByKey
吗?如果是,我该如何改进?如果不是,为什么?======更新===
timeRangeRdd3.map(x => (x.get("name").toString, x.get("value").toString.toDouble)).reduceByKey(_ + _)
在 34 线上
我知道 reduceByKey 是一个全局操作,可能会花费很多时间,但是,它所花费的成本超出了我的预算。我该如何改进它或者它是Spark的缺陷。同样的计算和硬件,如果我使用java的多线程只需要几秒钟。
最佳答案
首先,isEmpty
只是 RDD 阶段结束的点。 map
s 和 filter
s 不需要shuffle,UI 中使用的方法始终是触发阶段更改/shuffle 的方法……在这种情况下是 isEmpty
。从这个角度来看,为什么它运行缓慢并不容易辨别,尤其是没有看到原始 RDD
的组成。我可以告诉你 isEmpty
首先检查 partition
大小,然后执行 take(1)
并验证是否返回了数据。因此,很可能网络中存在瓶颈或其他障碍。它甚至可能是一个 GC 问题...点击进入 isEmpty
,看看你还能从那里看出什么。
关于apache-spark - Spark RDD.isEmpty 花费大量时间,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/31737514/