Я написал небольшой конвейер, используя API потока данных TPL, который получает данные из нескольких потоков и выполняет их обработку.
Настройка 1
Когда я настраиваю его для использования MaxDegreeOfParallelism = Environment.ProcessorCount
(в моем случае это 8
) для каждого блока, я замечаю, что он заполняет буферы в нескольких потоках, и обработка второго блока не начинается до тех пор, пока не будет получено +- 1700 элементов во всех потоках. Вы можете увидеть это в действии здесь.
Настройка 2
Когда я устанавливаю MaxDegreeOfParallelism = 1
, я замечаю, что все элементы принимаются в одном потоке, и обработка отправки уже начинается после получения +- 40 элементов. Данные здесь.
Настройка 3
Когда я устанавливаю MaxDegreeOfParallelism = 1
и ввожу задержку в 1000 мс перед отправкой каждого ввода, я замечаю, что элементы отправляются сразу после их получения, и каждый полученный элемент помещается в отдельный поток. Данные здесь.
Пока установка. Мои вопросы следующие:
Когда я сравниваю настройки 1 и 2, я замечаю, что обработка элементов начинается намного быстрее, когда они выполняются последовательно, по сравнению с параллельным (даже с учетом того факта, что параллельный поток имеет в 8 раз больше потоков). Что вызывает эту разницу?
Поскольку это будет выполняться в среде ASP.NET, я не хочу создавать ненужные потоки, поскольку все они исходят из одного пула потоков. Как показано в настройке 3, он по-прежнему будет распределяться по нескольким потокам, даже если данных всего несколько. Это также удивительно, поскольку в настройке 1 я предполагал, что данные распределяются последовательно по потокам (обратите внимание, как все первые 50 элементов переходят в поток 16). Могу ли я убедиться, что он создает новые потоки только по требованию?
Существует еще одна концепция, называемая
BufferBlock<T>
. ЕслиTransformBlock<T>
уже ставит входные данные в очередь, какая практическая разница в замене первого шага в моем конвейере (ReceiveElement
) наBufferBlock
?
class Program
{
static void Main(string[] args)
{
var dataflowProcessor = new DataflowProcessor<string>();
var amountOfTasks = 5;
var tasks = new Task[amountOfTasks];
for (var i = 0; i < amountOfTasks; i++)
{
tasks[i] = SpawnThread(dataflowProcessor, $"Task {i + 1}");
}
foreach (var task in tasks)
{
task.Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Finished feeding threads"); // Needs to use async main
Console.Read();
}
private static Task SpawnThread(DataflowProcessor<string> dataflowProcessor, string taskName)
{
return new Task(async () =>
{
await FeedData(dataflowProcessor, taskName);
});
}
private static async Task FeedData(DataflowProcessor<string> dataflowProcessor, string threadName)
{
foreach (var i in Enumerable.Range(0, short.MaxValue))
{
await Task.Delay(1000); // Only used for the delayedSerialProcessing test
dataflowProcessor.Process($"Thread name: {threadName}\t Thread ID:{Thread.CurrentThread.ManagedThreadId}\t Value:{i}");
}
}
}
public class DataflowProcessor<T>
{
private static readonly ExecutionDataflowBlockOptions ExecutionOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
};
private static readonly TransformBlock<T, T> ReceiveElement = new TransformBlock<T, T>(element =>
{
Console.WriteLine($"Processing received element in thread {Thread.CurrentThread.ManagedThreadId}");
return element;
}, ExecutionOptions);
private static readonly ActionBlock<T> SendElement = new ActionBlock<T>(element =>
{
Console.WriteLine($"Processing sent element in thread {Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine(element);
}, ExecutionOptions);
static DataflowProcessor()
{
ReceiveElement.LinkTo(SendElement);
ReceiveElement.Completion.ContinueWith(x =>
{
if (x.IsFaulted)
{
((IDataflowBlock) ReceiveElement).Fault(x.Exception);
}
else
{
ReceiveElement.Complete();
}
});
}
public void Process(T newElement)
{
ReceiveElement.Post(newElement);
}
}
TaskScheduler
. - person i3arnon   schedule 15.06.2016MaxDegreeParallelism
), при автоматической постановке в очередь других входящих данных. Я понял это неправильно? - person Jeroen Vannevel   schedule 17.06.2016