背景
我一直在阅读《 Functional Programming in Scala》一书,对《 Chapter 7: Purely functional parallelism》中的内容有一些疑问。
这是本书中答案的代码:Par.scala,但是我对其中的某些部分感到困惑。
这是Par.scala
的代码的第一部分,代表并行性:
import java.util.concurrent._
object Par {
type Par[A] = ExecutorService => Future[A]
def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a)
private case class UnitFuture[A](get: A) extends Future[A] {
def isDone = true
def get(timeout: Long, units: TimeUnit): A = get
def isCancelled = false
def cancel(evenIfRunning: Boolean): Boolean = false
}
def map2[A, B, C](a: Par[A], b: Par[B])(f: (A, B) => C): Par[C] =
(es: ExecutorService) => {
val af = a(es)
val bf = b(es)
UnitFuture(f(af.get, bf.get))
}
def fork[A](a: => Par[A]): Par[A] =
(es: ExecutorService) => es.submit(new Callable[A] {
def call: A = a(es).get
})
def lazyUnit[A](a: => A): Par[A] =
fork(unit(a))
def run[A](es: ExecutorService)(a: Par[A]): Future[A] = a(es)
def asyncF[A, B](f: A => B): A => Par[B] =
a => lazyUnit(f(a))
def map[A, B](pa: Par[A])(f: A => B): Par[B] =
map2(pa, unit(()))((a, _) => f(a))
}
Par[A]
的最简单模型可能是ExecutorService => Future[A]
,而run
只是返回Future
。unit
通过返回UnitFuture
将常量值提升为并行计算,这是Future
的简单实现,只包装了一个常量值。map2
将两个并行计算的结果与一个二进制函数结合在一起。fork
标记用于同时评估的计算。除非经过强行运行,否则不会真正进行评估。这是它最简单,最自然的实现。即使存在问题,我们还是先将它们放在一边。lazyUnit
将其未评估的参数包装在Par
中,并将其标记为并发评估。run
通过实际执行计算从Par
中提取一个值。asyncF
将任何函数A => B
转换为异步评估其结果的函数。问题
fork
这个函数在这里让我很困惑,因为它带有一个惰性参数,稍后将在调用它时对其进行评估。然后,我的问题更多地是关于何时应使用此fork
,即何时需要惰性计算以及何时需要直接拥有值。这是本书中的一个练习:
练习7.5
困难:编写此函数,称为序列。不需要其他原语。不要调用run。
def sequence[A](ps: List[Par[A]]): Par[List[A]]
这是答案(提供了here)。
第一
def sequence_simple[A](l: List[Par[A]]): Par[List[A]] =
l.foldRight[Par[List[A]]](unit(List()))((h, t) => map2(h, t)(_ :: _))
上面的代码和以下代码有什么区别:
def sequence_simple[A](l: List[Par[A]]): Par[List[A]] =
l.foldLeft[Par[List[A]]](unit(List()))((t, h) => map2(h, t)(_ :: _))
另外
def sequenceRight[A](as: List[Par[A]]): Par[List[A]] =
as match {
case Nil => unit(Nil)
case h :: t => map2(h, fork(sequenceRight(t)))(_ :: _)
}
def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = fork {
if (as.isEmpty) unit(Vector())
else if (as.length == 1) map(as.head)(a => Vector(a))
else {
val (l,r) = as.splitAt(as.length/2)
map2(sequenceBalanced(l), sequenceBalanced(r))(_ ++ _)
}
}
在
sequenceRight
中,当直接调用递归函数时使用fork
。但是,在sequenceBalanced
中,fork
在整个函数主体之外使用。然后,与上面的代码和下面的代码有什么不同(我们在其中切换了
fork
的位置): def sequenceRight[A](as: List[Par[A]]): Par[List[A]] = fork {
as match {
case Nil => unit(Nil)
case h :: t => map2(h, sequenceRight(t))(_ :: _)
}
}
def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] =
if (as.isEmpty) unit(Vector())
else if (as.length == 1) map(as.head)(a => Vector(a))
else {
val (l,r) = as.splitAt(as.length/2)
map2(fork(sequenceBalanced(l)), fork(sequenceBalanced(r)))(_ ++ _)
}
最后,根据上面定义的
sequence
,我们具有以下功能: def parMap[A,B](ps: List[A])(f: A => B): Par[List[B]] = fork {
val fbs: List[Par[B]] = ps.map(asyncF(f))
sequence(fbs)
}
我想知道,我是否也可以通过以下方式实现该功能,即通过应用开头定义的
lazyUnit
?这个实现lazyUnit(ps.map(f))
懒吗? def parMapByLazyUnit[A, B](ps: List[A])(f: A => B): Par[List[B]] =
lazyUnit(ps.map(f))
最佳答案
我没有完全理解你的怀疑。但是我发现以下解决方案存在一个主要问题,
def parMapByLazyUnit[A, B](ps: List[A])(f: A => B): Par[List[B]] =
lazyUnit(ps.map(f))
要了解该问题,请查看
def lazyUnit
,def fork[A](a: => Par[A]): Par[A] =
(es: ExecutorService) => es.submit(new Callable[A] {
def call: A = a(es).get
})
def lazyUnit[A](a: => A): Par[A] =
fork(unit(a))
所以...
lazyUnit
接受类型为=> A
的表达式,并将其提交给ExecutorService
进行求值。并将此并行计算的包装结果返回为Par[A]
。在
parMap
中,对于ps: List[A]
的每个元素,我们不仅必须使用函数f: A => B
评估相应的映射,而且还必须对in parallel
进行这些评估。但是我们的解决方案
lazyUnit(ps.map(f))
会将整个{ ps.map(f) }
评估作为单个任务提交给我们的ExecutionService
。这意味着我们不是并行进行的。我们需要做的是确保对于
a
中的每个元素ps: [A]
,函数f: A => B
作为我们的ExecutorService
的单独任务执行。现在,从我们的实现中学到的是,我们可以使用
exp: => A
获取lazyUnit(exp)
来运行result: Par[A]
类型的表达式。因此,我们将为
a: A
中的每个ps: List[A]
完全做到这一点,val parMappedTmp = ps.map( a => lazyUnit(f(a) ) )
// or
val parMappedTmp = ps.map( a => asyncF(f)(a) )
// or
val parMappedTmp = ps.map(asyncF(f))
但是,现在我们的
parMappedTmp
是List[Par[B]]
,而我们需要Par[List[B]]
因此,您将需要具有以下签名的函数以获取所需的内容,
def sequence[A](ps: List[Par[A]]): Par[List[A]]
一旦有了它,
val parMapped = sequence(parMappedTmp)
关于scala - Scala中的功能并行和懒惰,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47488031/