我正在使用一个特定的数据库,如果成功
查询,您可以使用来访问一组结果数据
具体命令:
getResultData :: IO (ResponseCode, ByteString)
现在,getResultData将返回响应代码和一些数据,其中
响应代码如下所示:
response = GET_DATA_FAILED | OPERATION_SUCCEEDED | NO_MORE_DATA
ByteString是一个,一些或所有块:
Data http://desmond.imageshack.us/Himg189/scaled.php?server=189&filename=chunksjpeg.png&res=medium
故事还没有结束。存在一组群组:
Stream http://desmond.imageshack.us/Himg695/scaled.php?server=695&filename=chunkgroupsjpeg.png&res=medium
收到getResultData的NO_MORE_DATA响应后,将调用
getNextItem将迭代流,使我可以开始调用
再次获取结果。一旦getNextItem返回STREAM_FINISHED,即
她写的所有;我有我的数据。
现在,我希望使用Date.Iteratee或Data.Enumerator来重塑此现象。因为我的
现有的Data.Iteratee解决方案有效,但似乎还很幼稚,我觉得我应该对此建模
嵌套的迭代器,而不是一个大的迭代器blob,这就是
我的解决方案目前已实施。
我一直在看Data.Iteratee 0.8.6.2的代码,我有点困惑
当涉及到嵌套的东西时。
嵌套迭代是否正确?如果是这样,将如何使用嵌套的迭代器对此建模?
问候
最佳答案
我认为嵌套的迭代器是正确的方法,但是这种情况有一些独特的问题,使其与大多数常见示例略有不同。
块和组
第一个问题是正确获取数据源。基本上,您所描述的逻辑划分将为您提供与[[ByteString]]
等效的流。如果创建一个枚举器来直接产生该枚举器,则流中的每个元素将是一整组块,大概是出于内存原因,您希望避免这种情况。您可以将所有内容拼合为一个[ByteString]
,但是随后您需要重新引入边界,这将非常浪费,因为数据库正在为您执行此操作。
现在忽略组的流,您似乎需要自己将数据分成多个块。我将其建模为:
enumGroup :: Enumerator ByteString IO a
enumGroup = enumFromCallback cb ()
where
cb () = do
(code, data) <- getResultData
case code of
OPERATION_SUCCEEDED -> return $ Right ((True, ()), data)
NO_MORE_DATA -> return $ Right ((False, ()), data)
GET_DATA_FAILED -> return $ Left MyException
由于块大小是固定的,因此您可以使用
Data.Iteratee.group
轻松地对其进行块化。enumGroupChunked :: Iteratee [ByteString] IO a -> IO (Iteratee ByteString IO a)
enumGroupChunked = enumGroup . joinI . group groupSize
比较此类型与
Enumerator
type Enumerator s m a = Iteratee s m a -> m (Iteratee s m a)
因此,
enumGroupChunked
基本上是更改流类型的奇特枚举器。这意味着它需要一个[ByteString] iteratee使用者,并返回一个使用普通字节串的iteratee。通常,枚举数的返回类型无关紧要;它只是一个迭代器,您可以使用run
(或tryRun
)进行评估以获取输出,因此您可以在此处执行相同的操作:evalGroupChunked :: Iteratee [ByteString] IO a -> IO a
evalGroupChunked i = enumGroupChunked i >>= run
如果您要对每个组执行更复杂的处理,则最简单的方法是在
enumGroupChunked
函数中。组流
现在,这已经不成问题了,如何处理小组流?答案取决于您要如何消费它们。如果您想本质上独立对待流中的每个组,我将执行以下操作:
foldStream :: Iteratee [ByteString] IO a -> (b -> a -> b) -> b -> IO b
foldStream iter f acc0 = do
val <- evalGroupChunked iter
res <- getNextItem
case res of
OPERATION_SUCCEEDED -> foldStream iter f $! f acc0 val
NO_MORE_DATA -> return $ f acc0 val
GET_DATA_FAILED -> error "had a problem"
但是,假设您要对整个数据集进行某种流处理,而不仅仅是单个组。也就是说,你有一个
bigProc :: Iteratee [ByteString] IO a
您想在整个数据集上运行。这是枚举器的返回迭代器有用的地方。现在,一些先前的代码会稍有不同:
enumGroupChunked' :: Iteratee [ByteString] IO a
-> IO (Iteratee ByteString IO (Iteratee [ByteString] IO a))
enumGroupChunked' = enumGroup . group groupSize
procStream :: Iteratee [ByteString] IO a -> a
procStream iter = do
i' <- enumGroupChunked' iter >>= run
res <- getNextItem
case res of
OPERATION_SUCCEEDED -> procStream i'
NO_MORE_DATA -> run i'
GET_DATA_FAILED -> error "had a problem"
嵌套迭代器(即
Iteratee s1 m (Iteratee s2 m a)
)的这种用法很少见,但是当您要顺序处理来自多个枚举器的数据时,它特别有用。关键是要认识到,对外部iteratee进行抄送将为您提供准备接收更多数据的iteratee。在这种情况下,此模型非常有效,因为您可以独立枚举每个组,但可以将它们作为单个流进行处理。一个警告:内部迭代器将处于它所处于的任何状态。假设一个组的最后一个块可能小于完整的块,例如
Group A Group B Group C
1024, 1024, 512 1024, 1024, 1024 1024, 1024, 1024
在这种情况下将发生的事情是,由于
run
将数据合并为大小为1024的块,因此它将把组A的最后一块与组B的前512个字节合并。这对于示例,因为该代码终止了内部迭代器(带有group
)。这意味着这些组是真正独立的,因此您必须这样对待它们。如果要像foldStream
中那样组合组,则必须考虑整个流。如果是这种情况,那么您将需要使用比joinI
更复杂的工具。Data.Iteratee与Data.Enumerator
在不讨论任何一个软件包的优劣的情况下,更不用说IterIO(我很偏颇),我想指出我认为两者之间最重要的区别:流的抽象。
在Data.Iteratee中,使用者
procStream
在某种长度的名义ByteString上进行操作,一次可以访问单个group
块。在Data.Enumerator中,使用者
Iteratee ByteString m a
对概念[ByteString]进行操作,一次可访问一个或多个元素(字节字符串)。这意味着大多数Data.Iteratee操作都以元素为中心,即使用
ByteString
时,它们将在单个Iteratee ByteString m a
上进行操作,而Data.Enumerator操作是针对块的,在Iteratee ByteString
上进行操作。您可以想到
Word8
=== ByteString
。关于haskell - 嵌套的迭代器,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/7560944/