RxJava retryWhen (экспоненциальная задержка) не работает

Я знаю, что об этом уже много раз спрашивали, но я пробовал много вещей, и ничего не помогло.

Начнем с этих блогов / статей / кода:

И многие другие.

Вкратце, все они описывают, как можно использовать retryWhen для реализации экспоненциального отката. Что-то вроде этого:

source
.retryWhen(
  errors -> {
    return errors
    .zipWith(Observable.range(1, 3), (n, i) -> i)
    .flatMap(
       retryCount -> {
       System.out.println("retry count " + retryCount);
       return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
     });
 })

С этим согласуется даже документация в библиотеке: https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/core/Observable.java#L11919.

Тем не менее, я пробовал этот и несколько довольно похожих вариантов, не заслуживающих описания здесь, и, похоже, ничего не работает. Есть способ, которым примеры работают и используют блокировку подписчиков, но я хочу избежать блокировки потоков.

Итак, если к предыдущему наблюдаемому мы применим блокирующего подписчика следующим образом:

.blockingForEach(System.out::println);

Работает как положено. Но как то не идея. Если мы попробуем:

.subscribe(
x -> System.out.println("onNext: " + x),
Throwable::printStackTrace,
() -> System.out.println("onComplete"));

Поток выполняется только один раз, поэтому я не хочу этого достичь.

Означает ли это, что его нельзя использовать так, как я пытаюсь? Судя по документации, попытка выполнить мое требование не представляет проблемы.

Есть идеи, что мне не хватает?

TIA.

Изменить: я тестирую это двумя способами:

Метод тестирования (с использованием testng):

Observable<Integer> source =
    Observable.just("test")
        .map(
            x -> {
              System.out.println("trying again");
              return Integer.parseInt(x);
            });
source
    .retryWhen(
        errors -> {
          return errors
              .zipWith(Observable.range(1, 3), (n, i) -> i)
              .flatMap(
                  retryCount -> {
                    return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
                  });
        })
    .subscribe(...);

От потребителя Kafka (с использованием загрузки Spring):

Это всего лишь подписка на наблюдателя, но логика повторных попыток - это то, что я описал ранее в этом посте.

@KafkaListener(topics = "${kafka.config.topic}")
      public void receive(String payload) {
        log.info("received payload='{}'", payload);
        service
            .updateMessage(payload)
            .subscribe(...)
            .dispose();
      }

person Yuliban    schedule 13.02.2020    source источник
comment
Пожалуйста, отформатируйте код, его трудно читать.   -  person Boris    schedule 13.02.2020
comment
Вы запускаете это из основного метода? RxJava использует потоки демона, и если ваш основной поток завершается во время фоновой работы, эти потоки будут завершены JVM. Вот почему в примерах используются blocking методы.   -  person akarnokd    schedule 13.02.2020
comment
Я обновил сообщение, чтобы объяснить, как я вызываю этот метод.   -  person Yuliban    schedule 13.02.2020
comment
вы столкнулись с той же проблемой, что и здесь stackoverflow.com/questions/59005142/   -  person bubbles    schedule 14.02.2020


Ответы (1)


Основная проблема вашего кода заключается в том, что Observable.timer по умолчанию работает в планировщике вычислений. Это добавляет дополнительные усилия при попытке проверить поведение в тесте.

Вот код модульного тестирования, который проверяет, действительно ли ваш код повтора повторяет попытку.

  • Он добавляет счетчик, чтобы мы могли легко проверить, сколько звонков произошло.
  • Он использует TestScheduler вместо планировщика вычислений, поэтому мы может притвориться, что движется во времени с помощью advanceTimeBy.

    TestScheduler testScheduler = new TestScheduler();
    AtomicInteger counter = new AtomicInteger();
    
    Observable<Integer> source =
        Observable.just("test")
            .map(
                x -> {
                    System.out.println("trying again");
                    counter.getAndIncrement();
                    return Integer.parseInt(x);
                });
    TestObserver<Integer> testObserver = source
        .retryWhen(
            errors -> {
                return errors
                    .zipWith(Observable.range(1, 3), (n, i) -> i)
                    .flatMap(
                        retryCount -> {
                            return Observable.timer((long) Math.pow(1, retryCount), SECONDS, testScheduler);
                        });
            })
        .test();
    
    assertEquals(1, counter.get());
    
    testScheduler.advanceTimeBy(1, SECONDS);
    assertEquals(2, counter.get());
    
    testScheduler.advanceTimeBy(1, SECONDS);
    assertEquals(3, counter.get());
    
    testScheduler.advanceTimeBy(1, SECONDS);
    assertEquals(4, counter.get());
    
    testObserver.assertComplete();
    
person cdehning    schedule 17.04.2020