TransformBlock никогда не завершается

Пытаюсь осознать «доработку» в блоках TPL Dataflow. В частности, TransformBlock, похоже, никогда не завершается. Почему?

Пример программы

Мой код вычисляет квадрат всех целых чисел от 1 до 1000. Я использовал для этого BufferBlock и TransformBlock. Позже в своем коде я жду завершения TransformBlock. Однако блок никогда не завершается, и я не понимаю почему.

static void Main(string[] args)
{
    var bufferBlock = new BufferBlock<int>();
    var calculatorBlock = new TransformBlock<int, int>(i =>
    {
        Console.WriteLine("Calculating {0}²", i);
        return (int)Math.Pow(i, 2);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

    using (bufferBlock.LinkTo(calculatorBlock, new DataflowLinkOptions { PropagateCompletion = true }))
    {
        foreach (var number in Enumerable.Range(1, 1000))
        {
            bufferBlock.Post(number);
        }

        bufferBlock.Complete();

        // This line never completes
        calculatorBlock.Completion.Wait();

        // Unreachable code
        IList<int> results;
        if (calculatorBlock.TryReceiveAll(out results))
        {
            foreach (var result in results)
            {
                Console.WriteLine("x² = {0}", result);
            }
        }
    }
}

Сначала я подумал, что создал тупиковую ситуацию, но это не похоже на правду. Когда я проверил задачу calculatorBlock.Completion в отладчике, ее свойство Status было установлено на WaitingForActivation. Это был момент, когда мой мозг показался синим.


person Steven Liekens    schedule 28.11.2014    source источник


Ответы (3)


Причина зависания вашего конвейера заключается в том, что и BufferBlock, и TransformBlock, очевидно, не завершаются, пока они не освободят себя от элементов (я предполагаю, что желаемое поведение IPropagatorBlocks, хотя я не нашел по нему документации).

Это можно проверить на более минимальном примере:

var bufferBlock = new BufferBlock<int>();
bufferBlock.Post(0);
bufferBlock.Complete();
bufferBlock.Completion.Wait();

Это блокируется на неопределенный срок, если вы не добавите bufferBlock.Receive(); до завершения.

Если вы удалите элементы из конвейера перед блокировкой с помощью блока кода TryReceiveAll, подключения другого ActionBlock к конвейеру, преобразования вашего TransformBlock в ActionBlock или любым другим способом, блокировка больше не будет.


Что касается вашего конкретного решения, кажется, что вам вообще не нужны BufferBlock или TransformBlock, поскольку блоки имеют входную очередь для самих себя, и вы не используете возвращаемое значение TransformBlock. Этого можно добиться с помощью всего ActionBlock:

var block = new ActionBlock<int>(
    i =>
    {
        Console.WriteLine("Calculating {0}²", i);
        Console.WriteLine("x² = {0}", (int)Math.Pow(i, 2));
    },
    new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 8});
foreach (var number in Enumerable.Range(1, 1000))
{
    block.Post(number);
}
block.Complete();
block.Completion.Wait();
person i3arnon    schedule 28.11.2014
comment
В документации было только следующее: После того, как Complete был вызван для блока потока данных, этот блок завершится, и его задача Completion перейдет в окончательное состояние после того, как обработает все ранее доступные данные. Я предположил что обработано, значит преобразовано. Никогда бы не догадался, что это означает преобразованный и полученный. - person Steven Liekens; 28.11.2014
comment
@StevenLiekens Я сам был очень удивлен. Я тестировал BufferBlock и смотрел исходный код с тех пор, как вы задали этот вопрос. - person i3arnon; 28.11.2014

Я думаю, что теперь понимаю это. Экземпляр TransformBlock не считается завершенным, пока не будут выполнены следующие условия:

  1. TransformBlock.Complete() был вызван
  2. InputCount == 0 - блок применил преобразование ко всем входящим элементам
  3. OutputCount == 0 - все преобразованные элементы покинули выходной буфер

В моей программе нет целевого блока, связанного с источником TransformBlock, поэтому исходный блок никогда не очищает свой выходной буфер.

В качестве обходного пути я добавил второй BufferBlock, который используется для хранения преобразованных элементов.

static void Main(string[] args)
{
    var inputBufferBlock = new BufferBlock<int>();
    var calculatorBlock = new TransformBlock<int, int>(i =>
    {
        Console.WriteLine("Calculating {0}²", i);
        return (int)Math.Pow(i, 2);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
    var outputBufferBlock = new BufferBlock<int>();
    using (inputBufferBlock.LinkTo(calculatorBlock, new DataflowLinkOptions { PropagateCompletion = true }))
    using (calculatorBlock.LinkTo(outputBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }))
    {
        foreach (var number in Enumerable.Range(1, 1000))
        {
            inputBufferBlock.Post(number);
        }

        inputBufferBlock.Complete();
        calculatorBlock.Completion.Wait();

        IList<int> results;
        if (outputBufferBlock.TryReceiveAll(out results))
        {
            foreach (var result in results)
            {
                Console.WriteLine("x² = {0}", result);
            }
        }
    }
}
person Steven Liekens    schedule 28.11.2014

TransformBlock нужен ITargetBlock, где он может опубликовать преобразование.

 var writeCustomerBlock = new ActionBlock<int>(c => Console.WriteLine(c));
        transformBlock.LinkTo(
            writeCustomerBlock, new DataflowLinkOptions
            {
                PropagateCompletion = true
            });

После этого он завершится.

person Alex Cr    schedule 29.09.2020