我们有一些东西在TChan上倾销值,然后由消费者处理这些值。但是使用者无法跟上,因此生产者在通道上倾倒了很多东西,因此我们获得了大量的内存使用,但是使用者无法跟上。如果通道队列达到某个大小或某种大小,是否有一种简单的方法让生产者阻塞,以便我们可以让生产者等待消费者赶上?

最佳答案

像约翰的回答一样,我建议您自己构建一个有限的TChan。我的代码不同,因为它:


添加抽象(使BTChan为ADT)
由于他正在读取IO中的当前大小,因此删除了角落情况。
尝试在读取时不要在TVar的大小中构建thunk(在编写时,它不那么重要,因为thunk只能是“一个深度”,下一个操作总是需要评估大小)。
现在受到黑客攻击:http://hackage.haskell.org/package/bounded-tchan


注意:坦率地说,如果我是您,我将忽略所有这些答案,而只是在他的注释中使用链接到的代码(除非事实证明是不好的代码)。我敢打赌它的功能与我在这里相同,但要多加考虑。

{-# LANGUAGE BangPatterns #-}
module BTChan
        ( BTChan
        , newBTChanIO
        , newBTChan
        , writeBTChan
        , readBTChan
        ) where

import Control.Concurrent.STM

data BTChan a = BTChan {-# UNPACK #-} !Int (TChan a) (TVar  Int)

-- | `newBTChan m` make a new bounded TChan of max size `m`
newBTChanIO :: Int -> IO (BTChan a)
newBTChanIO m = do
    szTV <- newTVarIO 0
    c    <- newTChanIO
    return (BTChan m c szTV)

newBTChan :: Int -> STM (BTChan a)
newBTChan m
        | m < 1 = error "BTChan's can not have a maximum <= 0!"
        | otherwise = do
        szTV <- newTVar 0
        c    <- newTChan
        return (BTChan m c szTV)

writeBTChan :: BTChan a -> a -> STM ()
writeBTChan (BTChan mx c szTV) x = do
        sz <- readTVar szTV
        if sz >= mx then retry else writeTVar szTV (sz + 1) >> writeTChan c x

readBTChan :: BTChan a -> STM a
readBTChan (BTChan _ c szTV) = do
        x <- readTChan c
        sz <- readTVar szTV
        let !sz' = sz - 1
        writeTVar szTV sz'
        return x

sizeOfBTChan :: BTChan a -> STM Int
sizeOfBTChan (BTChan _ _ sTV) = readTVar sTV


STM程序员需要注意的一些事项:


显式调用retry将产生结果,将haskell线程置于阻塞状态,等待TVarTChan之一的状态更改,以便可以重试。这样可以避免检查IO中的值并使用yield函数。
像MVars一样,TVars也可以指代重击,通常这并不是您想要的。也许有人应该制作一个定义STVarSTChanSBTChanBTChan的黑客程序包(严格和/或有界TVar和TChans)。
实际上,有必要写newBTChanIO代替杠杆newBTChan,因为即使在new{TVar,TChan}IO之下,unsafePerformIO的实现也可以工作,而atomically却不能。


编辑:通过将TVar分成一个供读者阅读者和一个供作家使用,实际上可以使性能提高2-5倍(取决于您使用的范围),从而减少争用。使用条件验证。改进的版本0.2.1已被黑客使用。

09-25 20:27