Problem description
I will start with the question: how do I use an Iteratee from Play's Scala API to upload a file to cloud storage (Azure Blob Storage in my case, but I don't think that's the most important part right now)?
Background:
I need to chunk the input into blocks of about 1 MB in order to store large media files (300 MB+) as Azure BlockBlobs. Unfortunately, my Scala knowledge is still poor (my project is Java based, and the only use for Scala in it will be the upload controller).
I tried with the code from "Why makes calling error or done in a BodyParser's Iteratee the request hang in Play Framework 2.0?" (as an input Iteratee) - it works quite well, but each Element that I could use has a size of 8192 bytes, which is too small for sending files of several hundred megabytes to the cloud.
I must say this is quite a new approach to me, and most probably I have misunderstood something (I don't want to say that I misunderstood everything ;> )
I will appreciate any hint or link that helps me with this topic. If there is any sample of similar usage, that would be the best way for me to get the idea.
Recommended answer
Basically, what you need first is to rechunk the input into bigger chunks of 1024 * 1024 bytes.
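To make the regrouping idea concrete outside of Play, here is a minimal plain-Scala sketch (no Play dependency; the `rechunk` helper and the chunk sizes are made up for illustration, this is not Play's API) that accumulates small byte arrays into roughly 1 MB blocks:

```scala
// Plain-Scala sketch of the rechunking idea (hypothetical helper,
// not Play's API): accumulate incoming small byte arrays until at
// least `blockSize` bytes are collected, then emit one block.
// Unlike Play's Traversable.takeUpTo, this simple version may
// overshoot by up to one input chunk instead of splitting exactly.
def rechunk(chunks: Iterator[Array[Byte]],
            blockSize: Int = 1024 * 1024): Iterator[Array[Byte]] =
  new Iterator[Array[Byte]] {
    def hasNext: Boolean = chunks.hasNext
    def next(): Array[Byte] = {
      val buf = Array.newBuilder[Byte]
      var size = 0
      while (size < blockSize && chunks.hasNext) {
        val c = chunks.next()
        buf ++= c
        size += c.length
      }
      buf.result()
    }
  }

// 300 chunks of 8192 bytes regroup into two full 1 MB blocks
// plus a smaller final one (44 * 8192 = 360448 bytes).
val blocks = rechunk(Iterator.fill(300)(new Array[Byte](8192))).toList
```

This mirrors what the Iteratee-based solution below does reactively: the 8192-byte chunks the question complains about get merged into upload-sized blocks, with only the last block allowed to be smaller.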
First, let's have an Iteratee that will consume up to 1 MB of bytes (it's OK for the last chunk to be smaller):
val consumeAMB =
Traversable.takeUpTo[Array[Byte]](1024*1024) &>> Iteratee.consume()
Using that, we can construct an Enumeratee (adapter) that will regroup chunks, using an API called grouped:
val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
Enumeratee.grouped(consumeAMB)
Here, grouped uses an Iteratee to determine how much to put in each chunk. It uses our consumeAMB for that, which means the result is an Enumeratee that rechunks the input into Array[Byte] blocks of 1 MB.
Now we need to write the BodyParser, which will use the Iteratee.foldM method to send each chunk of bytes:
val writeToStore: Iteratee[Array[Byte], ConnectionHandle] =
  Iteratee.foldM[Array[Byte], ConnectionHandle](connectionHandle) { (c, bytes) =>
    // write bytes and return the next handle, probably in a Future
    // (ConnectionHandle stands in for your storage client's handle type;
    // the original sketch used `_` here, which does not compile as an
    // explicit type argument)
  }
foldM passes a state along and uses it in the function it is given, of type (S, Array[Byte]) => Future[S], to return a new Future of the state. foldM will not call the function again until that Future has completed and a chunk of input is available.
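That sequential, state-threading, Future-aware contract can be sketched with nothing but the standard library; the `foldM` stand-in and the pretend "upload" below are made-up illustrations of the behaviour, not Play's implementation:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-in for foldM's behaviour: thread a state S through an
// asynchronous step function, one element at a time; each step runs
// only after the previous step's Future has completed, because it is
// chained with flatMap on the accumulated Future.
def foldM[E, S](elems: List[E], zero: S)(f: (S, E) => Future[S]): Future[S] =
  elems.foldLeft(Future.successful(zero)) { (acc, e) =>
    acc.flatMap(s => f(s, e))
  }

// Made-up "upload" state: total bytes written so far.
val chunks = List(Array.fill[Byte](3)(1), Array.fill[Byte](5)(2))
val total: Future[Int] = foldM(chunks, 0) { (written, bytes) =>
  // pretend we uploaded this block and got back an updated state
  Future.successful(written + bytes.length)
}
// Await.result(total, 1.second) == 8
```

This is why foldM is a good fit for the upload: blocks go to the store one at a time, in order, with natural backpressure while each write is in flight.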
And the body parser will rechunk the input and push it into the store:
BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))
Returning a Right indicates that you are returning a body at the end of the body parsing (which here happens to be the handler).
That concludes "Play 2.x: reactive file upload with Iteratees"; hopefully the answer above helps.