Как мне передать тайм-аут, который сбрасывается при каждом входящем?

Предполагается, что функция withTimeout передает ConsoleEvent сообщение CeTimeout каждые s :: Int секунды, если ничего не было получено. Вместо этого ему не удается отправить события CeTimeout в нужное время. Одно событие CeTimeout заменяется другими событиями, если прошло более s секунд с потерей исходного события. Также вместо одного CeTimeout события должно быть n*s CeTimeout событий с n подсчетом на каждый прошедший s секундный период. Где ошибка и что можно исправить? Спасибо!

withTimeout :: (MonadIO t) => Int -> Pipe ConsoleEvent ConsoleEvent t ()
withTimeout ((* 1000000) -> s) = join . liftIO $ work
  where
    work :: (MonadIO t) => IO (Pipe ConsoleEvent ConsoleEvent t ()) 
    work =
      do
        (oSent, iKept) <- spawn $ bounded 1
        (oKept, iSent) <- spawn $ unbounded
        (oTimeout, iTimeout) <- spawn $ bounded 1

        tid <- launchTimeout oTimeout >>= newMVar

        forkIO $ do
          runEffect . forever $ fromInput iKept >-> factorTimeout tid oTimeout >-> toOutput oKept

        forkIO $ do
          runEffect . forever $ fromInput iTimeout >-> toOutput oKept

        return $ do
          await >>= (liftIO . guardedSend oSent)
          (liftIO . guardedRecv $ iSent) >>= yield

    guardedSend :: Output ConsoleEvent -> ConsoleEvent -> IO ()
    guardedSend o ce =
      (atomically $ send o ce) >>= \case
        True -> return ()
        otherwise -> die $ "withTimeout can not send"

    guardedRecv :: Input ConsoleEvent -> IO ConsoleEvent
    guardedRecv i =
      (atomically $ recv i) >>= \case
        Just a -> return a
        otherwise -> die $ "withTimeout can not recv"

    launchTimeout :: Output ConsoleEvent -> IO ThreadId
    launchTimeout o =
      forkIO . forever $ do
        threadDelay $ s
        (atomically $ send o CeTimeout) >>= \case
          True -> return ()
          otherwise -> die "withTimeout can not send timeout"

    relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
    relaunchTimeout o oldTid = 
      do
        tid <- launchTimeout o
        killThread oldTid
        return tid

    factorTimeout :: MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent IO ()
    factorTimeout v o =
      do
        ce <- await
        liftIO . modifyMVar_ v $ relaunchTimeout o
        yield ce

Вот полностью исполняемый скрипт.


person Vanson Samuel    schedule 27.09.2018    source источник
comment
Немного оффтоп: было бы неплохо, если бы вы предоставили полностью исполняемый код в виде скрипта, который включает в себя импорт как показано здесь. Благодарю вас!   -  person Konstantine Rybnikov    schedule 27.09.2018
comment
@KostiantynRybnikov полностью исполняемый код находится по этой ссылке: gist.github.com/binq/360a545cdf9b5b514f1ce420531070a6   -  person Vanson Samuel    schedule 01.10.2018
comment
Есть ли причина, по которой вы постоянно создаете новый (i, o) внутри withTimeout вместо того, чтобы отправлять тайм-ауты i, сгенерированному в main?   -  person Li-yao Xia    schedule 01.10.2018
comment
Вы снова вызываете launchTimeout сразу после завершения потока тайм-аута, но единственным потребителем является ceRecv, который собирается удалить только что отправленное factorTimeout событие. Когда второй поток тайм-аута пытается отправить тайм-аут, другой конец может быть подвергнут сборке мусора, поэтому вывод запечатывается, и вы получаете ошибку.   -  person Li-yao Xia    schedule 01.10.2018
comment
@ Li-yaoXia, я исправил свой код, чтобы исправить проблемы, на которые вы указали. Функция по-прежнему не работает, но она дает сбой по-другому. Пожалуйста, смотрите мой обновленный пост. Спасибо!   -  person Vanson Samuel    schedule 02.10.2018
comment
@Li-yaoXia Я постоянно создаю новые (i, o) внутри withTimeout, потому что хочу, чтобы он был полностью автономным. Разве это не возможно?   -  person Vanson Samuel    schedule 02.10.2018
comment
Что должно быть самодостаточным? Эта косвенность кажется лишним источником сложности, ожидающим возникновения ошибки. Действительно ли это минимизированный фрагмент приложения, в котором требуется такая архитектура? И почему обновленный код по-прежнему вызывает launchTimeout дважды за итерацию?   -  person Li-yao Xia    schedule 02.10.2018


