问题

我正在使用Haskell pipes库来定义以下类型的 Pipe :

signalExhausted :: Monad m => Pipe a (Value a) m r
Value数据类型的定义如下:
data Value a = Value a | Exhausted

管道应遵守以下法律:
toList (each [] >-> signalExhausted) ==                 [Exhausted]
toList (each xs >-> signalExhausted) == map Value xs ++ [Exhausted]

换句话说,该管道应等效于Pipes.Prelude.map Value,除了在处理完所有上游值之后应产生一个附加的Exhausted之外,这给下游提供了执行某些最终操作的机会。

可以定义这样的Pipe吗?

例子
> let xs = words "hubble bubble toil and trouble"
> toList $ each xs >-> signalExhausted
[Value "hubble", Value "bubble", Value "toil", Value "and", Value "trouble", Exhausted]

笔记

我知道 pipes-parse 库提供了drawparseForever函数。这些看起来很有用,但是我还不太清楚如何将它们组合成符合上述规范的Pipe

最佳答案

无法定义类似signalExhausted的管道,但是可以定义等效于(>-> signalExhausted)的函数。

>-> pull category的专门版本。执行是由下游代理从上游代理提取数据来驱动的。下游代理向上游发送空请求()并阻塞,直到从上游代理返回包含值的响应为止。当上游代理服务器用尽并且没有更多值可发送回时,它会发送return。您可以在 return 的定义中看到与这些示例相关的each

each = F.foldr (\a p -> yield a >> p) (return ())
-- what to do when the data's exhausted ^

下游代理需要一个值才能继续运行,但是管道库可能无法提供该值,因此下游代理永远不会再次运行。由于它再也不会运行,因此无法修改或对数据作出 react 。

有两个解决方案。最简单的方法是在上游管道上 map Value并在完成后添加yield Exhausted
import Pipes
import qualified Pipes.Prelude as P

data Value a = Value a | Exhausted
    deriving (Show)

signalExhausted p = p >-> P.map Value >> yield Exhausted

除了函数signalExhausted代替(>-> signalExhausted)之外,这正好满足您的要求。
let xs = words "hubble bubble toil and trouble"
print . P.toList . signalExhausted $ each xs

[Value "hubble",Value "bubble",Value "toil",Value "and",Value "trouble",Exhausted]

解决此问题的更一般的方法是停止上游代理返回,而在耗尽后向下游发出信号。我在answer to a related question中演示了如何执行此操作。
import Control.Monad
import Pipes.Core

returnDownstream :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' (Either r b) m r'
returnDownstream = (forever . respond . Left =<<) . (respond . Right <\\)

这将每个respond替换为respond . Right,并将return替换为forever . respond . left,将返回结果与响应一起发送到下游。
returnDownstream比您要寻找的更通用。我们可以演示如何使用它来重新创建signalExhaustedreturnDownstream将返回的管道转换为永不返回的管道,而是将其返回值作为LeftEither值转发到下游。
signalExhausted p = returnDownstream p >-> respondLeftOnce
respondLeftOnce是一个示例下游代理。下游代理可以区分Right中保存的常规值和Left中保存的返回值。
respondLeftOnce :: Monad m => Pipe (Either e a) (Value a) m ()
respondLeftOnce = go
    where
        go = do
            ea <- await
            case ea of
                Right a -> yield (Value a) >> go
                Left  _ -> yield Exhausted       -- The upstream proxy is exhausted; do something else

10-06 14:27