Parallel file processing in Scala

Problem description

Suppose I need to process files in a given folder in parallel. In Java I would create a FolderReader thread to read file names from the folder and a pool of FileProcessor threads. FolderReader reads file names and submits the file processing function (Runnable) to the pool executor.
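For comparison, the Java-style design can be written in Scala directly on top of java.util.concurrent. This is a minimal sketch under stated assumptions, not code from the question: the object name ThreadPoolFolderProcessing and the process callback are hypothetical, and instead of a dedicated FolderReader thread, the calling thread lists the folder and hands the work to the pool.

```scala
import java.io.File
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical sketch of the thread-pool design described in the question.
object ThreadPoolFolderProcessing {
  // The calling thread plays the "FolderReader" role: it lists the folder and
  // hands one Runnable per regular file to a fixed pool of worker threads.
  // process is a hypothetical stand-in for the real per-file work.
  def processFolder(folder: File, poolSize: Int)(process: File => Unit): Unit = {
    val pool = Executors.newFixedThreadPool(poolSize)
    try {
      // listFiles returns null when folder is not a readable directory
      val files = Option(folder.listFiles()).getOrElse(Array.empty[File])
      for (file <- files if file.isFile)
        pool.execute(new Runnable { def run(): Unit = process(file) })
    } finally {
      pool.shutdown() // accept no new tasks; already queued tasks still run
      pool.awaitTermination(1, TimeUnit.MINUTES) // bounded wait for them
    }
  }
}
```

A call would look like processFolder(new File("/some/folder"), 4)(f => println(f.getName)), where the path is a placeholder. All the thread and lifecycle management (pool size, shutdown, waiting) is explicit here, which is what the answer below suggests avoiding.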

In Scala I see two options:

  • create a pool of FileProcessor actors and schedule a file processing function with Actors.Scheduler.
  • create an actor for each file name while reading the file names.

Does it make sense? What is the best option?

Recommended answer

I strongly suggest staying as far away from raw threads as you can. Luckily, we have better abstractions that take care of what happens underneath, and in your case it seems to me that you do not need actors (although you could use them): you can use a simpler abstraction called futures. They are part of the Akka open-source library, and they have also become part of the Scala standard library as scala.concurrent.Future (Scala 2.10 and later).

A Future[T] is simply something that will return a T at some point in the future.

All you need to run a future is an implicit ExecutionContext, which you can derive from a Java ExecutorService. Then you can enjoy the elegant API and the fact that a future is a monad, which lets you transform a collection into a collection of futures, collect the results, and so on. I suggest you take a look at http://doc.akka.io/docs/akka/2.0.1/scala/futures.html
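As a minimal sketch of these two ideas in isolation (a single Future[T], and an ExecutionContext derived from a Java executor service), assuming the standard library's scala.concurrent API rather than the Akka 2.0 futures the linked page documents; the object FutureBasics and the slowSquare function are hypothetical:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object FutureBasics {
  // Derive an ExecutionContext from a plain Java fixed-size thread pool.
  // Because it is implicit, Future.apply picks it up automatically.
  implicit val ec: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

  // A Future[Int]: the body runs on a pool thread and the Int
  // becomes available some time later.
  def slowSquare(n: Int): Future[Int] = Future {
    Thread.sleep(100) // simulate work
    n * n
  }

  def main(args: Array[String]): Unit =
    try {
      // Blocking with Await is only for the demo; real code would compose futures.
      println(Await.result(slowSquare(7), 5.seconds)) // prints 49
    } finally ec.shutdown() // otherwise the pool's threads keep the JVM alive
}
```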

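```scala
// Imports assume the standard library's scala.concurrent (Scala 2.10+).
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object TestingFutures {
  // Fixed pool of 20 threads; call executorService.shutdown() when done,
  // otherwise its non-daemon threads keep the JVM alive.
  val executorService: ExecutorService = Executors.newFixedThreadPool(20)
  // The implicit ExecutionContext picked up by Future.traverse and Future.apply.
  implicit val executorContext: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(executorService)

  def testFutures(myList: List[String]): List[String] = {
    // One Future per element, combined into a single Future[List[String]].
    val listOfFutures: Future[List[String]] = Future.traverse(myList) { aString =>
      Future {
        aString.reverse
      }
    }
    // Block for at most one minute; results come back in input order.
    Await.result(listOfFutures, 1.minute)
  }
}
```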

Several things are going on here:

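  • I am using Future.traverse, which takes as its first parameter a collection M[A] (here a List[String]) and as its second parameter a function A => Future[B] (a Function1[A, Future[B]], if you prefer), and returns a Future[M[B]]: a single future for the whole collection instead of a collection of futures.
  • I am using the Future.apply method (the Future { ... } block) to create a Future[T] whose body runs asynchronously on the implicit ExecutionContext.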
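Applied to the original question, the same traverse pattern might look like the sketch below. It assumes scala.concurrent (Scala 2.10+) and a flat folder; ParallelFolderProcessing, countLines and the pool size of 8 are hypothetical choices, with countLines standing in for the real per-file work.

```scala
import java.io.File
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._
import scala.io.Source

object ParallelFolderProcessing {
  // The pool size caps how many files are processed at once; it plays the
  // role of the FileProcessor thread pool from the question.
  implicit val ec: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))

  // Hypothetical per-file work: count the lines of a text file.
  def countLines(file: File): Int = {
    val source = Source.fromFile(file)
    try source.getLines().size
    finally source.close()
  }

  // One Future per regular file in folder, gathered into a single Future.
  def processFolder(folder: File): Future[List[(String, Int)]] = {
    // listFiles returns null when folder is not a readable directory
    val files = Option(folder.listFiles()).map(_.toList).getOrElse(Nil).filter(_.isFile)
    Future.traverse(files)(file => Future(file.getName -> countLines(file)))
  }
}
```

Future.traverse(files)(f) yields the same result as Future.sequence(files.map(f)). If any single file fails, the combined future fails as well, so per-file error handling (for example with recover) would go inside the function passed to traverse.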

There are many other reasons to look at Akka futures.

  • Futures can be mapped because they are monads, i.e. you can chain future computations:

```scala
Future { 3 }.map { _ * 2 }.map { _.toString }
```

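  • Futures have callbacks: future.onComplete, onSuccess, onFailure, andThen, etc. (In the standard library's scala.concurrent.Future, onSuccess and onFailure are deprecated since Scala 2.12; use onComplete or foreach instead.)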

  • Futures support not only traverse but also for-comprehensions.
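The three points above (mapping, callbacks, for-comprehensions) can be tried together in a small sketch. It assumes scala.concurrent with the global ExecutionContext; the object FutureComposition and its addAsync and logWhenDone helpers are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FutureComposition {
  // Chaining with map: 3, then 6, then "6".
  val chained: Future[String] = Future { 3 }.map { _ * 2 }.map { _.toString }

  // A for-comprehension desugars into flatMap/map. a and b are already
  // running when passed in, so they execute concurrently; the for only
  // combines their results.
  def addAsync(a: Future[Int], b: Future[Int]): Future[Int] =
    for {
      x <- a
      y <- b
    } yield x + y

  // Callback: onComplete runs once the future finishes, receiving a Try.
  def logWhenDone[T](f: Future[T]): Unit =
    f.onComplete {
      case Success(value) => println(s"done: $value")
      case Failure(error) => println(s"failed: ${error.getMessage}")
    }

  def main(args: Array[String]): Unit = {
    val total = addAsync(Future(2), Future(40))
    logWhenDone(total) // prints "done: 42" at some point, order not guaranteed
    println(Await.result(total, 5.seconds))   // prints 42
    println(Await.result(chained, 5.seconds)) // prints 6
  }
}
```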

07-17 21:43