我们有一些东西在TChan上倾销值,然后由消费者处理这些值。但是使用者无法跟上,因此生产者在通道上倾倒了很多东西,因此我们获得了大量的内存使用,但是使用者无法跟上。如果通道队列达到某个大小或某种大小,是否有一种简单的方法让生产者阻塞,以便我们可以让生产者等待消费者赶上?
最佳答案
像约翰的回答一样,我建议您自己构建一个有限的TChan。我的代码不同,因为它:
添加抽象(使BTChan
为ADT)
由于他正在读取IO中的当前大小,因此删除了角落情况。
尝试在读取时不要在TVar的大小中构建thunk(在编写时,它不那么重要,因为thunk只能是“一个深度”,下一个操作总是需要评估大小)。
现在受到黑客攻击:http://hackage.haskell.org/package/bounded-tchan
注意:坦率地说,如果我是您,我将忽略所有这些答案,而只是在他的注释中使用链接到的代码(除非事实证明是不好的代码)。我敢打赌它的功能与我在这里相同,但要多加考虑。
{-# LANGUAGE BangPatterns #-}
module BTChan
( BTChan
, newBTChanIO
, newBTChan
, writeBTChan
, readBTChan
) where
import Control.Concurrent.STM
data BTChan a = BTChan {-# UNPACK #-} !Int (TChan a) (TVar Int)
-- | `newBTChan m` make a new bounded TChan of max size `m`
newBTChanIO :: Int -> IO (BTChan a)
newBTChanIO m = do
szTV <- newTVarIO 0
c <- newTChanIO
return (BTChan m c szTV)
newBTChan :: Int -> STM (BTChan a)
newBTChan m
| m < 1 = error "BTChan's can not have a maximum <= 0!"
| otherwise = do
szTV <- newTVar 0
c <- newTChan
return (BTChan m c szTV)
writeBTChan :: BTChan a -> a -> STM ()
writeBTChan (BTChan mx c szTV) x = do
sz <- readTVar szTV
if sz >= mx then retry else writeTVar szTV (sz + 1) >> writeTChan c x
readBTChan :: BTChan a -> STM a
readBTChan (BTChan _ c szTV) = do
x <- readTChan c
sz <- readTVar szTV
let !sz' = sz - 1
writeTVar szTV sz'
return x
sizeOfBTChan :: BTChan a -> STM Int
sizeOfBTChan (BTChan _ _ sTV) = readTVar sTV
STM程序员需要注意的一些事项:
显式调用
retry
将产生结果,将haskell线程置于阻塞状态,等待TVar
或TChan
之一的状态更改,以便可以重试。这样可以避免检查IO
中的值并使用yield
函数。像MVars一样,TVars也可以指代重击,通常这并不是您想要的。也许有人应该制作一个定义
STVar
,STChan
,SBTChan
和BTChan
的黑客程序包(严格和/或有界TVar和TChans)。实际上,有必要写
newBTChanIO
代替杠杆newBTChan
,因为即使在new{TVar,TChan}IO
之下,unsafePerformIO
的实现也可以工作,而atomically
却不能。编辑:通过将TVar分成一个供读者阅读者和一个供作家使用,实际上可以使性能提高2-5倍(取决于您使用的范围),从而减少争用。使用条件验证。改进的版本0.2.1已被黑客使用。