Это работа для TPL Dataflow?

Я использую довольно типичную модель производитель / потребитель для разных задач.

Задача 1: считывает пакеты byte [] из двоичных файлов и запускает новую задачу для каждой коллекции массивов байтов. (операция запрограммирована в целях управления памятью).

Задача 2-n: это рабочие задачи, каждая из которых работает с переданной коллекцией (из Задачи 1) байтовых массивов и десериализует байтовые массивы, сортирует их по определенным критериям, а затем сохраняет коллекцию результирующих объектов (каждый байтовый массив десериализуется в такой объект) в параллельном словаре.

Задача (n + 1) Я выбрал параллельный словарь, потому что задача этой задачи состоит в том, чтобы объединить коллекции, которые хранятся в параллельном словаре, в том же порядке, в каком они возникли из Задачи 1. Я достигаю этого, передавая collectionID (он имеет тип int и увеличивается для каждой новой коллекции в Task1) на всем пути от Task1 к этой задаче. Эта задача в основном проверяет, сохранен ли уже следующий ожидаемый идентификатор коллекции в параллельном словаре, и, если да, извлекает его, добавляет в окончательную очередь и проверяет наличие следующей коллекции в параллельном словаре.

Теперь, судя по тому, что я прочитал, и просмотренным видео, мне кажется, что TPL Dataflow может быть идеальным кандидатом для такой модели производителя / потребителя. Кажется, я просто не могу разработать дизайн и, таким образом, начать работу, потому что я никогда не работал с TPL Dataflow. С точки зрения пропускной способности и задержки эта библиотека справится с поставленной задачей? В настоящее время я обрабатываю 2,5 миллиона байтовых массивов и, следовательно, объектов в секунду в результирующих коллекциях. Может ли TPL Dataflow упростить? Меня особенно интересует ответ на следующий вопрос: может ли TPL Dataflow сохранить порядок пакетов сбора из Task1 при порождении рабочих задач и их повторном слиянии после того, как рабочие задачи выполнили свою работу? Оптимизирует ли это? Профилировав всю структуру, я чувствую, что довольно много времени потрачено впустую из-за вращения и слишком большого количества параллельных коллекций.

Есть идеи, мысли?


person Matt    schedule 15.06.2012    source источник


Ответы (2)


РЕДАКТИРОВАТЬ: Оказалось, что я был очень неправ. TransformBlock возвращает элементы в том же порядке, в котором они пришли, даже если он настроен для параллелизма. Из-за этого код в моем исходном ответе совершенно бесполезен, и вместо него можно использовать обычный TransformBlock.


Исходный ответ:

Насколько мне известно, только одна конструкция параллелизма в .Net поддерживает возврат обработанных элементов в том порядке, в котором они пришли: PLINQ с _ 3_. Но мне кажется, что PLINQ не подходит для того, что вы хотите.

С другой стороны, TPL Dataflow подходит, я думаю, хорошо, но в нем нет блока, который поддерживал бы параллелизм и одновременный возврат элементов по порядку (TransformBlock поддерживает оба из них, но не одновременно). К счастью, блоки Dataflow были разработаны с учетом возможности компоновки, поэтому мы можем создать собственный блок, который это сделает.

Но сначала мы должны выяснить, как упорядочить результаты. Использование параллельного словаря, как вы предложили, вместе с некоторым механизмом синхронизации, безусловно, сработает. Но я думаю, что есть более простое решение: использовать очередь из Tasks. В задаче вывода вы удаляете Task из очереди, ждете его завершения (асинхронно), и когда это происходит, вы отправляете его результат. Нам все еще нужна некоторая синхронизация для случая, когда очередь пуста, но мы можем получить ее бесплатно, если выберем, какую очередь использовать с умом.

Итак, общая идея такова: то, что мы пишем, будет IPropagatorBlock, с некоторым вводом и некоторым выводом. Самый простой способ создать собственный IPropagatorBlock - создать один блок, обрабатывающий ввод, другой блок, который выдает результаты, и обрабатывать их как единое целое, используя _ 9_.

