Настройка ActionBlock‹T›

Я хочу реализовать приоритет ActionBlock<T>. Чтобы я мог условно дать приоритет некоторым TInput элементам, используя Predicate<T>.
Я прочитал Дополнительные примеры параллельных расширений и Руководство по реализации пользовательских блоков потока данных TPL.
Но все еще не понимаю, как реализовать этот сценарий.
----------- ------------------ РЕДАКТИРОВАТЬ -----------------------------------------
Там несколько задач, 5 из которых могут выполняться одновременно. Когда пользователь нажимает кнопку, некоторые (зависит от функции предиката) задачи должны выполняться с наибольшим приоритетом.
На самом деле я пишу этот код

TaskScheduler taskSchedulerHighPriority;
ActionBlock<CustomObject> actionBlockLow;
ActionBlock<CustomObject> actionBlockHigh;
...
queuedTaskScheduler = new QueuedTaskScheduler(TaskScheduler.Default, 5);
taskSchedulerHigh = queuedTaskScheduler.ActivateNewQueue(0);
taskSchedulerLow = queuedTaskScheduler.ActivateNewQueue(1);
...
actionBlockHigh = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, SingleProducerConstrained = false, TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<CustomObject>(new Action<CustomObject>(method), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, MaxMessagesPerTask = 1, TaskScheduler = taskSchedulerLow });
...     
if (predicate(customObject))
    actionBlockHigh.Post(customObject);
else
    actionBlockLow.Post(customObject);

Но кажется, что приоритет вообще не действует.
---------------------------- РЕДАКТИРОВАТЬ ------ ------------
Я нахожу тот факт, что когда я использую эту строку кода:

actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { TaskScheduler = taskSchedulerLow });

Причина, по которой приложение правильно соблюдает приоритеты задач, но одновременно может выполняться только одна задача, в то время как использование первого блока кода, показанного в потоке, приводит к тому, что приложение запускает 5 задач одновременно, но с неподходящим порядком приоритетов.

actionBlockHigh = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerHigh });
actionBlockLow = new ActionBlock<AvlHistory>(new Action<AvlHistory>(SemaphoreAction), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, TaskScheduler = taskSchedulerLow });

Обновление:
Танки на свик, я должен указать MaxMessagesPerTask для taskSchedulerLow.


person Rzassar    schedule 12.12.2012    source источник
comment
Что диктует приоритет? Это что-то вообще не связанное с T? Или приоритет является неотъемлемым/производным свойством T?   -  person casperOne    schedule 12.12.2012
comment
Вы можете создать пользовательский блок буфера, который использует ConcurrentPriorityQueue, или вы можете создать пользовательский блок асинхронного преобразования. Оба варианта нетривиальны. Также согласен с @casperOne, что в вашем случае означает приоритет?   -  person Panagiotis Kanavos    schedule 14.12.2012


Ответы (1)


В вашем вопросе не так много деталей, поэтому ниже приведено лишь предположение о том, что вам может понадобиться.

Я думаю, что самый простой способ сделать это — иметь два ActionBlock с разными приоритетами на QueuedTaskScheduler из ParallelExtensionsExtras. Вы бы связались с высокоприоритетным, используя предикат, а затем с низкоприоритетным. Кроме того, чтобы убедиться, что высокоприоритетные Task не ждут, установите MaxMessagesPerTask низкоприоритетного блока.

В коде это будет выглядеть примерно так:

static ITargetBlock<T> CreatePrioritizedActionBlock<T>(
    Action<T> action, Predicate<T> isPrioritizedPredicate)
{
    var buffer = new BufferBlock<T>();

    var scheduler = new QueuedTaskScheduler(1);

    var highPriorityScheduler = scheduler.ActivateNewQueue(0);
    var lowPriorityScheduler = scheduler.ActivateNewQueue(1);

    var highPriorityBlock = new ActionBlock<T>(
        action, new ExecutionDataflowBlockOptions
        {
            TaskScheduler = highPriorityScheduler
        });
    var lowPriorityBlock = new ActionBlock<T>(
        action, new ExecutionDataflowBlockOptions
        {
            TaskScheduler = lowPriorityScheduler,
            MaxMessagesPerTask = 1
        });

    buffer.LinkTo(highPriorityBlock, isPrioritizedPredicate);
    buffer.LinkTo(lowPriorityBlock);

    return buffer;
}

Это всего лишь набросок того, что вы можете сделать, например, Completion возвращаемого блока ведет себя неправильно.

person svick    schedule 14.12.2012
comment
В вашем коде вы не указываете MaxMessagesPerTask для своего низкоприоритетного блока. Как я уже сказал, делать это очень важно. - person svick; 15.12.2012