背景
我一直在阅读《 Functional Programming in Scala》一书,对《 Chapter 7: Purely functional parallelism》中的内容有一些疑问。
这是本书中答案的代码:Par.scala,但是我对其中的某些部分感到困惑。
这是Par.scala的代码的第一部分,代表并行性:

import java.util.concurrent._

object Par {
  type Par[A] = ExecutorService => Future[A]

  def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a)

  private case class UnitFuture[A](get: A) extends Future[A] {
    def isDone = true
    def get(timeout: Long, units: TimeUnit): A = get
    def isCancelled = false
    def cancel(evenIfRunning: Boolean): Boolean = false
  }

  def map2[A, B, C](a: Par[A], b: Par[B])(f: (A, B) => C): Par[C] =
    (es: ExecutorService) => {
      val af = a(es)
      val bf = b(es)
      UnitFuture(f(af.get, bf.get))
    }

  def fork[A](a: => Par[A]): Par[A] =
    (es: ExecutorService) => es.submit(new Callable[A] {
      def call: A = a(es).get
    })

  def lazyUnit[A](a: => A): Par[A] =
    fork(unit(a))

  def run[A](es: ExecutorService)(a: Par[A]): Future[A] = a(es)

  def asyncF[A, B](f: A => B): A => Par[B] =
    a => lazyUnit(f(a))

  def map[A, B](pa: Par[A])(f: A => B): Par[B] =
    map2(pa, unit(()))((a, _) => f(a))
}


Par[A]的最简单模型可能是ExecutorService => Future[A],而run只是返回Future
unit通过返回UnitFuture将常量值提升为并行计算,这是Future的简单实现,只包装了一个常量值。
map2将两个并行计算的结果与一个二进制函数结合在一起。
fork标记用于同时评估的计算。除非经过强行运行,否则不会真正进行评估。这是它最简单,最自然的实现。即使存在问题,我们还是先将它们放在一边。
lazyUnit将其未评估的参数包装在Par中,并将其标记为并发评估。
run通过实际执行计算从Par中提取一个值。
asyncF将任何函数A => B转换为异步评估其结果的函数。


问题
fork这个函数在这里让我很困惑,因为它带有一个惰性参数,稍后将在调用它时对其进行评估。然后,我的问题更多地是关于何时应使用此fork,即何时需要惰性计算以及何时需要直接拥有值。
这是本书中的一个练习:

练习7.5
困难:编写此函数,称为序列。不需要其他原语。不要调用run。
def sequence[A](ps: List[Par[A]]): Par[List[A]]

这是答案(提供了here)。

第一
  def sequence_simple[A](l: List[Par[A]]): Par[List[A]] =
    l.foldRight[Par[List[A]]](unit(List()))((h, t) => map2(h, t)(_ :: _))

上面的代码和以下代码有什么区别:
  def sequence_simple[A](l: List[Par[A]]): Par[List[A]] =
    l.foldLeft[Par[List[A]]](unit(List()))((t, h) => map2(h, t)(_ :: _))


另外
  def sequenceRight[A](as: List[Par[A]]): Par[List[A]] =
    as match {
      case Nil => unit(Nil)
      case h :: t => map2(h, fork(sequenceRight(t)))(_ :: _)
    }

  def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = fork {
    if (as.isEmpty) unit(Vector())
    else if (as.length == 1) map(as.head)(a => Vector(a))
    else {
      val (l,r) = as.splitAt(as.length/2)
      map2(sequenceBalanced(l), sequenceBalanced(r))(_ ++ _)
    }
  }

sequenceRight中,当直接调用递归函数时使用fork。但是,在sequenceBalanced中,fork在整个函数主体之外使用。
然后,与上面的代码和下面的代码有什么不同(我们在其中切换了fork的位置):
  def sequenceRight[A](as: List[Par[A]]): Par[List[A]] = fork {
    as match {
      case Nil => unit(Nil)
      case h :: t => map2(h, sequenceRight(t))(_ :: _)
    }
  }

  def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] =
    if (as.isEmpty) unit(Vector())
    else if (as.length == 1) map(as.head)(a => Vector(a))
    else {
      val (l,r) = as.splitAt(as.length/2)
      map2(fork(sequenceBalanced(l)), fork(sequenceBalanced(r)))(_ ++ _)
    }


最后,根据上面定义的sequence,我们具有以下功能:
  def parMap[A,B](ps: List[A])(f: A => B): Par[List[B]] = fork {
    val fbs: List[Par[B]] = ps.map(asyncF(f))
    sequence(fbs)
  }

我想知道,我是否也可以通过以下方式实现该功能,即通过应用开头定义的lazyUnit?这个实现lazyUnit(ps.map(f))懒吗?
  def parMapByLazyUnit[A, B](ps: List[A])(f: A => B): Par[List[B]] =
    lazyUnit(ps.map(f))

最佳答案

我没有完全理解你的怀疑。但是我发现以下解决方案存在一个主要问题,

def parMapByLazyUnit[A, B](ps: List[A])(f: A => B): Par[List[B]] =
  lazyUnit(ps.map(f))


要了解该问题,请查看def lazyUnit

def fork[A](a: => Par[A]): Par[A] =
  (es: ExecutorService) => es.submit(new Callable[A] {
    def call: A = a(es).get
  })

def lazyUnit[A](a: => A): Par[A] =
  fork(unit(a))


所以... lazyUnit接受类型为=> A的表达式,并将其提交给ExecutorService进行求值。并将此并行计算的包装结果返回为Par[A]

parMap中,对于ps: List[A]的每个元素,我们不仅必须使用函数f: A => B评估相应的映射,而且还必须对in parallel进行这些评估。

但是我们的解决方案lazyUnit(ps.map(f))会将整个{ ps.map(f) }评估作为单个任务提交给我们的ExecutionService。这意味着我们不是并行进行的。

我们需要做的是确保对于a中的每个元素ps: [A],函数f: A => B作为我们的ExecutorService的单独任务执行。

现在,从我们的实现中学到的是,我们可以使用exp: => A获取lazyUnit(exp)来运行result: Par[A]类型的表达式。

因此,我们将为a: A中的每个ps: List[A]完全做到这一点,

val parMappedTmp = ps.map( a => lazyUnit(f(a) ) )

// or

val parMappedTmp = ps.map( a => asyncF(f)(a) )

// or

val parMappedTmp = ps.map(asyncF(f))


但是,现在我们的parMappedTmpList[Par[B]],而我们需要Par[List[B]]

因此,您将需要具有以下签名的函数以获取所需的内容,

def sequence[A](ps: List[Par[A]]): Par[List[A]]


一旦有了它,

val parMapped = sequence(parMappedTmp)

关于scala - Scala中的功能并行和懒惰,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47488031/

10-11 20:47