Как остановить обработку конвейера на неисправном блоке?

Как я могу остановить обработку блоков DataFlow, если один из блоков решил, что произошла ошибка, препятствующая запуску следующих блоков. Я думал, что блок может генерировать исключение, но не уверен, как правильно остановить дальнейшую обработку конвейера.

ОБНОВИТЬ:

private async void buttonDataFlow_Click(object sender, EventArgs e)
{
    var cells = objectListView.CheckedObjects.Cast<Cell>().ToList();
    if (cells == null)
        return;

    var blockPrepare = new TransformBlock<Cell, Cell>(new Func<Cell, Task<Cell>>(Prepare),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });

    var blockPreparationFeedback = new TransformBlock<Cell, Cell>(new Func<Cell, Task<Cell>>(PreparationFeedback),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });

    var blockTestMover = new ActionBlock<Cell>(new Func<Cell, Task>(TestMover),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });

    blockPrepare.LinkTo(blockPreparationFeedback, new DataflowLinkOptions { PropagateCompletion = true });
    blockPreparationFeedback.LinkTo(blockTestMover, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (Cell c in cells)
    {
        var progressHandler = new Progress<string>(value =>
        {
            c.Status = value;
        });

        c.Progress = progressHandler as IProgress<string>;
        blockPrepare.Post(c);
    };

    blockPrepare.Complete();
    try
    {
        await blockTestMover.Completion;
    }
    catch(Exception ee)
    {
        Console.WriteLine(ee.Message);
    }

    Console.WriteLine("Done");
}

ОБНОВЛЕНИЕ 2:

    public ITargetBlock<TInput> CreateExceptionCatchingActionBlock<TInput>(
                    Func<TInput, Task> action,
                    Action<Exception> exceptionHandler,
                    ExecutionDataflowBlockOptions dataflowBlockOptions)
    {
        return new ActionBlock<TInput>(async input =>
        {
            try
            {
                await action(input);
            }
            catch (Exception ex)
            {
                exceptionHandler(ex);
            }
        }, dataflowBlockOptions);
    }

person Pablo    schedule 17.07.2016    source источник
comment
Вы можете взглянуть на минималистическую библиотеку Стивена Клири Try. Это позволяет передать сообщение через все блоки конвейера, а затем наблюдать за любым исключением, которое произошло с этим сообщением в конце.   -  person Theodor Zoulias    schedule 26.06.2020


Ответы (2)


Если вы хотите, чтобы исключение в блоке означало, что текущие элементы идут дальше по конвейеру, но обработка других элементов должна продолжаться без перерыва, то вы можете сделать это, создав блок, который создает один элемент, если обработка завершается успешно, но создает нулевые элементы при возникновении исключения:

public IPropagatorBlock<TInput, TOutput> CreateExceptionCatchingTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    Action<Exception> exceptionHandler,
    ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    return new TransformManyBlock<TInput, TOutput>(async input =>
    {
        try
        {
            var result = await transform(input);
            return new[] { result };
        }
        catch (Exception ex)
        {
            exceptionHandler(ex);

            return Enumerable.Empty<TOutput>();
        }
    }, dataflowBlockOptions);
}
person svick    schedule 17.07.2016
comment
Это было именно то, что я искал! - person Pablo; 17.07.2016
comment
только одна последняя вещь. Хотя ActionBlock является последним блоком, я попытался создать для него аналогичный метод. Мой пост обновлен попыткой создать такой метод. Но компилятор жалуется, что не может ждать 'void'. Я могу понять его, но не знаю, как решить эту проблему. - person Pablo; 17.07.2016
comment
Действие, которое будет запущено, возвращает Task. - person Pablo; 17.07.2016

Если у вас есть конвейер, вы, вероятно, уже используете PropagateCompletion = true. Это означает, что если один блок в конвейере выйдет из строя с исключением, все блоки после него также потерпят неудачу.

Остается остановить все блоки, которые стоят перед блоком, который вышел из строя. Для этого вы можете дождаться Completion последнего блока в конвейере. Если это броски, провалите первый блок, вызвав на нем Fault(). Код может выглядеть так:

// set up your pipeline

try
{
    await lastBlock.Completion;
}
catch (Exception ex)
{
    ((IDataflowBlock)firstBlock).Fault(ex);

    throw; // or whatever is appropriate to propagate the exception up
}
person svick    schedule 17.07.2016
comment
Это отменяет все, даже те работы, в которых нет ошибок. Я имею в виду, что я отправляю количество заданий в конвейер в цикле foreach, и все они отменяются, если одно из них неисправно. Первые 8 элементов (8, потому что у меня 8-ядерный процессор) работают, один из них не идет дальше, а остальные завершаются. Но кроме этих 8 предметов, остальные больше не обрабатываются. Если исключений не возникает, то обрабатываются все элементы. Я обновил исходный код в своем посте. - person Pablo; 17.07.2016
comment
@Pablo Пабло Я думал, что это то, чего вы хотели, чтобы остановить дальнейшую обработку [в] конвейере. - person svick; 17.07.2016
comment
Я думал, что пайплайн — это когда я публикую для блокировки задание с некоторыми входными данными, затем задание переходит в другой блок и так далее, в зависимости от связанных блоков. Если я опубликую только один раз, то цепочка блоков будет отменена, как и ожидалось. Но если я публикую несколько раз, то удаляются все посты, даже те, которые идут без исключения. Другими словами, если я отправляю входные данные [1, 2, 3, 4] в цепочку блоков и если только когда я отправляю 2, возникает исключение, то 1, 3, 4 завершают работу, как ожидалось. Я, вероятно, использовал неправильные термины, но я не уверен, как лучше описать. - person Pablo; 17.07.2016