кабелепровод и розетки: разрешить несколько подключений

Вот некоторый код, реализующий небольшой принимающий сервер с использованием conduit, network-conduit и stm-conduit. Он получает данные в сокете, а затем передает их через STM-канал в основной поток.

import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (newTBMChan, TBMChan())
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Trans.Class

import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.Conduit
import qualified Data.Conduit.Binary as DCB
import Data.Conduit.Extra.Resumable
import Data.Conduit.Network (sourceSocket)
import Data.Conduit.TMChan (sinkTBMChan, sourceTBMChan, mergeSources)

import System.Directory (removeFile)
import System.IO

type BSChan = TBMChan ByteString

listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
    chan <- atomically $ newTBMChan bufSize
    forkListener chan
    return chan
  where
    forkListener chan = void . forkIO $ listen soc 2 >> loop where 
      loop = do
        (conn, _) <- accept soc
        sourceSocket conn $$ sinkTBMChan chan
        close conn
        loop

main :: IO ()
main = do
  soc <- socket AF_UNIX Stream 0
  bind soc (SockAddrUnix "mysock")
  socChan <- listenSocket soc 8
  sourceTBMChan socChan $$ DCB.sinkHandle stdout
  removeFile "mysock"

(В реальном приложении поток данных из сокета объединяется с некоторыми другими, поэтому я не обрабатываю его напрямую в слушателе).

Проблема в том, что там, где я ожидал, что это останется открытым до тех пор, пока основной поток не будет убит, вместо этого он завершается после получения первого сообщения в сокете. Я не могу понять, почему он это делает, если только приемник (на 2-й и последней строке) не завершает работу, как только видит конец первого потока данных. Могу ли я убедить его не делать этого? В Conduit есть кое-что о том, как сделать источник возобновляемым, но не приемник.


person Impredicative    schedule 06.01.2014    source источник
comment
Для будущих вопросов, пожалуйста, включите все импорты, чтобы ваш код действительно компилировался. Облегчает тестирование решений.   -  person shang    schedule 06.01.2014
comment
Небольшой комментарий, не связанный с аспектом канала здесь: реализация здесь заставит соединения приниматься по одному, вместо того, чтобы иметь отдельный рабочий поток, выделенный для каждого входящего соединения. Это намеренно?   -  person Michael Snoyman    schedule 07.01.2014
comment
@shang - справедливое замечание, я обновил импорт. Хотел добавить суть, а затем ссылку на нее, но я забыл об этом!   -  person Impredicative    schedule 07.01.2014
comment
@MichaelSnoyman Это правильно. Я хочу убедиться, что вещи записываются в той последовательности, в которой выполняются соединения. В общем, в любом случае не будет более одного соединения одновременно.   -  person Impredicative    schedule 07.01.2014


Ответы (4)


Из документации sinkTBMChan :

Когда раковина закрыта, канал тоже закрывается.

Таким образом, когда первый дескриптор сокета закрывается, это приводит к закрытию Source из sourceSocket, закрывая подключенный приемник, который, в свою очередь, закрывает TBMChan, который распространяется на sinkHandle, останавливая приемник.

Самый простой способ решить эту проблему, вероятно, состоит в том, чтобы изменить ваш loop на собственный источник, который не закрывается между подключениями, и подключить этот источник к TBMChan.

listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
    chan <- atomically $ newTBMChan bufSize
    forkListener chan
    return chan
  where
    forkListener chan = void . forkIO $ do
      listen soc 2
      loop $$ sinkTBMChan chan

    loop = do
      (conn, _) <- liftIO $ accept soc
      sourceSocket conn
      liftIO $ close conn
      loop
person shang    schedule 06.01.2014
comment
Да, это в значительной степени то, что я сделал (см. ниже). Я полностью удалил network-conduit и только что реализовал источник, который не закрывает соединение. - person Impredicative; 07.01.2014

Координация отключения писателей и читателей с канала — нетривиальная проблема, но для ее решения можно повторно использовать решение из экосистемы pipes, то есть использовать библиотеку pipes-concurrency. Эта библиотека предоставляет несколько pipes-независимых утилит, которые вы можете повторно использовать с conduit библиотеками для связи между читателями и писателями, чтобы каждая сторона автоматически правильно знала, когда нужно очистить, и вы также можете вручную очистить любую сторону.

Ключевая функция, которую вы используете из библиотеки pipes-concurrency, называется spawn. Его тип:

