BufferBlock и ActionBlock с BoundedCapacity не используют максимальный DOP

У меня есть этот код:

var data = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

var action = new ActionBlock<int>(async id =>
{
    Console.WriteLine("[{0:T}] #{1}: Start", DateTime.Now, id);

    await Task.Delay(1000);

    Console.WriteLine("[{0:T}] #{1}: End", DateTime.Now, id);
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 1,
    MaxDegreeOfParallelism = -1
});

data.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });

for (var id = 1; id <= 3; id++)
{
    Console.WriteLine("[{0:T}] Sending {1}", DateTime.Now, id);
    data.SendAsync(id).Wait();
    Console.WriteLine("[{0:T}] Sending {1} complete", DateTime.Now, id);
}

data.Complete();

Task.WhenAll(data.Completion, action.Completion).Wait();

И этот код дает мне этот вывод:

[22:31:22] Sending 1
[22:31:22] Sending 1 complete
[22:31:22] Sending 2
[22:31:22] #1: Start
[22:31:22] Sending 2 complete
[22:31:22] Sending 3
[22:31:23] #1: End
[22:31:23] #2: Start
[22:31:23] Sending 3 complete
[22:31:24] #2: End
[22:31:24] #3: Start
[22:31:25] #3: End

Почему ActionBlock не работает параллельно, несмотря на неограниченный DOP?


person Michael Logutov    schedule 26.10.2014    source источник


Ответы (1)


Причина, по которой ваш ActionBlock имеет ограниченную степень параллелизма, заключается в том, что он имеет BoundedCapacity, равное 1. BoundedCapacity (в отличие от InputCount) включает элемент, обрабатываемый в данный момент. Это можно легко продемонстрировать:

var block = new ActionBlock<int>(_ => Task.Delay(-1), new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 1,
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});

await block.SendAsync(4); // Adds a new item
await block.SendAsync(4); // Blocks forever

Это означает, что пока вы устанавливаете MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, блок не может одновременно принимать более одного элемента, что практически ограничивает вашу степень параллелизма.

Вы можете исправить это, установив большее значение BoundedCapacity:

var action = new ActionBlock<int>(async id =>
{
    Console.WriteLine("[{0:T}] #{1}: Start", DateTime.Now, id);
    await Task.Delay(1000);
    Console.WriteLine("[{0:T}] #{1}: End", DateTime.Now, id);
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 10,
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
person i3arnon    schedule 27.10.2014
comment
Хорошо, но я также установил DOP на -1. И я подумал, что Dataflow должен создавать столько копий ActionBlock, сколько ему нужно, а это означает, что даже если один экземпляр ActionBlock получил один элемент, а есть другой входящий элемент, он может создать другой экземпляр ActionBlock для обработки нового элемента. И это происходит так. Нет? - person Michael Logutov; 28.10.2014
comment
@MichaelLogutov блок не копирует сам себя. Если вы не создадите больше, будет только один экземпляр. Блоки используют задачи для параллелизма, но если вы ограничиваете его емкость, он не может содержать достаточно элементов для их распараллеливания. - person i3arnon; 28.10.2014
comment
Спасибо. Не знал этого. И документации действительно не хватает этого момента. - person Michael Logutov; 28.10.2014