Входной блок должен будет обрабатывать входящие элементы в правильном порядке, поэтому распараллеливания здесь нет. Будет создан новый Task (фактически, TaskCompletionSource, чтобы мы может установить результат Task позже), добавить его в очередь и затем отправить элемент на обработку вместе с некоторым способом установить результат правильного Task. Поскольку нам не нужно связывать этот блок ни с чем, мы можем использовать ActionBlock.

Блок вывода должен будет взять Task из очереди, асинхронно дождаться их, а затем отправить их вместе. Но поскольку все блоки имеют встроенную очередь, а блоки, принимающие делегатов, имеют встроенное асинхронное ожидание, это будет очень просто: new TransformBlock<Task<TOutput>, TOutput>(t => t). Этот блок будет работать и как очередь и как выходной блок. Благодаря этому нам не приходится заниматься какой-либо синхронизацией.

Последний кусок головоломки - это параллельная обработка предметов. Для этого мы можем использовать другой ActionBlock, на этот раз с установленным MaxDegreeOfParallelism. Он примет ввод, обработает его и установит результат правильного Task в очереди.

В совокупности это могло бы выглядеть так:

public static IPropagatorBlock<TInput, TOutput>
    CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform)
{
    var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);

    var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
        tuple => tuple.Item2(transform(tuple.Item1)),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    var enqueuer = new ActionBlock<TInput>(
        async item =>
        {
            var tcs = new TaskCompletionSource<TOutput>();
            await processor.SendAsync(
                new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
            await queue.SendAsync(tcs.Task);
        });

    enqueuer.Completion.ContinueWith(
        _ =>
        {
            queue.Complete();
            processor.Complete();
        });

    return DataflowBlock.Encapsulate(enqueuer, queue);
}

Я думаю, после стольких разговоров это довольно небольшой объем кода.

Похоже, вы очень заботитесь о производительности, поэтому вам может потребоваться точная настройка этого кода. Например, имеет смысл установить MaxDegreeOfParallelism блока processor на что-то вроде Environment.ProcessorCount, чтобы избежать переподписки. Кроме того, если для вас задержка важнее, чем пропускная способность, может иметь смысл установить MaxMessagesPerTask того же блока на 1 (или другое небольшое число), чтобы по завершении обработки элемента он немедленно отправлялся на выход.

Кроме того, если вы хотите ограничить входящие элементы, вы можете установить BoundedCapacity из enqueuer.

person svick    schedule 15.06.2012
comment
Ничего себе, куча вкусностей, которые я хотел бы сначала переварить и опробовать. Большое спасибо за это, это, по крайней мере, заслуживает одобрения ;-) Позвольте мне поиграть с этими идеями, и я вернусь. Создание очереди задач имеет большой смысл, и мне интересно, почему я не получил этого раньше. - person Matt; 15.06.2012
comment
хорошо, я потратил некоторое время на просмотр вашего сообщения и чтение TPL Dataflow, вот пара вопросов, чтобы полностью понять ваше предлагаемое решение: (1) почему вы предлагаете настраиваемые IPropagatorBlock и IDataflowBlock.Encapsulate () с учетом Transformblock ‹Tin, Tout› уже существует? (2) Я не понимаю, как вы на самом деле планируете соединить блоки. Сначала вы говорите о ActionBlocks, а затем о TransformBlocks. Из того, что я читал, не будет ли ActionBlock конечной точкой всей архитектуры? - person Matt; 20.06.2012
comment
1. Это объясняется во втором абзаце: TransformBlock не может обрабатывать элементы параллельно и одновременно возвращать их по порядку. Он может выполнить любой из них, но не оба. - person svick; 20.06.2012
comment
2. Это все в коде. На самом деле я здесь не использовал LinkTo(), а вместо этого дважды использовал SendAsync() в enqueuer. Я мог превратить enqueuer в TransformBlock и использовать один LinkTo() и один SendAsync(), но мне он понравился больше, потому что он «симметричный». И ActionBlock обычно является конечной точкой, но код внутри него может делать все, что вы хотите, включая отправку элементов в другие блоки, что я и делаю. - person svick; 20.06.2012
comment
И я говорю сначала о ActionBlock (enqueuer), затем о другом блоке, который является TransformBlock (queue), и, наконец, о еще одном блоке, который является ActionBlock (processor). - person svick; 20.06.2012
comment
возможно, это самый глупый из моих вопросов, но как реализовать приведенный выше код? Я предполагаю, что мне нужно реализовать весь интерфейс, чтобы действительно протестировать? Я хочу запустить его, потому что думаю, что могу лучше понять его, просматривая каждую строку кода в режиме отладки. - person Matt; 20.06.2012
comment
Нет, вам не нужно реализовывать какой-либо интерфейс. Просто поместите метод в свой код и вызовите его (передав функцию, которая будет выполняться параллельно), он вернет новый блок, который выполняет то, что вы просили. Чтобы проверить это, вы можете связать его с другим ActionBlock, а затем использовать Post() или SendAsync() для добавления элементов для обработки. - person svick; 20.06.2012
comment
Я отметил ваш ответ как желаемое решение, отлично. По общему признанию, потребовалось довольно много времени, чтобы полностью понять, поиграть с ним, реализовать и протестировать производительность, но я наконец-то там, и ваше решение TPL Dataflow значительно превосходит то, что я ранее реализовал с использованием параллельных коллекций и большого количества кода котельной пластины. Замечательно, это действительно мотивирует меня глубже копать в TPL Dataflow. - person Matt; 25.06.2012
comment
@ Фредди Отлично. Я рад, что смог помочь. - person svick; 25.06.2012

