РЕДАКТИРОВАТЬ: Оказалось, что я был очень неправ. TransformBlock
возвращает элементы в том же порядке, в котором они пришли, даже если он настроен для параллелизма. Из-за этого код в моем исходном ответе совершенно бесполезен, и вместо него можно использовать обычный TransformBlock
.
Исходный ответ:
Насколько мне известно, только одна конструкция параллелизма в .Net поддерживает возврат обработанных элементов в том порядке, в котором они пришли: PLINQ с _ 3_. Но мне кажется, что PLINQ не подходит для того, что вы хотите.
С другой стороны, TPL Dataflow подходит, я думаю, хорошо, но в нем нет блока, который поддерживал бы параллелизм и одновременный возврат элементов по порядку (TransformBlock
поддерживает оба из них, но не одновременно). К счастью, блоки Dataflow были разработаны с учетом возможности компоновки, поэтому мы можем создать собственный блок, который это сделает.
Но сначала мы должны выяснить, как упорядочить результаты. Использование параллельного словаря, как вы предложили, вместе с некоторым механизмом синхронизации, безусловно, сработает. Но я думаю, что есть более простое решение: использовать очередь из Task
s. В задаче вывода вы удаляете Task
из очереди, ждете его завершения (асинхронно), и когда это происходит, вы отправляете его результат. Нам все еще нужна некоторая синхронизация для случая, когда очередь пуста, но мы можем получить ее бесплатно, если выберем, какую очередь использовать с умом.
Итак, общая идея такова: то, что мы пишем, будет IPropagatorBlock
, с некоторым вводом и некоторым выводом. Самый простой способ создать собственный IPropagatorBlock
- создать один блок, обрабатывающий ввод, другой блок, который выдает результаты, и обрабатывать их как единое целое, используя _ 9_.
Входной блок должен будет обрабатывать входящие элементы в правильном порядке, поэтому распараллеливания здесь нет. Будет создан новый Task
(фактически, TaskCompletionSource
, чтобы мы может установить результат Task
позже), добавить его в очередь и затем отправить элемент на обработку вместе с некоторым способом установить результат правильного Task
. Поскольку нам не нужно связывать этот блок ни с чем, мы можем использовать ActionBlock
.
Блок вывода должен будет взять Task
из очереди, асинхронно дождаться их, а затем отправить их вместе. Но поскольку все блоки имеют встроенную очередь, а блоки, принимающие делегатов, имеют встроенное асинхронное ожидание, это будет очень просто: new TransformBlock<Task<TOutput>, TOutput>(t => t)
. Этот блок будет работать и как очередь и как выходной блок. Благодаря этому нам не приходится заниматься какой-либо синхронизацией.
Последний кусок головоломки - это параллельная обработка предметов. Для этого мы можем использовать другой ActionBlock
, на этот раз с установленным MaxDegreeOfParallelism
. Он примет ввод, обработает его и установит результат правильного Task
в очереди.
В совокупности это могло бы выглядеть так:
public static IPropagatorBlock<TInput, TOutput>
CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
Func<TInput, TOutput> transform)
{
var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);
var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
tuple => tuple.Item2(transform(tuple.Item1)),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var enqueuer = new ActionBlock<TInput>(
async item =>
{
var tcs = new TaskCompletionSource<TOutput>();
await processor.SendAsync(
new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
await queue.SendAsync(tcs.Task);
});
enqueuer.Completion.ContinueWith(
_ =>
{
queue.Complete();
processor.Complete();
});
return DataflowBlock.Encapsulate(enqueuer, queue);
}
Я думаю, после стольких разговоров это довольно небольшой объем кода.
Похоже, вы очень заботитесь о производительности, поэтому вам может потребоваться точная настройка этого кода. Например, имеет смысл установить MaxDegreeOfParallelism
блока processor
на что-то вроде Environment.ProcessorCount
, чтобы избежать переподписки. Кроме того, если для вас задержка важнее, чем пропускная способность, может иметь смысл установить MaxMessagesPerTask
того же блока на 1 (или другое небольшое число), чтобы по завершении обработки элемента он немедленно отправлялся на выход.
Кроме того, если вы хотите ограничить входящие элементы, вы можете установить BoundedCapacity
из enqueuer
.
person
svick
schedule
15.06.2012