Microsoft TPL Dataflow — синхронная обработка коррелирующих запросов

Заранее извиняюсь за название, но это лучшее, что я мог придумать, чтобы описать действие.

Требуется обработка запросов шины сообщений. Входящие запросы могут быть связаны с идентификатором, который коррелирует или группирует эти запросы. Я хочу, чтобы поток запросов синхронно обрабатывал коррелирующие идентификаторы. Однако разные идентификаторы могут обрабатываться асинхронно.

Я использую concurrentdictionary для отслеживания обрабатываемого запроса и предиката в ссылке.

Предполагается, что это обеспечит синхронную обработку связанных запросов.

Однако поведение, которое я получаю, заключается в том, что первый запрос обрабатывается, а второй запрос отбрасывается.

Я прикрепил пример кода из консольного приложения для имитации проблемы.

Любое направление или обратная связь будут оценены.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApplication2
{
    class Program
    {
        static void Main(string[] args)
        {
            var requestTracker = new ConcurrentDictionary<string, string>();

            var bufferBlock = new BufferBlock<Request>();

            var actionBlock = new ActionBlock<Request>(x => 
            {
                Console.WriteLine("processing item {0}",x.Name);
                Thread.Sleep(5000);
                string itemOut = null;
                requestTracker.TryRemove(x.Id, out itemOut);
            });

            bufferBlock.LinkTo(actionBlock, x => requestTracker.TryAdd(x.Id,x.Name));


            var publisher = Task.Run(() =>
            {
                var request = new Request("item_1", "first item");
                bufferBlock.SendAsync(request);

                var request_1 = new Request("item_1", "second item");
                bufferBlock.SendAsync(request_1);

            });

            publisher.Wait();
            Console.ReadLine();
        }
    }

    public class Request
    {
        public Request(string id, string name)
        {
            this.Id = id;
            this.Name = name;
        }
        public string Id { get; set; }
        public string Name { get; set; }
    }
}

person rizan    schedule 07.07.2014    source источник
comment
Ваши исключения должны распространяться по конвейеру вашего потока данных, чтобы вы могли видеть, что пошло не так. Взгляните на полный пример MS в конце этого Прохождение MSDN. Затем вы можете обработать AggregateException, чтобы выяснить, что пошло не так.   -  person JNYRanger    schedule 08.07.2014
comment
Вы имеете в виду, что хотите, чтобы группа с одним и тем же идентификатором обрабатывалась одна за другой, в то время как группы могли обрабатываться одновременно? Если да, то есть ваш ответ: stackoverflow.com/q/21010024/885318   -  person i3arnon    schedule 08.07.2014
comment
@ I3arnon - ваше решение, кажется, то, что я ищу. Я немного ленив здесь, но, возможно, у вас есть более подробная информация. Я предполагаю, что ключи, которые вы получаете, являются динамическими, т.е. в основном пакеты сообщений будут иметь один и тот же ключ, и он продолжает меняться. Когда блок действий делегирован для обработки сообщения, он обновляет словарь, говоря, что я занят этим запросом, и любой последующий запрос, соответствующий этому ключу, делегируется этому блоку действий? Я прав?   -  person rizan    schedule 08.07.2014
comment
@ I3arnon Я думаю, что это излишне сложно.   -  person svick    schedule 08.07.2014
comment
@svick, что бы ты предложил?   -  person rizan    schedule 09.07.2014
comment
@ I3arnon Я следовал вашей методологии с несколькими настройками, и, похоже, она работает хорошо - опубликую образец.   -  person rizan    schedule 09.07.2014
comment
@rizan На самом деле ключи могут быть любыми, но в моем случае это был идентификатор сеанса TCP, поэтому у меня есть x блоков действий, работающих параллельно, чтобы использовать ЦП, и я слежу за тем, чтобы элементы с одинаковым идентификатором попадали в один и тот же блок действий. Если вы столкнулись с немного другой проблемой, скажите, как, и мы разберемся.   -  person i3arnon    schedule 09.07.2014
comment
@svick, какую функцию вы бы удалили?   -  person i3arnon    schedule 09.07.2014
comment
@ I3arnon Хеширование. Вы можете просто иметь один блок для каждой группы (при условии, что тот факт, что это в значительной степени утечка памяти, не имеет значения).   -  person svick    schedule 09.07.2014
comment
@svick, если предположить, что группы не определены заранее, я бы не хотел динамически создавать блок для каждой группы, это дорого и может снизить производительность. Это было бы похоже на установку MaxDegreeOfParallelism на бесконечность.   -  person i3arnon    schedule 09.07.2014


