MonadBaseControl: как поднять ThreadGroup

В пакете Thread в модуле _ 1_ есть функция forkIO:

forkIO :: ThreadGroup -> IO α -> IO (ThreadId, IO (Result α))

Я бы хотел поднять его с помощью MonadBaseControl из monad-control. Вот моя попытка:

fork :: (MonadBase IO m) => TG.ThreadGroup -> m α -> m (ThreadId, m (Result α))
fork tg action = control (\runInBase -> TG.forkIO tg (runInBase action))

и вот сообщение об ошибке:

Couldn't match type `(ThreadId, IO (Result (StM m α)))'
              with `StM m (ThreadId, m (Result α))'
Expected type: IO (StM m (ThreadId, m (Result α)))
  Actual type: IO (ThreadId, IO (Result (StM m α)))
In the return type of a call of `TG.forkIO'
In the expression: TG.forkIO tg (runInBase action)
In the first argument of `control', namely
  `(\ runInBase -> TG.forkIO tg (runInBase action))'

Что изменить, чтобы типы совпадали?


person Gracjan Polak    schedule 30.12.2014    source источник


Ответы (1)


Основная проблема - это аргумент IO a для forkIO. Чтобы выполнить форк m a действия в IO, нам понадобится способ запустить m a в IO a. Для этого мы могли бы попытаться создать класс монад, у которых есть метод runBase :: MonadBase b m => m a -> b a, но очень немногие интересные преобразователи могут это предоставить. Если мы рассмотрим, например, преобразователь StateT, он мог бы выяснить, как запустить что-то в базовой монаде с runStateT, если бы ему сначала была предоставлена ​​возможность наблюдать за его собственным состоянием.