Да, TPL Dataflow Библиотека хорошо подходит для этой работы. Он поддерживает все необходимые вам функции: MaxDegreeOfParallelism, BoundedCapacity и _3 _ . Но использование опции BoundedCapacity требует внимания к деталям.

Сначала вы должны убедиться, что вы загрузили первый блок в конвейере с _ 5_. В противном случае, если вы используете Post < / a> и игнорируйте его возвращаемое значение, вы можете потерять сообщения. SendAsync никогда не потеряет сообщения, потому что он асинхронно блокирует вызывающего до тех пор, пока не освободится место для входящего сообщения во внутреннем буфере блока.

Во-вторых, вы должны убедиться, что возможное исключение в блоке ниже по потоку не заблокирует на неопределенное время фидер, ожидая свободного места, которое никогда не появится. Нет встроенного способа сделать это автоматически, настроив блоки. Вместо этого вы должны вручную распространить завершение нижележащих блоков на вышестоящие блоки. Это цель метода PropagateFailure в примере ниже:

public static async Task ProcessAsync(string[] filePaths,
    ConcurrentQueue<MyClass> finalQueue)
{
    var reader = new TransformBlock<string, byte[]>(filePath =>
    {
        byte[] result = ReadBinaryFile(filePath);
        return result;
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1, // This is the default
        BoundedCapacity = 20, // keep memory usage under control
        EnsureOrdered = true // This is also the default
    });

    var deserializer = new TransformBlock<byte[], MyClass>(bytes =>
    {
        MyClass result = Deserialize(bytes);
        return result;
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount,
        BoundedCapacity = 20
    });

    var writer = new ActionBlock<MyClass>(obj =>
    {
        finalQueue.Enqueue(obj);
    });

    reader.LinkTo(deserializer,
        new DataflowLinkOptions() { PropagateCompletion = true });
    PropagateFailure(deserializer, reader); // Link backwards

    deserializer.LinkTo(writer,
        new DataflowLinkOptions() { PropagateCompletion = true });
    PropagateFailure(writer, deserializer); // Link backwards

    foreach (var filePath in filePaths)
    {
        var accepted = await reader.SendAsync(filePath).ConfigureAwait(false);
        if (!accepted) break; // This will happen in case that the block has failed
    }
    reader.Complete(); // This will be ignored if the block has already failed

    await writer.Completion; // This will propagate the first exception that occurred
}

public static async void PropagateFailure(IDataflowBlock block1,
    IDataflowBlock block2)
{
    try { await block1.Completion.ConfigureAwait(false); }
    catch (Exception ex) { block2.Fault(ex); }
}
person Theodor Zoulias    schedule 07.06.2020