Ответы (2)


  1. Вы говорите, что хотите обрабатывать некоторые запросы параллельно (по крайней мере, я предполагаю, что вы имели в виду «асинхронно»), но ActionBlock по умолчанию не является параллельным. Чтобы изменить это, установите MaxDegreeOfParallelism .

  2. Вы пытаетесь использовать TryAdd() в качестве фильтра, но это не сработает по двум причинам:

    1. The filter is invoked only once, it's not automatically retried or anything like that. That means that if an item doesn't go through, it would never go through, even after the item that was blocking it was completed.
    2. Если элемент застрял в очереди вывода блока, никакие другие элементы не выберутся из этого блока. Это может значительно снизить уровень параллелизма, даже если вы каким-то образом обошли предыдущую проблему.
  3. Я думаю, что самым простым решением здесь было бы иметь блок для каждой группы, таким образом элементы из каждой группы будут обрабатываться последовательно, а элементы из разных групп будут обрабатываться параллельно. В коде это может выглядеть примерно так:

    var processingBlocks = new Dictionary<string, ActionBlock<Request>>();
    
    var splitterBlock = new ActionBlock<Request>(request =>
    {
        ActionBlock<Request> processingBlock;
    
        if (!processingBlocks.TryGetValue(request.Id, out processingBlock))
        {
            processingBlock = processingBlocks[request.Id] =
                new ActionBlock<Request>(r => /* process the request here */);
        }
    
        processingBlock.Post(request);
    });
    

    Проблема с этим подходом заключается в том, что блоки обработки для групп никогда не исчезают. Если вы не можете себе это позволить (это утечка памяти), потому что у вас будет большое количество групп, тогда хеширование подход, предложенный И3арноном, является правильным.

person svick    schedule 14.07.2014
comment
Спасибо за помощь, svick - на самом деле я использовал подход I3arnon. Гораздо более надежный и менее подверженный ошибкам для того, что я пытался сделать. - person rizan; 17.07.2014

Я полагаю, это потому, что ваш LinkTo() настроен неправильно. Имея LinkTo() и передавая функцию в качестве аргумента, вы добавляете условие. Итак, эта строка:

bufferBlock.LinkTo(actionBlock, x => requestTracker.TryAdd(x.Id, x.Name));

По сути, речь идет о передаче данных из bufferBlock в actionBlock, ЕСЛИ вы можете добавить в свой параллельный словарь, что не обязательно имеет смысл (по крайней мере, в вашем примере кода)

Вместо этого вы должны связать свой bufferBlock с блоком действий без лямбды, поскольку в этой ситуации вам не нужна условная ссылка (по крайней мере, я так не думаю, основываясь на вашем примере кода).

Кроме того, взгляните на этот вопрос SO, чтобы узнать, следует ли вам использовать SendAsync() или Post(), поскольку Post() проще обрабатывать для простого добавления данных в конвейер: TPL Dataflow, в чем функциональное различие между Post() и SendAsync()? . SendAsync вернет задачу, а Post вернет значение true/false в зависимости от успешного входа в конвейер.

Таким образом, чтобы выяснить, что происходит не так, вам нужно обработать продолжения ваших блоков. В MSDN есть хороший учебник по их введению в TPL Dataflow: Создайте конвейер потока данных По сути это будет выглядеть так:

//link to section
bufferBlock.LinkTo(actionBlock);
//continuations
bufferBlock.Completion.ContinueWith(t =>
{
     if(t.IsFaulted)  ((IDataFlowBlock).actionBlock).Fault(t.Exception); //send the exception down the pipeline
     else actionBlock.Complete(); //tell the next block that we're done with the bufferblock
 });

Затем вы можете поймать исключение (AggregateException) при ожидании конвейера. Вам действительно нужно использовать concurrentdictionary в вашем фактическом коде для отслеживания, потому что это может вызвать проблему, когда он не может добавить, поскольку, когда предикат linkto возвращает false, он не передает данные в следующий блок конвейера .

person JNYRanger    schedule 07.07.2014
comment
Спасибо JNYRanger, я попробую. Целью TryAdd по сравнению с TryGet была безопасность потоков. Другой заключался в том, что concurrentdictionary оптимизирован для чтения. В моем случае 2 коррелирующих идентификатора прошли проверку, и оба пытаются добавить. Поэтому я выбрал TryAdd, в этом случае один из них выиграет условие гонки, а другой будет отложен. - person rizan; 08.07.2014
comment
@rizan Имеет смысл, но для того, чтобы все прошло, вам понадобится второй LinkTo() для обработки сбоев, добавляемых в ConcurrentDictionary, иначе ваш конвейер сломается, что вы и испытываете. - person JNYRanger; 08.07.2014
comment
ваша методология и образцы, которые вы предоставили, отлично работают в условиях, когда можно ожидать завершения. Что я сделал, так это использовал методологию И3арнона с некоторыми небольшими изменениями. Сначала я использовал TDD для написания тестов, поэтому я охватываю все основы. Большое спасибо за помощь. - person rizan; 09.07.2014
comment
@rizan Рад, что смог помочь и хотя бы указать вам правильное направление. Я рекомендую вам ответить на свой вопрос, чтобы показать другим, как вы решили свою проблему. - person JNYRanger; 09.07.2014