我正在使用使用迭代器和枚举器的 playframework 的异步 I/O 库。我现在有一个 Iterator[T] 作为数据接收器(为了简单起见,它是一个 Iterator[Byte],它将其内容存储到一个文件中)。这个 Iterator[Byte] 被传递给处理写入的函数。

但在写入之前,我想在文件开头添加一些统计信息(为简化起见,它是一个字节),因此我在将迭代器传递给 write 函数之前按以下方式传输它:

def write(value: Byte, output: Iteratee[Byte]): Iteratee[Byte] =
    Iteratee.flatten(output.feed(Input.El(value)))

当我现在从磁盘读取存储的文件时,我得到了一个 Enumerator[Byte]。
起初我想读取并删除附加数据,然后我想将 Enumerator[Byte] 的其余部分传递给处理读取的函数。
所以我还需要转换枚举器:
def read(input: Enumerator[Byte]): (Byte, Enumerator[Byte]) = {
   val firstEnumeratorEntry = ...
   val remainingEnumerator = ...
   (firstEnumeratorEntry, remainingEnumerator)
}

但我不知道如何做到这一点。如何从 Enumerator 读取一些字节并获取剩余的 Enumerator?

将 Iteratee[Byte] 替换为 OutputStream 并将 Enumerator[Byte] 替换为 InputStream,这将非常简单:
def write(value: Byte, output: OutputStream) = {
    output.write(value)
    output
}
def read(input: InputStream) = (input.read,input)

但是我需要play框架的异步I/O。

最佳答案

这是通过在 Iteratee 和适当的(种类)状态累加器(此处为元组)中折叠来实现此目的的一种方法

我去阅读 routes 文件,第一个字节将被读取为 Char,另一个字节将作为 UTF-8 字节串附加到 String

  def index = Action {
    /*let's do everything asyncly*/
    Async {
      /*for comprehension for read-friendly*/
      for (
        i <- read; /*read the file */
        (r:(Option[Char], String)) <- i.run /*"create" the related Promise and run it*/
      ) yield Ok("first : " + r._1.get + "\n" + "rest" + r._2) /* map the Promised result in a correct Request's Result*/
    }
  }


  def read = {
    //get the routes file in an Enumerator
    val file: Enumerator[Array[Byte]] = Enumerator.fromFile(Play.getFile("/conf/routes"))

    //apply the enumerator with an Iteratee that folds the data as wished
    file(Iteratee.fold((None, ""):(Option[Char], String)) { (acc, b) =>
       acc._1 match {
         /*on the first chunk*/ case None => (Some(b(0).toChar), acc._2 + new String(b.tail, Charset.forName("utf-8")))
         /*on other chunks*/ case x => (x, acc._2 + new String(b, Charset.forName("utf-8")))
       }
    })

  }

编辑

我发现了另一种使用 Enumeratee 的方法,但它需要创建 2 个 Enumerator (一个短暂的)。然而,它更优雅一些。我们使用一种“类似”的 Enumeratee,但 Traversal 比 Enumeratee(chunck 级别)工作在更精细的级别。
我们使用 take 1 将只占用 1 个字节,然后关闭流。另一方面,我们使用 drop 来简单地删除第一个字节(因为我们使用的是 Enumerator[Array[Byte]])

此外,现在 read2 的签名比您希望的更接近,因为它返回 2 个枚举器(距离 Promise、Enumerator 不远)
def index = Action {
  Async {
    val (first, rest) = read2
    val enee = Enumeratee.map[Array[Byte]] {bs => new String(bs, Charset.forName("utf-8"))}

    def useEnee(enumor:Enumerator[Array[Byte]]) = Iteratee.flatten(enumor &> enee |>> Iteratee.consume[String]()).run.asInstanceOf[Promise[String]]

    for {
      f <- useEnee(first);
      r <- useEnee(rest)
    } yield Ok("first : " + f + "\n" + "rest" + r)
  }
}

def read2 = {
  def create = Enumerator.fromFile(Play.getFile("/conf/routes"))

  val file: Enumerator[Array[Byte]] = create
  val file2: Enumerator[Array[Byte]] = create

  (file &> Traversable.take[Array[Byte]](1), file2 &> Traversable.drop[Array[Byte]](1))

}

关于Scala:读取 Enumerator[T] 的一些数据并返回剩余的 Enumerator[T],我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/10950267/

10-11 03:06