Я знаю, что об этом уже много раз спрашивали, но я пробовал много вещей, и ничего не помогло.
Начнем с этих блогов / статей / кода:
- https://blog.danlew.net/2016/01/25/rxjavas-repeatwhen-and-retrywhen-explained/
- https://jimbaca.com/rxjava-retrywhen/
- http://blog.inching.org/RxJava/2016-12-12-rx-java-error-handling.html
- https://pamartinezandres.com/rxjava-2-exponential-backoff-retry-only-when-internet-is-available-5a46188ab175
- https://gist.github.com/wotomas/35006d156a16345349a2e4c8e159e122
И многие другие.
Вкратце, все они описывают, как можно использовать retryWhen для реализации экспоненциального отката. Что-то вроде этого:
source
.retryWhen(
errors -> {
return errors
.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(
retryCount -> {
System.out.println("retry count " + retryCount);
return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
});
})
С этим согласуется даже документация в библиотеке: https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/core/Observable.java#L11919.
Тем не менее, я пробовал этот и несколько довольно похожих вариантов, не заслуживающих описания здесь, и, похоже, ничего не работает. Есть способ, которым примеры работают и используют блокировку подписчиков, но я хочу избежать блокировки потоков.
Итак, если к предыдущему наблюдаемому мы применим блокирующего подписчика следующим образом:
.blockingForEach(System.out::println);
Работает как положено. Но как то не идея. Если мы попробуем:
.subscribe(
x -> System.out.println("onNext: " + x),
Throwable::printStackTrace,
() -> System.out.println("onComplete"));
Поток выполняется только один раз, поэтому я не хочу этого достичь.
Означает ли это, что его нельзя использовать так, как я пытаюсь? Судя по документации, попытка выполнить мое требование не представляет проблемы.
Есть идеи, что мне не хватает?
TIA.
Изменить: я тестирую это двумя способами:
Метод тестирования (с использованием testng):
Observable<Integer> source =
Observable.just("test")
.map(
x -> {
System.out.println("trying again");
return Integer.parseInt(x);
});
source
.retryWhen(
errors -> {
return errors
.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(
retryCount -> {
return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
});
})
.subscribe(...);
От потребителя Kafka (с использованием загрузки Spring):
Это всего лишь подписка на наблюдателя, но логика повторных попыток - это то, что я описал ранее в этом посте.
@KafkaListener(topics = "${kafka.config.topic}")
public void receive(String payload) {
log.info("received payload='{}'", payload);
service
.updateMessage(payload)
.subscribe(...)
.dispose();
}
blocking
методы. - person akarnokd   schedule 13.02.2020