spawn :: Buffer a -> IO (Output a, Input a)

Buffer указывает, какую базовую абстракцию канала STM использовать. Судя по вашему примеру кода, похоже, вам нужен буфер Bounded:

spawn (Bounded 8) :: IO (Output a, Input a)

В этом случае a может быть любым, поэтому это может быть ByteString, например:

spawn (Bounded 8) :: IO (Output ByteString, Input ByteString)

Input и Output ведут себя как почтовый ящик. Вы добавляете сообщения в почтовый ящик путем sendпереноса данных в Outputs, а вы извлекаете сообщения из почтового ящика (в порядке FIFO) путем recvпереноса данных из Inputs:

-- Returns `False` if the mailbox is sealed
send :: Output a -> a -> STM Bool

-- Returns `Nothing` if the mailbox is sealed
recv :: Input a -> STM (Maybe a)

Отличительной особенностью pipes-concurrency является то, что он позволяет сборщику мусора автоматически запечатывать почтовый ящик, если в почтовый ящик нет ни читателей, ни писателей. Это позволяет избежать общего источника взаимоблокировок.

Если бы вы использовали экосистему pipes, вы обычно использовали бы следующие две высокоуровневые утилиты для чтения и записи в почтовый ящик.

-- Stream values into the mailbox until it is sealed
toOutput :: Output a -> Consumer a IO ()

-- Stream values from the mailbox until it is sealed
fromInput :: Input a -> Producer a IO ()

Однако, поскольку базовый механизм pipes независим, вы можете переписать эквивалентные conduit версии этих функций:

import Control.Monad.Trans.Class (lift)
import Data.Conduit
import Pipes.Concurrent

toOutput' :: Output a -> Sink a IO ()
toOutput' o = awaitForever (\a -> lift $ atomically $ send o a)

fromInput' :: Input a -> Source IO a
fromInput' i = do
    ma <- lift $ atomically $ recv i
    case ma of
        Nothing -> return ()
        Just a  -> do
            yield a
            fromInput' i

Тогда ваша основная функция будет выглядеть примерно так:

main :: IO ()
main = do
    soc <- socket AF_UNIX Stream 0
    bind soc (SockAddrUnix "mysock")
    (output, input) <- spawn (Bounded 8)
    forkIO $ readFromSocket soc $$ toOutput output
    fromInput input $$ DCB.sinkHandle stdout
  removeFile "mysock"

... где readFromSocket будет некоторым Source, который читается из вашего Socket.

Затем вы можете свободно писать в output, используя также другие источники данных, и не беспокоиться о необходимости их координации или правильной утилизации input или output, когда вы закончите.

Чтобы узнать больше о pipes-concurrency, я рекомендую прочитать официальный учебник.

person Gabriel Gonzalez    schedule 07.01.2014
comment
Спасибо за это, похоже, это потенциально лучший способ заниматься вещами. На данный момент мне удалось решить проблему, но чуть позже я покопаюсь в учебнике. - person Impredicative; 07.01.2014

Я думаю, что ответ @shang правильный, я бы пошел немного дальше и сказал, что поведение writeTBMChan выглядит здесь лучшим виновником. Я бы рекомендовал изменить его, чтобы автоматически не закрывать файл TBMChan. Простая реализация этой идеи:

sinkTBMChan chan = awaitForever $ liftIO . atomically . writeTBMChan chan

Если вы используете это в своей программе, она будет работать так, как ожидалось.

person Michael Snoyman    schedule 07.01.2014

Итак, вот один ответ, который не включает создание возобновляемой раковины. sourceSocket в network-conduit позволяет одно соединение, но мы можем реализовать поведение повторного подключения внутри sourceSocket (извините за код, я думаю, что его нужно почистить, но, по крайней мере, он работает!):

sourceSocket :: (MonadIO m) => Socket -> Producer m ByteString
sourceSocket sock =
    loop
  where
    loop = do
      (conn, _) <- lift . liftIO $ accept sock
      loop' conn
      lift . liftIO $ close conn
      loop
    loop' conn = do
      bs <- lift . liftIO $ recv conn 4096
      if B.null bs
        then return ()
        else yield bs >> loop' conn

Одна проблема здесь в том, что это никогда не завершается (пока программа не умрет). Это не проблема в моем случае использования, так как сокет должен продолжать слушать всю жизнь программы.

person Impredicative    schedule 06.01.2014