我正在使用一个特定的数据库,如果成功
查询,您可以使用来访问一组结果数据
具体命令:

getResultData :: IO (ResponseCode, ByteString)


现在,getResultData将返回响应代码和一些数据,其中
响应代码如下所示:

response = GET_DATA_FAILED | OPERATION_SUCCEEDED | NO_MORE_DATA


ByteString是一个,一些或所有块:

Data http://desmond.imageshack.us/Himg189/scaled.php?server=189&filename=chunksjpeg.png&res=medium

故事还没有结束。存在一组群组:

Stream http://desmond.imageshack.us/Himg695/scaled.php?server=695&filename=chunkgroupsjpeg.png&res=medium

收到getResultData的NO_MORE_DATA响应后,将调用
getNextItem将迭代流,使我可以开始调用
再次获取结果。一旦getNextItem返回STREAM_FINISHED,即
她写的所有;我有我的数据。

现在,我希望使用Date.Iteratee或Data.Enumerator来重塑此现象。因为我的
现有的Data.Iteratee解决方案有效,但似乎还很幼稚,我觉得我应该对此建模
嵌套的迭代器,而不是一个大的迭代器blob,这就是
我的解决方案目前已实施。

我一直在看Data.Iteratee 0.8.6.2的代码,我有点困惑
当涉及到嵌套的东西时。

嵌套迭代是否正确?如果是这样,将如何使用嵌套的迭代器对此建模?

问候

最佳答案

我认为嵌套的迭代器是正确的方法,但是这种情况有一些独特的问题,使其与大多数常见示例略有不同。

块和组

第一个问题是正确获取数据源。基本上,您所描述的逻辑划分将为您提供与[[ByteString]]等效的流。如果创建一个枚举器来直接产生该枚举器,则流中的每个元素将是一整组块,大概是出于内存原因,您希望避免这种情况。您可以将所有内容拼合为一个[ByteString],但是随后您需要重新引入边界,这将非常浪费,因为数据库正在为您执行此操作。

现在忽略组的流,您似乎需要自己将数据分成多个块。我将其建模为:

enumGroup :: Enumerator ByteString IO a
enumGroup = enumFromCallback cb ()
 where
  cb () = do
    (code, data) <- getResultData
    case code of
        OPERATION_SUCCEEDED -> return $ Right ((True, ()), data)
        NO_MORE_DATA        -> return $ Right ((False, ()), data)
        GET_DATA_FAILED     -> return $ Left MyException


由于块大小是固定的,因此您可以使用Data.Iteratee.group轻松地对其进行块化。

enumGroupChunked :: Iteratee [ByteString] IO a -> IO (Iteratee ByteString IO a)
enumGroupChunked = enumGroup . joinI . group groupSize


比较此类型与Enumerator

type Enumerator s m a = Iteratee s m a -> m (Iteratee s m a)


因此,enumGroupChunked基本上是更改流类型的奇特枚举器。这意味着它需要一个[ByteString] iteratee使用者,并返回一个使用普通字节串的iteratee。通常,枚举数的返回类型无关紧要;它只是一个迭代器,您可以使用run(或tryRun)进行评估以获取输出,因此您可以在此处执行相同的操作:

evalGroupChunked :: Iteratee [ByteString] IO a -> IO a
evalGroupChunked i = enumGroupChunked i >>= run


如果您要对每个组执行更复杂的处理,则最简单的方法是在enumGroupChunked函数中。

组流

现在,这已经不成问题了,如何处理小组流?答案取决于您要如何消费它们。如果您想本质上独立对待流中的每个组,我将执行以下操作:

foldStream :: Iteratee [ByteString] IO a -> (b -> a -> b) -> b -> IO b
foldStream iter f acc0 = do
  val <- evalGroupChunked iter
  res <- getNextItem
  case res of
        OPERATION_SUCCEEDED -> foldStream iter f $! f acc0 val
        NO_MORE_DATA        -> return $ f acc0 val
        GET_DATA_FAILED     -> error "had a problem"


但是,假设您要对整个数据集进行某种流处理,而不仅仅是单个组。也就是说,你有一个

bigProc :: Iteratee [ByteString] IO a


您想在整个数据集上运行。这是枚举器的返回迭代器有用的地方。现在,一些先前的代码会稍有不同:

enumGroupChunked' :: Iteratee [ByteString] IO a
  -> IO (Iteratee ByteString IO (Iteratee [ByteString] IO a))
enumGroupChunked' = enumGroup . group groupSize

procStream :: Iteratee [ByteString] IO a -> a
procStream iter = do
  i' <- enumGroupChunked' iter >>= run
  res <- getNextItem
  case res of
        OPERATION_SUCCEEDED -> procStream i'
        NO_MORE_DATA        -> run i'
        GET_DATA_FAILED     -> error "had a problem"


嵌套迭代器(即Iteratee s1 m (Iteratee s2 m a))的这种用法很少见,但是当您要顺序处理来自多个枚举器的数据时,它特别有用。关键是要认识到,对外部iteratee进行抄送将为您提供准备接收更多数据的iteratee。在这种情况下,此模型非常有效,因为您可以独立枚举每个组,但可以将它们作为单个流进行处理。

一个警告:内部迭代器将处于它所处于的任何状态。假设一个组的最后一个块可能小于完整的块,例如

   Group A               Group B               Group C
   1024, 1024, 512       1024, 1024, 1024      1024, 1024, 1024


在这种情况下将发生的事情是,由于run将数据合并为大小为1024的块,因此它将把组A的最后一块与组B的前512个字节合并。这对于示例,因为该代码终止了内部迭代器(带有group)。这意味着这些组是真正独立的,因此您必须这样对待它们。如果要像foldStream中那样组合组,则必须考虑整个流。如果是这种情况,那么您将需要使用比joinI更复杂的工具。

Data.Iteratee与Data.Enumerator

在不讨论任何一个软件包的优劣的情况下,更不用说IterIO(我很偏颇),我想指出我认为两者之间最重要的区别:流的抽象。

在Data.Iteratee中,使用者procStream在某种长度的名义ByteString上进行操作,一次可以访问单个group块。

在Data.Enumerator中,使用者Iteratee ByteString m a对概念[ByteString]进行操作,一次可访问一个或多个元素(字节字符串)。

这意味着大多数Data.Iteratee操作都以元素为中心,即使用ByteString时,它们将在单个Iteratee ByteString m a上进行操作,而Data.Enumerator操作是针对块的,在Iteratee ByteString上进行操作。

您可以想到Word8 === ByteString

关于haskell - 嵌套的迭代器,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/7560944/

10-13 08:24