Как моделировать вложенные потоки с каналами?

Допустим, нам нужно просуммировать группы чисел, хранящиеся в файле, где группы разделены пустой строкой. Так

1
2
3

4
5

должен привести к 6 9.

Кажется естественным смоделировать это с помощью вложенных каналов: внешний канал будет разбивать линии на группы линий, где каждая группа сама будет источником.

Однако я не вижу, чтобы этот стиль поддерживался непосредственно в кабелепроводе. Каков наиболее идиоматический способ сделать это с трубопроводами?


person Roman Cheplyaka    schedule 05.10.2015    source источник
comment
Группа pipes-group и streaming это позволяют, но не знаю, как это сделать с каналом.   -  person danidiaz    schedule 05.10.2015
comment
Является ли целью использование вложенных каналов, чтобы сделать это паралелизуемым? Потому что вы должны иметь возможность использовать один канал для разделения на двойную новую строку, а затем суммировать группы.   -  person dsemi    schedule 06.10.2015


Ответы (1)


Есть два основных подхода, которые я бы рекомендовал для этого. Самый простой — просто получить поток типов суммы, указывающий «другое значение» вместо «новой строки». Другой подход заключается в использовании комбинаторного подхода, такого как withLine или foldLines. Последний может быть построен поверх первого. Первоначально этот подход обсуждался в записи блога на foldLines.

Ниже приведен полный фрагмент кода, показывающий тип суммы и подход к комбинатору. Обратите внимание, что folded (вероятно, неудачное название для этого) — это функция общего назначения для преобразования во «вложенный поток» из типа суммы. Подход, используемый каналами (на основе FreeT IIRC), вероятно, может использоваться и каналом, но, как я упоминал в своем блоге, я думаю, что это более сложное решение, чем необходимо.

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NoImplicitPrelude #-}
import ClassyPrelude.Conduit
import Data.Text.Read (decimal, signed)

sumType :: Monad m => Conduit Text m (Maybe Int)
sumType =
    linesUnboundedC =$= mapMC toMaybeInt
  where
    toMaybeInt "" = return Nothing
    toMaybeInt t =
        case signed decimal t of
            Right (i, "") -> return $ Just i
            _ -> fail $ "Invalid int: " ++ show t

folded :: Monad m => Sink a m () -> Sink (Maybe a) m ()
folded perGroup =
    startGroup
  where
    startGroup = peekC >>= maybe (return ()) (const go)

    go = takeMaybes =$ (perGroup >> sinkNull) >> startGroup

    takeMaybes = await >>= maybe (return ()) (\x ->
        case x of
            Nothing -> return ()
            Just y -> yield y >> takeMaybes)

main :: IO ()
main = do
    let src = yield "1\n2\n3\n\n5\n6\n\n\n7"
    src $$ sumType =$ (sinkList >>= print)
    src $$ sumType =$ folded (sinkList >>= print)
person Michael Snoyman    schedule 06.10.2015
comment
Спасибо. Это близко к тому, что я имел в виду, за исключением того, что 1) я ожидал бы более прямой поддержки от библиотеки для этого, и 2) это выглядит как ненужный CPS; разве Sink a m () -> Sink (Maybe a) m () не следует называть Conduit (Maybe a) m a? - person Roman Cheplyaka; 06.10.2015
comment
Я не уверен, что будет означать эта точная сигнатура типа или как будет вести себя такая функция. Но одним из мотивирующих факторов для этого дизайна является то, что комбинатор (folded) имеет возможность принудительно очищать остальную часть подпотока в случаях, когда предоставленная функция (perGroup) не полностью использует свой ввод. - person Michael Snoyman; 06.10.2015