我有一个写入Map和PSQ的主线程。在Map和PSQ中,我使用相同的键,以便通过查看PSQ可以找到具有O(1)复杂度的最低优先级的条目,并将其映射到Map中的值。

现在,虽然我的主线程在需要时添加/修改了Map和PSQ,但我还有第二个线程,不断地(forever $ do)查看PSQ以确定最早的密钥是在N ms之前,然后应该刷新它。

为此,两个线程都需要查看相同的可变数据。维持状态的最佳方法是什么? IOREfs是否会这样?还有什么可能的方法来解决这个问题?

此处的“一些”前Alpha代码:

import Data.Time
import Data.Functor
import Data.Time.Clock.POSIX
import qualified Data.PSQueue as PSQ
import qualified Data.Map as Map
import Data.Maybe
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Monad
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import qualified Data.ByteString.Char8 as B

--PSQ = (host, PID) POSIXTime
--where the tuple is k and POSIXTime is p

--Map is (host, PortNumber) [messages]
--where the tuple is the key and [messages] is a list of messages

key = ("192.168.1.1", 4711)
messages = ["aaa", "bbbb", "ccccc"]

newRq :: IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
newRq = do
      time <- getPOSIXTime
      let q = PSQ.singleton key time
      let m = Map.singleton key messages
      return (q, m)

appendMsg :: String -> (String, Integer) -> Map.Map (String, Integer) [String] -> Map.Map (String, Integer) [String]
appendMsg newmsgs (host, port) m =
      let Just messages' = Map.lookup (host,port) m
          l = length . concat $ messages'
          l' = l + length newmsgs
      in
      if l' < 1400 then Map.adjust (++ [newmsgs]) (host, port) m else m

insertNewRec :: (String, Integer) -> [String] -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
insertNewRec (a,b) c q m = do
      time <- getPOSIXTime
      let q1 = PSQ.insert (a,b) time q
      let m1 = Map.insert (a,b) c m
      return (q1, m1)

sendq :: Socket -> B.ByteString -> String -> PortNumber -> IO ()
sendq s datastring host port = do
      hostAddr <- inet_addr host
      sendAllTo s datastring (SockAddrInet port hostAddr)
      return ()

deleteRec :: (String, Integer) -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
deleteRec (host, port) q m = (q', m')
      where
         m' = Map.delete (host, port) m
         q' = PSQ.delete (host, port) q

loopMyQ q m1 done = forever $ do
      let Just m = PSQ.findMin q
      let time = (PSQ.prio m) + 0.200 --adds 200ms
      now <- getPOSIXTime
      if now < time
        then print (m1)
        --here eventually I would call the send function to flush the queue
        else putMVar done ()

sendrecv :: Socket -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> String -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
sendrecv s q1 m1 msg = do
     let m2 = appendMsg msg key m1
         (q3, m3) = case m2 of
                   val | m2 == m1 -> deleteRec key q1 m1
                       | otherwise -> (q1, m2)
     (q5, m5) <- if (m2 == m1) then (do (q4, m4) <- insertNewRec key (words msg) q3 m3
                                        return (q4, m4)) else return (q1, m2)
     when (m2 == m1) (let Just messages = Map.lookup ("192.168.1.1", 4711) m1 in sendq s (B.pack $ unwords messages) "192.168.1.1" 4711)
     return (q5, m5)

--main :: IO()
main = withSocketsDo $ do
     s <- socket AF_INET Datagram defaultProtocol
     (q1, m1) <- newRq
     done <- newEmptyMVar
     forkIO $ loopMyQ q1 m1 done
     (q', m') <- foldM (\(q, m) _ -> sendrecv s q m "ping") (q1, m1) [1..1000]
     takeMVar done
     --print ("longer than 200ms ago")

最佳答案

您很可能希望使用MVarsTVars来维护线程之间的一致状态。 IORef不是线程安全的。

我建议使用STM(和TVars)解决此问题。您正在处理对多个数据结构的并发访问,而STM的可组合性比考虑MVars的锁定顺序要容易得多。

看完您的代码后,TVars似乎是您的最佳选择。将您的PSQ和Map包裹在两个不同的TVar中。在atomically事务中包装所有需要一致视图的代码。在大多数情况下,您的代码将“正常工作”。但是,如果有争用该锁,则原子块将仅重试,直到它起作用为止。

关于haskell - 共享的可变状态:何时使用IORefs,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/9369205/

10-12 03:49