Question
I have an RDD that is too large to consistently perform a distinct operation on without spurious errors (e.g. SparkException: stage failed 4 times, ExecutorLostFailure, HDFS Filesystem closed, Max number of executor failures reached, Stage cancelled because SparkContext was shut down, etc.).
I am trying to count the distinct IDs in a particular column, for example:
print(myRDD.map(a => a._2._1._2).distinct.count())
Is there an easy, consistent, less shuffle-intensive way to do the above, possibly using mapPartitions, reduceByKey, flatMap, or other operations that incur fewer shuffles than distinct?
See also: What are the Spark transformations that cause a shuffle?
Recommended Answer
It might be better to figure out whether there is another underlying issue, but the following will do what you want. It is a rather roundabout way to go about it, but it sounds like it will fit the bill:
myRDD.map(a => (a._2._1._2, a._2._1._2))
  .aggregateByKey(Set[YourType]())((agg, value) => agg + value, (agg1, agg2) => agg1 ++ agg2)
  .keys
  .count
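For concreteness, here is a self-contained sketch of the same aggregateByKey approach. The nested-tuple shape of myRDD and the String ID type are assumptions made for illustration, not taken from the original post; substitute your actual record structure:

import org.apache.spark.{SparkConf, SparkContext}

object DistinctCountSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("distinct-count-sketch").setMaster("local[*]"))

    // Assumed record shape: (key, ((somethingA, id), somethingB)), so a._2._1._2 selects the ID.
    val myRDD = sc.parallelize(Seq(
      (1, (("x", "id-1"), 10)),
      (2, (("y", "id-2"), 20)),
      (3, (("z", "id-1"), 30))
    ))

    // Map each record to (id, id), build a Set per key, then count the keys.
    val distinctIds = myRDD
      .map(a => (a._2._1._2, a._2._1._2))
      .aggregateByKey(Set[String]())((agg, value) => agg + value, (agg1, agg2) => agg1 ++ agg2)
      .keys
      .count()

    println(distinctIds) // prints 2 (id-1 and id-2)

    sc.stop()
  }
}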
Even this seems to work, but the combine function isn't associative and commutative. It works because of how Spark's internals happen to behave, but I might be missing a case, so while it is simpler, I'm not sure I trust it:
myRDD.map(a => (a._2._1._2, a._2._1._2))
  .aggregateByKey(YourTypeDefault)((x, y) => y, (x, y) => x)
  .keys
  .count
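Since the question also mentions reduceByKey, here is a minimal sketch of that variant (reusing the assumed myRDD shape from the sketch above, so it is an editorial addition rather than part of the original answer). Because the key and the value are the same ID, a combine function that simply keeps one of its arguments is enough, and it involves the same single shuffle:

// Keep one value per ID, then count the surviving (id, id) pairs.
val distinctViaReduce = myRDD
  .map(a => (a._2._1._2, a._2._1._2))
  .reduceByKey((x, _) => x)
  .count()

println(distinctViaReduce) // prints 2, same as above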