Ответы (1)


Похоже, что Pipe разрешает только один yield на await. Это означает, что CeTimeout не может быть отправлено по каналу произвольно, потому что в канал не попало ничего, что вызвало бы поток. Мне придется просмотреть источник, чтобы подтвердить это; тем временем эта функция была реорганизована, чтобы возвращать Pipe и Producer, а не только Pipe. Затем Producer можно снова соединить в вызывающей функции. Первоначальный план состоял в том, чтобы вернуть только Pipe, чтобы вызывающей функции не приходилось выполнять какую-либо дополнительную работу, чтобы тайм-ауты работали. Это было бы более самодостаточным решением. Этот вариант хорош тем, что он более явный. Тайм-ауты не будут выглядеть из воздуха для кого-то, кто не знаком с конвейером.

withTimeout :: (MonadIO t) => Int -> IO (Pipe ConsoleEvent ConsoleEvent t (), Producer ConsoleEvent t ())
withTimeout ((* 1000000) -> s) =
  do
    (oTimeout, iTimeout) <- spawn $ bounded 1
    vTid <- launchTimeout oTimeout >>= newMVar

    return (factorTimeout vTid oTimeout, fromInput iTimeout)
  where
    launchTimeout :: Output ConsoleEvent -> IO ThreadId
    launchTimeout o =
      forkIO . forever $ do
        threadDelay $ s
        (atomically $ send o CeTimeout) >>= \case
          True -> return ()
          otherwise -> die "withTimeout can not send timeout"

    relaunchTimeout :: Output ConsoleEvent -> ThreadId -> IO ThreadId
    relaunchTimeout o oldTid = 
      do
        tid <- launchTimeout o
        killThread oldTid
        return tid

    factorTimeout :: (MonadIO t) => MVar ThreadId -> Output ConsoleEvent -> Pipe ConsoleEvent ConsoleEvent t ()
    factorTimeout v o =
      do
        ce <- await
        liftIO . modifyMVar_ v $ relaunchTimeout o
        yield ce

main :: IO ()
main =
  do
    hSetBuffering stdin NoBuffering
    hSetEcho stdin False

    exitSemaphore <- newEmptyMVar
    (o1, i1) <- spawn $ bounded 1
    (o2, i2) <- spawn $ bounded 1

    (timeoutTrap, timeoutRender) <- withTimeout 2

    runEffect $ yield CeBegan >-> toOutput o1

    forkIO $ do
      runEffect . forever $ chars >-> toOutput o1
      putMVar exitSemaphore ()

    -- other inputs would be piped to o1 here

    forkIO $ do
      runEffect . forever $ fromInput i1 >-> timeoutTrap >-> toOutput o2
      putMVar exitSemaphore ()

    forkIO $ do
      runEffect . forever $ timeoutRender >-> toOutput o2
      putMVar exitSemaphore ()

    forkIO $ do
      -- logic would be done before dumpPipe
      runEffect . forever $ fromInput i2 >-> dumpPipe >-> (await >> return ())
      putMVar exitSemaphore ()

    takeMVar exitSemaphore

Вот полностью исполняемый скрипт.

person Vanson Samuel    schedule 08.10.2018