runFork :: Monad m => StateT s m a -> StateT s m (m b)
runFork x = do
    s <- get
    return $ do
        (a, s') <- runStateT x s
        return a

Это предполагает тип runForkBase :: MonadBase b m => m a -> m (b a), который мы выберем для следующего класса типов.

{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FunctionalDependencies #-}

import Control.Monad.Base

class (MonadBase b m) => MonadRunForkBase b m | m -> b where
    runForkBase :: m a -> m (b a)

Я добавил слово Fork к имени, чтобы подчеркнуть, что будущие изменения состояния, как правило, не будут разделяться между двумя фьючерсами. По этой причине несколько интересных преобразователей, таких как WriterT, которые могли бы предоставить runBase, дают только неинтересный runBase; они вызывают побочные эффекты, которые никогда не будут заметны.

Мы можем написать что-то вроде fork для чего угодно с ограниченной формой понижения, предоставляемой экземпляром MonadRunForkBase IO m. Я собираюсь lift нормальный forkIO из базы, а не из потоков, которые вы можете сделай так же.

{-# LANGUAGE FlexibleContexts #-}

import Control.Concurrent

forkInIO :: (MonadRunForkBase IO m) => m () -> m ThreadId
forkInIO action = runForkBase action >>= liftBase . forkIO

Экземпляры

Возникает вопрос: «Для каких трансформаторов мы можем предоставить MonadRunForkBase экземпляры»? С самого начала мы можем легко предоставить их для любой из базовых монад, имеющих MonadBase экземпляра.

import Control.Monad.Trans.Identity
import GHC.Conc.Sync (STM)

instance MonadRunForkBase [] [] where runForkBase = return 
instance MonadRunForkBase IO IO where runForkBase = return
instance MonadRunForkBase STM STM where runForkBase = return
instance MonadRunForkBase Maybe Maybe where runForkBase = return
instance MonadRunForkBase Identity Identity where runForkBase = return

Для трансформаторов, как правило, проще наращивать функциональность, как эта, шаг за шагом. Вот класс преобразователей, которые могут запускать форк в монаде, непосредственно лежащей в основе.

import Control.Monad.Trans.Class

class (MonadTrans t) => MonadTransRunFork t where
    runFork :: Monad m => t m a -> t m (m a)

Мы можем предоставить реализацию по умолчанию для полного запуска в базе

runForkBaseDefault :: (Monad (t m), MonadTransRunFork t, MonadRunForkBase b m) =>
                      t m a -> t m (b a)
runForkBaseDefault = (>>= lift . runForkBase) . runFork

Это позволяет нам завершить MonadRunForkBase экземпляр для StateT за два шага. Сначала мы воспользуемся нашим runFork сверху, чтобы создать экземпляр MonadTransRunFork.

import Control.Monad
import qualified Control.Monad.Trans.State.Lazy as State

instance MonadTransRunFork (State.StateT s) where
    runFork x = State.get >>= return . liftM fst . State.runStateT x

Затем мы будем использовать значение по умолчанию, чтобы предоставить экземпляр MonadRunForkBase.

{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}

instance (MonadRunForkBase b m) => MonadRunForkBase b (State.StateT s m) where
    runForkBase = runForkBaseDefault

Мы можем сделать то же самое для RWS

import qualified Control.Monad.Trans.RWS.Lazy as RWS

instance (Monoid w) => MonadTransRunFork (RWS.RWST r w s) where
    runFork x = do
        r <- RWS.ask
        s <- RWS.get
        return $ do 
            (a, s', w') <- RWS.runRWST x r s
            return a

instance (MonadRunForkBase b m, Monoid w) => MonadRunForkBase b (RWS.RWST r w s m) where
    runForkBase = runForkBaseDefault

MonadBaseControl

В отличие от MonadRunForkBase, который мы разработали в предыдущих двух разделах, MonadBaseControl из monad-control не учитывает предположение, что" будущие изменения состояния, как правило, не будут разделяться между двумя фьючерсами ". MonadBaseContol и control пытаются восстановить состояние из ветвления в управляющих структурах с помощью restoreM :: StM m a -> m a. Это не представляет проблемы для forkIO с базы; Использование forkIO - это пример из документации MonadBaseControl. Это будет небольшая проблема для forkIO из потоков из-за возвращенных дополнительных m (Result a).

m (Result a), который нам нужен, на самом деле будет возвращен как IO (Result (StM m a)). Мы можем избавиться от IO и заменить его на m на liftBase, оставив нам m (Result (StM m a)). Мы могли бы преобразовать StM m a в m a, который восстанавливает состояние, а затем возвращает a с restoreM, но он застревает внутри Result ~ Either SomeException. Either l - это функтор, поэтому мы можем применить restoreM внутри него, упростив тип до m (Result (m a)). Either l тоже Traversable, и для любого Traversable t мы всегда можем поменять его внутри Monad или Applicative на sequenceA :: t (f a) -> f (t a). В этом случае мы можем использовать специальное назначение mapM, которое представляет собой комбинацию fmap и sequenceA только с ограничением Monad. Это даст m (m (Result a)), а m будут сглажены вместе с помощью соединения в монаде или просто с помощью >>=. Это порождает

{-# LANGUAGE FlexibleContexts #-}

import Control.Concurrent
import Control.Concurrent.Thread
import qualified Control.Concurrent.Thread.Group as TG

import Control.Monad.Base
import Control.Monad.Trans.Control

import Data.Functor
import Data.Traversable
import Prelude hiding (mapM)

fork :: (MonadBaseControl IO m) =>
        TG.ThreadGroup -> m a -> m (ThreadId, m (Result a))
fork tg action = do
    (tid, r) <- liftBaseWith (\runInBase -> TG.forkIO tg (runInBase action))    
    return (tid, liftBase r >>= mapM restoreM)

Когда мы запускаем m (Result a) в исходном потоке, он копирует состояние из разветвленного потока в исходный поток, что может быть полезно. Если вы хотите восстановить состояние основного потока после чтения Result, вам нужно сначала его захватить. checkpoint захватит все состояние и вернет действие для его восстановления.

checkpoint :: MonadBaseControl b m => m (m ())
checkpoint = liftBaseWith (\runInBase -> runInBase (return ()))
             >>= return . restoreM

Полный пример покажет, что происходит с состоянием из двух потоков. Оба потока получают состояние, в котором произошло fork, независимо от усилий по изменению состояния в другом потоке. Когда мы ждем результата в основном потоке, состояние в основном потоке устанавливается в состояние из разветвленного потока. Мы можем вернуть состояние основного потока, запустив действие, созданное checkpoint.

import Control.Monad.State hiding (mapM)

example :: (MonadState String m, MonadBase IO m, MonadBaseControl IO m) => m ()
example = do    
    get >>= liftBase . putStrLn
    tg <- liftBase TG.new
    (_, getResult) <- fork tg (get >>= put . ("In Fork:" ++)  >> return 7)
    get >>= put . ("In Main:" ++) 
    revert <- checkpoint
    result <- getResult
    (liftBase . print) result
    get >>= liftBase . putStrLn
    revert
    get >>= liftBase . putStrLn

main = do
    runStateT example "Initial"
    return ()

Это выводит

Initial
Right 7
In Fork:Initial
In Main:Initial
person Cirdec    schedule 30.12.2014
comment
Спасибо за пояснение. Есть ли способ сделать это только с помощью MonadBaseControl, без каких-либо новых специальных классов типов? - person Gracjan Polak; 31.12.2014
comment
@GracjanPolak Я добавил раздел на MonadBaseControl. - person Cirdec; 31.12.2014
comment
Большой! Похоже, это то, что мне нужно. Можете ли вы изменить порядок разделов в своем ответе так, чтобы прямой ответ был первым, а обсуждение продолжалось позже? - person Gracjan Polak; 31.12.2014