Как объединить отношения ввода-вывода «один-к-одному» и «один-ко-многим» в канале?

Я уже некоторое время борюсь с этой проблемой, хотя, честно говоря, я многое узнал о Conduit, так как раньше я в основном использовал готовые примеры за некоторыми исключениями.

Основная задача сформулирована так для трубопроводов A, B и C; A .| B (A входит в B) и A .| C, и, наконец, мне нужна функция, которая принимает B и C и создает промежуточный Conduit, назовите ее Merge B C, чтобы я мог сделать (Merge B C) .| D. Мой опыт работы с языками, отличными от Haskell, с FRP/потоковыми библиотеками показывает, что существует несколько различных способов выполнить «слияние» (например, семейство zip-операций «sample on» — создавать новые элементы для D только тогда, когда один или несколько выбранных входных данных каналы имеют новое значение и т. д.). Я думаю, моя проблема заключается в том, чтобы понять, как это сделать в Conduit, если он поддерживается.

Еще более конкретно для моей конкретной сегодняшней проблемы B имеет отношение 1:1 к A, тогда как C имеет отношение многие:1 к A, и, в конечном счете, в D я хочу, чтобы повторяющиеся элементы B объединялись с соответствующими элементами C: если a~b и a~c для a в A, b в B и c в C, затем (b,c) передаются в D. Таким образом, я смог использовать ZipSink и тот факт, что на самом деле это разумное место для стока (помимо производительности, на которую я не смотрел). Конечно, как и ожидалось, getZipSink ничего не знает об отношениях один ко многим и о том, как с ними работать; он имеет широко определенное поведение zip, чтобы просто циклически проходить через входные потоки, пока все входные потоки не будут пройдены один раз.

Я предполагаю, что один из способов сделать это может заключаться в том, чтобы каким-то образом изменить мой поток «один ко многим» в поток «один к одному», сделав складку во что-то вроде списка. Но тогда мне пришлось бы распаковывать его позже вне контекста канала. На данный момент я просто хочу спросить, какие рекомендуемые способы.

Мой фактический код выглядит так (A — это sourceDirectoryDeep, B — это processFileName, C — это processCSV, а D — это (вроде как) getZipSink):

retrieveSmaXtec :: Path Abs Dir -> IO (Vector SxRecord)
retrieveSmaXtec sxDir = do
  rows <- sourceDirectoryDeep False (fromAbsDir sxDir)
    .| getZipSink (combine <$> ZipSink processFileName <*> ZipSink processCSV )
    & runConduitRes
  print rows
  rows & fmap fromRow & catMaybes & return
  where
    combine :: (Vector (MapRow Text)) -> (Vector (MapRow Text)) -> (Vector (MapRow Text))
    combine v1 v2 = (uncurry DM.union) <$> (zip v1 v2)
    processCSV :: (MonadResource m, MonadThrow m, PrimMonad m)=>
      ConduitT FilePath Void m (Vector ((MapRow Text)))
    processCSV = mapMC (liftIO . DTIO.readFile)
      .| intoCSV defCSVSettings
      .| sinkVector
    processFileName :: (MonadResource m, MonadThrow m, PrimMonad m) =>
      ConduitT FilePath Void m (Vector ((MapRow Text)))
    processFileName = mapC go
      .| sinkVector
      where
        go :: FilePath -> MapRow Text
        go fp = takeFileName fp
          & takeWhile (/= '.')
          & splitOn "_"
          & fmap Txt.pack
          & zip colNames
          & DM.fromList
        colNames = [markKey, idKey]

Импорт (некоторые из которых могут быть посторонними):

import           Conduit
import qualified Data.Conduit.Combinators       as DCC
import           Data.CSV.Conduit
import           Data.Function                  ((&))
import           Data.List.Split                (splitOn)
import           Data.Map                       as DM
import           Data.Text                      (Text)
import qualified Data.Text                      as Txt
import qualified Data.Text.IO                   as DTIO
import           Data.Vector                    (Vector)
import           Path
import           System.FilePath.Posix

person bbarker    schedule 24.01.2019    source источник
comment
hackage.haskell. org/package/stm-conduit-4.0.1/docs/ выглядит потенциально полезным, но 1) мне нужно больше узнать о STM/каналах, 2) он устарел, и я не вижу, где написано, почему ( обновление: видимо, поэтому: github.com/cgaebel/ stm-conduit/issues/49), и 3) я до сих пор не вижу, где можно определить поведение того, как происходит слияние, но, возможно, реализация будет хорошее место для поиска экспериментов, но на данный момент у меня нет времени для создания собственного API канала STM.   -  person bbarker    schedule 24.01.2019
comment
Кажется, Conduit в основном предназначен для неразветвляющихся (линейных) конвейеров, поэтому в настоящее время я пытаюсь разделить вещи с этой целью ... все еще WIP.   -  person bbarker    schedule 24.01.2019
comment
Я нашел обходной путь: stackoverflow.com/a/54369926/3096687 Не очень похоже на поток, не уверен, что это хорошо похожее на трубопровод решение, но оно работает.   -  person bbarker    schedule 25.01.2019