




I am building a Play Framework application in Scala in which I would like to stream an array of bytes to S3. I am using the Play-S3 library to do this. The "Multipart file upload" section of the documentation is what's relevant here:

// Retrieve an upload ticket
val result:Future[BucketFileUploadTicket] =
  bucket initiateMultipartUpload BucketFile(fileName, mimeType)

// Upload the parts and save the tickets
val result:Future[BucketFilePartUploadTicket] =
  bucket uploadPart (uploadTicket, BucketFilePart(partNumber, content))

// Complete the upload using both the upload ticket and the part upload tickets
val result:Future[Unit] =
  bucket completeMultipartUpload (uploadTicket, partUploadTickets)

I am trying to do the same thing in my application, but with Iteratees and Enumerators.

The streams and asynchronicity make things a little complicated, but here is what I have so far (note that uploadTicket is defined earlier in the code):

val partNumberStream = Stream.iterate(1)(_ + 1).iterator
val partUploadTicketsIteratee = Iteratee.fold[Array[Byte], Future[Vector[BucketFilePartUploadTicket]]](Future.successful(Vector.empty[BucketFilePartUploadTicket])) { (partUploadTickets, bytes) =>
  bucket.uploadPart(uploadTicket, BucketFilePart(partNumberStream.next(), bytes)).flatMap(partUploadTicket => partUploadTickets.map( _ :+ partUploadTicket))
}
(body |>>> partUploadTicketsIteratee).andThen {
  case result =>
    result.map(_.map(partUploadTickets => bucket.completeMultipartUpload(uploadTicket, partUploadTickets))) match {
      case Success(x) => x.map(d => println("Success"))
      case Failure(t) => throw t
    }
}


Everything compiles and runs without incident. In fact, "Success" gets printed out. The problem is that no file ever shows up on S3.



There might be multiple problems with your code. The nested map calls make it hard to read, and the way the futures are composed may be one of the problems. Another problem might be that every part except the last one must be at least 5MB.
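
On the composition point: Future.andThen only runs a side effect and hands back the original result, so in the question the future returned by completeMultipartUpload is never chained into anything, and "Success" can be printed without waiting for the completion call to finish. Below is a minimal sketch of chaining every step with flatMap, using only the Scala standard library. Ticket, uploadPart and completeUpload are hypothetical stand-ins, not the Play-S3 API, and completeUpload returns the total byte count only so that there is something to inspect.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureCompositionSketch {
  // Hypothetical stand-ins for the real upload calls (assumptions, not Play-S3)
  final case class Ticket(partNumber: Int, size: Int)

  def uploadPart(partNumber: Int, bytes: Array[Byte]): Future[Ticket] =
    Future(Ticket(partNumber, bytes.length))

  def completeUpload(tickets: Vector[Ticket]): Future[Int] =
    Future(tickets.map(_.size).sum)

  // Upload the parts one after another, then complete the upload.
  // Every step is chained with flatMap, so the returned future only
  // succeeds after completeUpload itself has succeeded.
  def uploadAll(parts: Seq[Array[Byte]]): Future[Int] = {
    val tickets =
      parts.zipWithIndex.foldLeft(Future.successful(Vector.empty[Ticket])) {
        case (acc, (bytes, index)) =>
          acc.flatMap(done => uploadPart(index + 1, bytes).map(done :+ _))
      }
    tickets.flatMap(completeUpload)
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Array.fill[Byte](3)(1), Array.fill[Byte](2)(2))
    println(Await.result(uploadAll(parts), 5.seconds))
  }
}
```

Because the result is a single Future, a failure in any part upload or in the completion call shows up as a failed future instead of being lost inside a callback.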


The code below has not been tested, but shows a different approach. The iteratee approach is one where you can create small building blocks and compose them into a pipe of operations.


To make the code compile I added a trait and a few methods

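import play.api.libs.iteratee._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

trait BucketFilePartUploadTicket
val uploadPart: (Int, Array[Byte]) => Future[BucketFilePartUploadTicket] = ???
val completeUpload: Seq[BucketFilePartUploadTicket] => Future[Unit] = ???
val body: Enumerator[Array[Byte]] = ???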


Here we create a few parts

// Create 5MB chunks
val chunked = {
  val take5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5)
  Enumeratee.grouped(take5MB transform Iteratee.consume())
}

// Add a counter, used as part number later on
val zipWithIndex = Enumeratee.scanLeft[Array[Byte]](0 -> Array.empty[Byte]) {
  case ((counter, _), bytes) => (counter + 1) -> bytes
}

// Map the (Int, Array[Byte]) tuple to a BucketFilePartUploadTicket
val uploadPartTickets = Enumeratee.mapM[(Int, Array[Byte])](uploadPart.tupled)

// Construct the pipe to connect to the enumerator
// the ><> operator is an alias for compose; it is more intuitive because of
// its arrow-like shape
val pipe = chunked ><> zipWithIndex ><> uploadPartTickets

// Create a consumer that ends by finishing the upload
val consumeAndComplete =
  Iteratee.getChunks[BucketFilePartUploadTicket] mapM completeUpload


Running it is done by simply connecting the parts

// This is the result, a Future[Unit]
val result = body through pipe run consumeAndComplete


Note that I did not test any code and might have made some mistakes in my approach. This however shows a different way of dealing with the problem and should probably help you to find a good solution.


Note that this approach waits for one part to finish uploading before it takes on the next part. If the connection from your server to Amazon is slower than the connection from the browser to your server, this mechanism will slow down the input.

You could take another approach where you do not wait for the Future of a part upload to complete before starting the next one. This requires an extra step in which you use Future.sequence to convert the sequence of upload futures into a single future containing a sequence of the results. The result would be a mechanism that sends a part to Amazon as soon as you have enough data.
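
A rough, untested-against-S3 sketch of that idea, using only the Scala standard library. Ticket, uploadPart and completeUpload are again hypothetical stand-ins rather than the Play-S3 API, the parts are assumed to be already grouped into large enough chunks, and completeUpload returns the part numbers only so that the ordering can be checked.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelUploadSketch {
  // Hypothetical stand-ins for the real upload calls (assumptions, not Play-S3)
  final case class Ticket(partNumber: Int, size: Int)

  def uploadPart(partNumber: Int, bytes: Array[Byte]): Future[Ticket] =
    Future(Ticket(partNumber, bytes.length))

  def completeUpload(tickets: Seq[Ticket]): Future[Seq[Int]] =
    Future(tickets.map(_.partNumber))

  def uploadAll(parts: Seq[Array[Byte]]): Future[Seq[Int]] = {
    // Each upload starts as soon as its future is created; nothing waits here
    val uploads: Seq[Future[Ticket]] =
      parts.zipWithIndex.map { case (bytes, index) => uploadPart(index + 1, bytes) }

    // Future.sequence preserves the input order, so the tickets stay in
    // part-number order even if the uploads finish out of order
    Future.sequence(uploads).flatMap(completeUpload)
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Array[Byte](1), Array[Byte](2), Array[Byte](3))
    println(Await.result(uploadAll(parts), 5.seconds))
  }
}
```

The trade-off is memory: with nothing slowing down the input, every part that has been read but not yet uploaded is held in memory at the same time.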



09-06 22:19