假定withTimeout函数通过管道将ConsoleEvent与每隔CeTimeout秒发送一次的s :: Int进行管道传输(如果未接收到任何内容)。相反,它无法在适当的时间发送CeTimeout事件。如果超过CeTimeout秒而丢失了原始事件,则将一个s事件替换为其他事件。同样,它应该是CeTimeout n*s事件,而不是一个CeTimeout事件,对于已过去的每个n第二个周期,其s都会计数。错误在哪里,将如何纠正?谢谢!

withTimeout :: (MonadIO t) => Int -> Pipe ConsoleEvent ConsoleEvent t ()
withTimeout ((* 1000000) -> s) = join . liftIO $ work
  where
    work :: (MonadIO t) => IO (Pipe ConsoleEvent ConsoleEvent t ())
    work =
      do
        (oSent, iKept) <- spawn $ bounded 1
        (oKept, iSent) <- spawn $ unbounded
        (oTimeout, iTimeout) <- spawn $ bounded 1

        tid <- launchTimeout oTimeout >>= newMVar

        forkIO $ do
          runEffect . forever $ fromInput iKept >-> factorTimeout tid oTimeout >-> toOutput oKept

        forkIO $ do
          runEffect . forever $ fromInput iTimeout >-> toOutput oKept

        return $ do
          await >>= (liftIO . guardedSend oSent)
          (liftIO . guardedRecv $ iSent) >>= yield

    guardedSend :: Output ConsoleEvent -> ConsoleEvent -> IO ()
    guardedSend o ce =
      (atomically $ send o ce) >>= \case
        True -> return ()
        otherwise -> die $ "withTimeout can not send"

    guardedRecv :: Input ConsoleEvent -> IO ConsoleEvent
    guardedRecv i =
      (atomically $ recv i) >>= \case
        Just a -> return a
        otherwise -> die $ "withTimeout can not recv"

    launchTimeout :: Output ConsoleEvent -> IO ThreadId
    launchTimeout o =
      forkIO . forever $ do
        threadDelay $ s
        (atomically $ send o CeTimeout) >>= \case
          True -> return ()
          otherwise -> die "withTimeout can not send timeout"

    relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
    relaunchTimeout o oldTid =
      do
        tid <- launchTimeout o
        killThread oldTid
        return tid

    factorTimeout :: MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent IO ()
    factorTimeout v o =
      do
        ce <- await
        liftIO . modifyMVar_ v $ relaunchTimeout o
        yield ce

这是一个完全可执行的script

最佳答案

似乎Pipe将只允许每个yield一个await。这意味着无法随意将CeTimeout沿管道发送,因为没有任何东西进入管道以引起流向。我将必须仔细检查消息来源以确认这一点;同时,此函数已重构为返回PipeProducer,而不仅仅是Pipe。然后可以将Producer重新加入到调用函数中。最初的计划是仅返回Pipe,以便调用函数不必执行任何其他工作即可使超时生效。那本来是一个更加自给自足的解决方案。这种选择的好处在于它更加明确。对于不熟悉管道的人来说,超时看起来像是凭空出现的。

withTimeout :: (MonadIO t) => Int -> IO (Pipe ConsoleEvent ConsoleEvent t (), Producer ConsoleEvent t ())
withTimeout ((* 1000000) -> s) =
  do
    (oTimeout, iTimeout) <- spawn $ bounded 1
    vTid <- launchTimeout oTimeout >>= newMVar

    return (factorTimeout vTid oTimeout, fromInput iTimeout)
  where
    launchTimeout :: Output ConsoleEvent -> IO ThreadId
    launchTimeout o =
      forkIO . forever $ do
        threadDelay $ s
        (atomically $ send o CeTimeout) >>= \case
          True -> return ()
          otherwise -> die "withTimeout can not send timeout"

    relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
    relaunchTimeout o oldTid =
      do
        tid <- launchTimeout o
        killThread oldTid
        return tid

    factorTimeout :: (MonadIO t) => MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent t ()
    factorTimeout v o =
      do
        ce <- await
        liftIO . modifyMVar_ v $ relaunchTimeout o
        yield ce

main :: IO ()
main =
  do
    hSetBuffering stdin NoBuffering
    hSetEcho stdin False

    exitSemaphore <- newEmptyMVar
    (o1, i1) <- spawn $ bounded 1
    (o2, i2) <- spawn $ bounded 1

    (timeoutTrap, timeoutRender) <- withTimeout 2

    runEffect $ yield CeBegan >-> toOutput o1

    forkIO $ do
      runEffect . forever $ chars >-> toOutput o1
      putMVar exitSemaphore ()

    -- other inputs would be piped to o1 here

    forkIO $ do
      runEffect . forever $ fromInput i1 >-> timeoutTrap >-> toOutput o2
      putMVar exitSemaphore ()

    forkIO $ do
      runEffect . forever $ timeoutRender >-> toOutput o2
      putMVar exitSemaphore ()

    forkIO $ do
      -- logic would be done before dumpPipe
      runEffect . forever $ fromInput i2 >-> dumpPipe >-> (await >> return ())
      putMVar exitSemaphore ()

    takeMVar exitSemaphore

这是一个完全可执行的script

关于haskell - 我将如何管理超时并在每次传入时重置?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/52540672/

10-08 22:11