У меня есть следующий сценарий:
- переменное количество (более трех) очередей (зависит от конфигурации, установленной в файле)
- некоторые из этих очередей могут быть либо заполнены данными, либо нет (это зависит от производителя, который получает данные через сетевой клиент: клиент может быть подключен или нет в течение одной и той же сессии)
- Эти очереди заполняются с разной скоростью; так, например, Queue1 может иметь 10 объектов одновременно, тогда как другая очередь Queue2 может иметь только 3 объекта одновременно.
- объекты в этих очередях должны быть синхронизированы в соответствии со свойством, общим для всех из них (свойство int, постоянно увеличивающееся, называемое «SSId»)
- синхронизация должна происходить только для тех очередей, которые в данный момент заполняются данными (неподключенные очереди должны быть исключены)
- когда объекты синхронизируются, они должны быть помещены в соответствующую очередь вывода, используемую соответствующим потребителем: каждый производитель связан с конкретным потребителем.
- после предыдущего шага каждый потребитель может одновременно обрабатывать поставленный в очередь объект с тем же значением свойства для «SSId»;
- Таким образом, конечным результатом должна быть система, в которой потребители могут обрабатывать данные (синхронизированные в соответствии с уже упомянутым свойством «SSId») с одинаковой скоростью, даже если каждый производитель генерирует их с разными скоростями/скоростями.
Чтобы дать более четкое представление, есть схема, представляющая поток, описанный в предыдущих пунктах:
Обратите внимание, что новые элементы с SSid больше 100 не помещаются в очереди потребителей, поскольку в других очередях еще нет соответствующих элементов.
Не могли бы вы предложить подход для создания такого рода синхронизации с использованием .NET TPL Dataflow или Rx.NET? До сих пор я использовал TPL Dataflow для реализации простых последовательных конвейеров и хотел бы получить отзыв о том, как поступить с этим сценарием. Заранее спасибо за любое предложение.