Multi-Producer Multi-Consumer синхронизация данных с отдельными очередями

У меня есть следующий сценарий:

  1. переменное количество (более трех) очередей (зависит от конфигурации, установленной в файле)
  2. некоторые из этих очередей могут быть либо заполнены данными, либо нет (это зависит от производителя, который получает данные через сетевой клиент: клиент может быть подключен или нет в течение одной и той же сессии)
  3. Эти очереди заполняются с разной скоростью; так, например, Queue1 может иметь 10 объектов одновременно, тогда как другая очередь Queue2 может иметь только 3 объекта одновременно.
  4. объекты в этих очередях должны быть синхронизированы в соответствии со свойством, общим для всех из них (свойство int, постоянно увеличивающееся, называемое «SSId»)
  5. синхронизация должна происходить только для тех очередей, которые в данный момент заполняются данными (неподключенные очереди должны быть исключены)
  6. когда объекты синхронизируются, они должны быть помещены в соответствующую очередь вывода, используемую соответствующим потребителем: каждый производитель связан с конкретным потребителем.
  7. после предыдущего шага каждый потребитель может одновременно обрабатывать поставленный в очередь объект с тем же значением свойства для «SSId»;
  8. Таким образом, конечным результатом должна быть система, в которой потребители могут обрабатывать данные (синхронизированные в соответствии с уже упомянутым свойством «SSId») с одинаковой скоростью, даже если каждый производитель генерирует их с разными скоростями/скоростями.

Чтобы дать более четкое представление, есть схема, представляющая поток, описанный в предыдущих пунктах: сетка потока данных

Обратите внимание, что новые элементы с SSid больше 100 не помещаются в очереди потребителей, поскольку в других очередях еще нет соответствующих элементов.

Не могли бы вы предложить подход для создания такого рода синхронизации с использованием .NET TPL Dataflow или Rx.NET? До сих пор я использовал TPL Dataflow для реализации простых последовательных конвейеров и хотел бы получить отзыв о том, как поступить с этим сценарием. Заранее спасибо за любое предложение.


comment
Что запускает операцию синхронизации? Как обнаружить неподключенного производителя?   -  person Shlomo    schedule 05.08.2019
comment
Операция синхронизации должна выполняться с момента, когда производители начинают заполнять очереди: всякий раз, когда все подключенные очереди заполняются элементом с одинаковым SSId, эти элементы должны быть отправлены в очереди потребителей. Неподключенный производитель обнаруживается через событие, возникающее каждый раз, когда производитель отключается (или снова подключается).   -  person Alex    schedule 05.08.2019
comment
Это учитывая, что идентификаторы всегда увеличиваются?   -  person Theodor Zoulias    schedule 14.08.2019
comment
да, в нашем случае идентификаторы определены как длинные и постоянно увеличиваются на единицу: 1, 2, 3, ...   -  person Alex    schedule 29.10.2019


Ответы (1)


Как насчет

  1. Слияние объектов от всех производителей в один наблюдаемый
  2. Группировка объектов по SSId
  3. Выпустить группу, когда размер группы равен количеству производителей (с помощью .Buffer())

Как это:

var syncedProducers = 
    // ConnectedProducersEvent ticks an array of connected producers, each time a producer connects or disconnects
    ConnectedProducersEvent
        .SelectMany(producers => 
            Observable
                .Merge(producers) // Put all objects, from all producers into the same observable
                .GroupBy(@object => @object.SSId) // Group objects by matching SSId
            .SelectMany(group => group.Buffer(producers.Length))); // Syncing: Emit the SSId group, when the group count matches the count of connected producers

// Now you can wire syncedProducers to consumers
var consumer1 = 
    syncedProducers
        .Select(x => x.Where(y => y.Producer == 1));

Вы можете запустить пример на dotnetfiddle

person Magnus    schedule 09.08.2019