Я уже некоторое время борюсь с этой проблемой, хотя, честно говоря, я многое узнал о Conduit, так как раньше я в основном использовал готовые примеры за некоторыми исключениями.
Основная задача сформулирована так для трубопроводов A
, B
и C
; A .| B
(A
входит в B
) и A .| C
, и, наконец, мне нужна функция, которая принимает B и C и создает промежуточный Conduit, назовите ее Merge B C
, чтобы я мог сделать (Merge B C) .| D
. Мой опыт работы с языками, отличными от Haskell, с FRP/потоковыми библиотеками показывает, что существует несколько различных способов выполнить «слияние» (например, семейство zip-операций «sample on» — создавать новые элементы для D
только тогда, когда один или несколько выбранных входных данных каналы имеют новое значение и т. д.). Я думаю, моя проблема заключается в том, чтобы понять, как это сделать в Conduit, если он поддерживается.
Еще более конкретно для моей конкретной сегодняшней проблемы B
имеет отношение 1:1 к A
, тогда как C
имеет отношение многие:1 к A
, и, в конечном счете, в D
я хочу, чтобы повторяющиеся элементы B
объединялись с соответствующими элементами C
: если a~b
и a~c
для a
в A
, b
в B
и c
в C
, затем (b,c)
передаются в D
. Таким образом, я смог использовать ZipSink
и тот факт, что на самом деле это разумное место для стока (помимо производительности, на которую я не смотрел). Конечно, как и ожидалось, getZipSink
ничего не знает об отношениях один ко многим и о том, как с ними работать; он имеет широко определенное поведение zip, чтобы просто циклически проходить через входные потоки, пока все входные потоки не будут пройдены один раз.
Я предполагаю, что один из способов сделать это может заключаться в том, чтобы каким-то образом изменить мой поток «один ко многим» в поток «один к одному», сделав складку во что-то вроде списка. Но тогда мне пришлось бы распаковывать его позже вне контекста канала. На данный момент я просто хочу спросить, какие рекомендуемые способы.
Мой фактический код выглядит так (A
— это sourceDirectoryDeep
, B
— это processFileName
, C
— это processCSV
, а D
— это (вроде как) getZipSink
):
retrieveSmaXtec :: Path Abs Dir -> IO (Vector SxRecord)
retrieveSmaXtec sxDir = do
rows <- sourceDirectoryDeep False (fromAbsDir sxDir)
.| getZipSink (combine <$> ZipSink processFileName <*> ZipSink processCSV )
& runConduitRes
print rows
rows & fmap fromRow & catMaybes & return
where
combine :: (Vector (MapRow Text)) -> (Vector (MapRow Text)) -> (Vector (MapRow Text))
combine v1 v2 = (uncurry DM.union) <$> (zip v1 v2)
processCSV :: (MonadResource m, MonadThrow m, PrimMonad m)=>
ConduitT FilePath Void m (Vector ((MapRow Text)))
processCSV = mapMC (liftIO . DTIO.readFile)
.| intoCSV defCSVSettings
.| sinkVector
processFileName :: (MonadResource m, MonadThrow m, PrimMonad m) =>
ConduitT FilePath Void m (Vector ((MapRow Text)))
processFileName = mapC go
.| sinkVector
where
go :: FilePath -> MapRow Text
go fp = takeFileName fp
& takeWhile (/= '.')
& splitOn "_"
& fmap Txt.pack
& zip colNames
& DM.fromList
colNames = [markKey, idKey]
Импорт (некоторые из которых могут быть посторонними):
import Conduit
import qualified Data.Conduit.Combinators as DCC
import Data.CSV.Conduit
import Data.Function ((&))
import Data.List.Split (splitOn)
import Data.Map as DM
import Data.Text (Text)
import qualified Data.Text as Txt
import qualified Data.Text.IO as DTIO
import Data.Vector (Vector)
import Path
import System.FilePath.Posix