Question
My question is very similar to Count occurrences of each element in a List[List[T]] in Scala, except that I would like to have an efficient solution involving parallel collections.
Specifically, I have a large (~10^7) vector vec of short (~10) lists of Ints, and I would like to get, for each Int x, the number of times x occurs, for example as a Map[Int,Int]. The number of distinct integers is of the order 10^6.
Since the machine this needs to run on has a fair amount of memory (150GB) and a large number of cores (>100), parallel collections seem like a good choice. Is the code below a good approach?
val flatpvec = vec.par.flatten
val flatvec = flatpvec.seq
val unique = flatpvec.distinct
val counts = unique map (x => (x -> flatvec.count(_ == x)))
counts.toMap
Or are there better solutions? In case you are wondering about the .seq conversion: for some reason the following code doesn't seem to terminate, even for small examples:
val flatpvec = vec.par.flatten
val unique = flatpvec.distinct
val counts = unique map (x => (x -> flatpvec.count(_ == x)))
counts.toMap
Recommended answer
This does something. aggregate is like fold, except that you also combine the results of the sequential folds.
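To make the fold-then-combine idea concrete, here is a minimal sketch that imitates what aggregate does, using only sequential collections. The names AggregateSketch, seqop, combop and the tiny input are hypothetical, and the fixed chunks of two lists merely stand in for however the parallel collection actually splits its work.

```scala
object AggregateSketch {
  // Hypothetical small input, standing in for the real Vector[List[Int]].
  val vec: Vector[List[Int]] = Vector(List(1, 2, 2), List(3, 1), List(2), List(3, 3))

  // Sequential operation: fold one list into a partial count map.
  def seqop(m: Map[Int, Int], xs: List[Int]): Map[Int, Int] =
    xs.foldLeft(m)((acc, x) => acc.updated(x, acc.getOrElse(x, 0) + 1))

  // Combining operation: merge two partial count maps.
  def combop(a: Map[Int, Int], b: Map[Int, Int]): Map[Int, Int] =
    b.foldLeft(a) { case (acc, (k, n)) => acc.updated(k, acc.getOrElse(k, 0) + n) }

  // Fold each chunk on its own (the "sequential folds") ...
  val partials: List[Map[Int, Int]] =
    vec.grouped(2).map(chunk => chunk.foldLeft(Map.empty[Int, Int])(seqop)).toList

  // ... then combine the partial results into the final counts.
  val counts: Map[Int, Int] = partials.reduce(combop)
}
```

With a parallel collection, the chunking and the scheduling of the two operations are handled by the library; only the zero value, the sequential operation and the combining operation are supplied by the caller.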
Update: It's not surprising that there is overhead in .par.groupBy, but I was surprised by the constant factor. By these numbers, you would never count that way. Also, I had to bump the memory way up.
The interesting technique used to build the result map is described in this paper linked from the overview. (It cleverly saves the intermediate results and then coalesces them in parallel at the end.)
But copying around the intermediate results of the groupBy turns out to be expensive, if all you really want is a count.
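As a rough illustration of why that is, the sketch below contrasts the two approaches on a tiny, hypothetical input (CountSketch, viaGroupBy and viaCounter are made-up names). It only shows that both give the same counts while the first builds a whole collection per key; it does not reproduce the timings.

```scala
import scala.collection.mutable

object CountSketch {
  // Hypothetical small input.
  val data: Vector[List[Int]] = Vector(List(1, 2, 2), List(3, 1), List(2))

  // groupBy materializes a collection of equal elements per key,
  // which is then thrown away after taking its size.
  val viaGroupBy: Map[Int, Int] =
    data.flatten.groupBy(identity).map { case (k, vs) => (k, vs.size) }

  // Direct counting keeps a single Int per key and never stores the elements.
  val viaCounter: Map[Int, Int] = {
    val m = mutable.Map.empty[Int, Int]
    for (xs <- data; x <- xs) m(x) = m.getOrElse(x, 0) + 1
    m.toMap
  }
}
```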
The numbers compare sequential groupBy, parallel groupBy, and finally aggregate (elapsed times are in milliseconds).
apm@mara:~/tmp$ scalacm countints.scala ; scalam -J-Xms8g -J-Xmx8g -J-Xss1m countints.Test
GroupBy: Starting...
Finished in 12695
GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))
Par GroupBy: Starting...
Finished in 51481
Par GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))
Aggregate: Starting...
Finished in 2672
Aggregate: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))
Nothing magical in the test code.
import collection.GenTraversableOnce
import collection.concurrent.TrieMap
import collection.mutable
import concurrent.duration._
trait Timed {
  def now = System.nanoTime
  def timed[A](op: =>A): A = {
    val start = now
    val res = op
    val end = now
    val lapsed = (end - start).nanos.toMillis
    Console println s"Finished in $lapsed"
    res
  }
  def showtime(title: String, op: =>GenTraversableOnce[(Int,Int)]): Unit = {
    Console println s"$title: Starting..."
    val res = timed(op)
    //val showable = res.toIterator.min //(res.toIterator take 10).toList
    val showable = res.toList.sorted take 10
    Console println s"$title: $showable"
  }
}
It generates some random data to make things interesting.
object Test extends App with Timed {
  val upto = math.pow(10,6).toInt
  val ran = new java.util.Random
  val ten = (1 to 10).toList
  val maxSamples = 1000
  // samples of ten random numbers in the desired range
  val samples = (1 to maxSamples).toList map (_ => ten map (_ => ran nextInt upto))
  // pick a sample at random
  def anyten = samples(ran nextInt maxSamples)
  def mag = 7
  val data: Vector[List[Int]] = Vector.fill(math.pow(10,mag).toInt)(anyten)
The sequential operation and the combining operation of aggregate are invoked from a task, and the result is assigned to a volatile var.
  // zero element: a def, so every evaluation yields a fresh, unshared map
  def z: mutable.Map[Int,Int] = mutable.Map.empty[Int,Int]
  // sequential operation: count the elements of one list into the accumulator m
  def so(m: mutable.Map[Int,Int], is: List[Int]) = {
    for (i <- is) {
      val v = m.getOrElse(i, 0)
      m(i) = v + 1
    }
    m
  }
  // combining operation: add the counts of n into m
  def co(m: mutable.Map[Int,Int], n: mutable.Map[Int,Int]) = {
    for ((i, count) <- n) {
      val v = m.getOrElse(i, 0)
      m(i) = v + count
    }
    m
  }
  showtime("GroupBy", data.flatten groupBy identity map { case (k, vs) => (k, vs.size) })
  showtime("Par GroupBy", data.flatten.par groupBy identity map { case (k, vs) => (k, vs.size) })
  showtime("Aggregate", data.par.aggregate(z)(so, co))
}