Почему Polly AsyncCircuitBreakerPolicy разрывает связи между блоками потока данных TPL?

Вот простой пример кода с использованием TransformBlock и ActionBlock. Я использую библиотеку Polly, чтобы помочь с логикой повтора. Проблема в том, что как только я вручную открываю CircuitBreaker, а затем снова закрываю его, связь между downloadBlock и actionBlock разрывается. Этого не произойдет, если я опущу предикат установки в вызове LinkTo:

private readonly TransformBlock<DataClass, DataClass> downloadBlock;
private readonly ActionBlock<DataClass> actionBlock;
private readonly AsyncCircuitBreakerPolicy circuitBreaker;
private readonly AsyncRetryPolicy retryPolicy;

retryPolicy = Policy.Handle<WebException>().RetryAsync(4);
circuitBreaker = Policy.Handle<WebException>().CircuitBreakerAsync(10, TimeSpan.FromSeconds(10));

downloadBlock = new TransformBlock<DataClass, DataClass>(async (data) =>
{
    var finalPolicy = retryPolicy.WrapAsync(circuitBreaker);
    try
    {
        await finalPolicy.ExecuteAsync(async () =>
        {
             //await DoSomething();
             data.Status = Status.Completed;
        });
    }
    catch (WebException we)
    {
         data.Status = Status.Failed;
         //Do logging
    }
    catch (BrokenCircuitException)
    {
         data.Status = Status.Failed;
         //Do logging
    }
    return data;
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });


actionBlock = new ActionBlock<DataClass>((data) =>
{
   //DoSomething(data);
}});
//Here, if I set data => data.Status != Status.Failed and then at some point I manually call
//circuitBreaker.Isolate() and then after some time circuitBreaker.Reset() to manually close the circuit again,
//items newly pushed into the pipeline are being processed in the downloadBlock but the results
//are not propagated to actionBlock even if their status is OK. This does not happen if ommit setting the predicate.

downloadBlock.LinkTo(actionBlock,
                     new DataflowLinkOptions { PropagateCompletion = true }, data => data.Status != Status.Failed);

Я мог бы, конечно, проверить статус в самом actionBlock, но я хотел бы знать, сталкивался ли кто-нибудь с таким поведением и в чем может быть причина этого?

РЕДАКТИРОВАТЬ: Вот шаги, необходимые для воспроизведения проблемы:

  1. помещать данные в конвейер, обрабатывать их,
  2. вызовите circuitBreaker.Isolate (),
  3. помещать новые данные в конвейер, они попадают в исключение BrokenCircuitException и по логике не переходят к следующему блоку из-за предиката. Хорошо.
  4. вызовите circuitBreaker.Reset (). Теперь все должно работать.
  5. помещает новые данные в конвейер, они обрабатываются в downloadBlock, их статус в порядке, но они просто не передаются на следующий ActionBlock

person niks    schedule 14.07.2020    source источник
comment
Что вы имеете в виду, говоря, что связь между downloadBlock и actionBlock не работает? В вашей ссылке есть предикат сообщения, в котором говорится, что только те данные должны переходить из SourceBlock в TargetBlock, где выполняется условие. Несмотря на то, что ссылки могут быть динамическими (вы можете связывать и отключать блоки в любое время), они не будут разъединяться из-за исключения. И вы уловили BrokenCircuitException, поэтому, пожалуйста, уточните свое предложение, потому что непонятно, что вы здесь имели в виду.   -  person Peter Csala    schedule 14.07.2020
comment
@Peter Я провел более глубокое тестирование и нашел точное условие появления этого поведения. Я опишу шаги: 1) отправить данные в конвейер, обработать их, 2) вызвать circuitBreaker.Isolate(), 3) отправить новые данные в конвейер, они попадают в BrokenCircuitException и по логике не переходят к следующему блоку из-за предиката. Хорошо. 4) звоните circuitBreaker.Reset(). Теперь все должно работать. 5) поместите новые данные в конвейер, они будут обработаны в downloadBlock, их статус в порядке, но они просто не передаются на следующий ActionBlock.   -  person niks    schedule 14.07.2020
comment
@Peter И если вы повторите все шаги, кроме пункта 3 (отправка новых данных в конвейер, пока цепь открыта), все работает нормально. Кажется, что второй пакет информации что-то делает с конвейером.   -  person niks    schedule 14.07.2020
comment
Прежде всего, пожалуйста, добавьте описанный сценарий к вашему вопросу, он внесет ясность для будущих читателей. Во-вторых, основная причина этой проблемы может заключаться в том, что ваша логика повтора не предусматривает каких-либо штрафов. SleepDuration в номенклатуре Полли. Пожалуйста, добавьте логирование в политики Retry и CB. Здесь я кратко описал, как это можно сделать.   -  person Peter Csala    schedule 15.07.2020
comment
@Peter Я внес изменения, о которых вы просили. Я также пытался применить наказание следующим образом: WaitAndRetryAsync(2, retryAttempt => TimeSpan.FromSeconds(2) Но проблема все еще существует.   -  person niks    schedule 15.07.2020
comment
Вы предоставили обратный вызов для onRetryAsync, чтобы проверить, какое исключение было сгенерировано и когда?   -  person Peter Csala    schedule 15.07.2020
comment
@Peter Я не понимаю, что вы имеете в виду. Не могли бы вы уточнить?   -  person niks    schedule 15.07.2020
comment
Извините, я пропустил, что ваша текущая логика повтора срабатывает только в случае WebException. (Думал стреляет по любому Exception). Не могли бы вы проверить OutputAvailableAsync 1 до и после вызова Reset вручную?   -  person Peter Csala    schedule 15.07.2020
comment
@Peter Tasks Result является ложным до и после вызова Reset. Понятия не имею, почему это происходит. Я начинаю думать, что мне нужно просто опустить предикат и позволить всему переходить от блока к блоку, а затем просто проверить статус элемента внутри следующего блока. Я думаю, что это не так уж важно.   -  person niks    schedule 16.07.2020
comment
Правильно ли работает без фильтра сообщений? Если да, то что вы должны учитывать количество сообщений за определенный период.   -  person Peter Csala    schedule 16.07.2020
comment
@Peter Да, без фильтра сообщений он работает нормально. Такое поведение происходит, даже если я обрабатываю отдельное сообщение, поэтому я полагаю, что это не потому, что конвейер перегружен.   -  person niks    schedule 16.07.2020
comment
Отлично, тогда используйте его без фильтрации: D Я только что проверил связанный исходный код. Как видите, предоставленный predicate будет указывать на использование класса FilteredLinkPropagator. Его OfferMessage метод будет решать, распространять ли данные или нет. У меня нет времени по-настоящему вникать в это, но есть случай, когда он возвращается с DataflowMessageStatus.Declined, что для меня подозрительно.   -  person Peter Csala    schedule 16.07.2020
comment
@Peter Да, думаю, сейчас я воспользуюсь им без фильтрации. Может, начну баунти, если решу, что мне это действительно нужно для работы с фильтрацией. Большое спасибо за все ваши усилия!   -  person niks    schedule 16.07.2020
comment
Думаю, было бы разумно, если бы вы ответили на собственный вопрос, в котором перечислите все результаты и текущее решение.   -  person Peter Csala    schedule 16.07.2020