RxJava: retryWhen с ограничением повторных попыток

Я новичок в ReactiveX и реактивном программировании в целом. Мне нужно реализовать механизм повторных попыток для операций Couchbase CAS, но пример на веб-сайте Couchbase показывает retryWhen, который, кажется, повторяется бесконечно. Мне нужно где-то там указать лимит повторных попыток и количество повторных попыток.

Простой retry() будет работать, так как он принимает retryLimit, но я не хочу, чтобы он повторял попытку при каждом исключении, а только при исключении CASMismatchException.

Любые идеи? Я использую библиотеку RxJava.


person Reezy    schedule 20.01.2015    source источник


Ответы (3)


В дополнение к тому, что сказал Саймон Базл, вот краткая версия с линейным отставанием:

.retryWhen(notification ->
    notification
    .zipWith(Observable.range(1, 5), Tuple::create)
    .flatMap(att ->
            att.value2() == 3 ? Observable.error(att.value1()) : Observable.timer(att.value2(), TimeUnit.SECONDS)
    )
)

Обратите внимание, что «att» здесь — это кортеж, который состоит как из выбрасываемого, так и из количества повторных попыток, поэтому вы можете очень конкретно реализовать логику возврата на основе этих двух параметров.

Если вы хотите узнать еще больше, вы можете просмотреть отказоустойчивый документ, который я сейчас пишу: https://gist.github.com/daschl/db9fcc9d2b932115b679#retry-with-delay

person daschl    schedule 20.01.2015
comment
Кто-нибудь может объяснить, почему здесь требуется flatMap? Кажется, задержка не работает без него - person deviant; 03.02.2016

retryWhen явно немного сложнее, чем простая повторная попытка, но вот суть:

  • вы передаете функцию notificationHandler в retryWhen, которая принимает Observable<Throwable> и выводит Observable<?>
  • выброс этого возвращенного Observable определяет, когда повторная попытка должна произойти или остановиться
  • поэтому для каждого возникающего исключения в исходном потоке, если обработчик выдает 1 элемент, будет 1 повторная попытка. Если он излучает 2 предмета, будет 2...
  • как только поток обработчика выдает ошибку, повтор прерывается.

Используя это, вы можете:

  • работать только с CasMismatchExceptions: просто пусть ваша функция возвращает Observable.error(t) в других случаях
  • повторите попытку только определенное количество раз: для каждого исключения flatMap из Observable.range, представляющего максимальное количество повторных попыток, пусть он возвращает Observable.timer с использованием # повторной попытки, если вам нужно увеличить задержки.

Ваш вариант использования очень близок к описанному в документе RxJava здесь

person Simon Baslé    schedule 20.01.2015
comment
Я хочу использовать повторную попытку при ограничении количества. Я пробовал Observable.range(0, 10), но это не работает. - person Allen Vork; 05.07.2016

возрождаю эту ветку, так как в Couchbase Java SDK 2.1.2 есть новый более простой способ сделать это: используйте RetryBuilder:

Observable<Something> retryingObservable =
sourceObservable.retryWhen(
  RetryBuilder
    //will limit to the relevant exception
    .anyOf(CASMismatchException.class)
    //will retry only 5 times
    .max(5)
    //delay doubling each time, from 100ms to 2s
    .delay(Delay.linear(TimeUnit.MILLISECONDS, 2000, 100, 2.0))
  .build()
);
person Simon Baslé    schedule 20.05.2015
comment
Можете ли вы добавить несколько разных исключений с разными задержками? А также можно ли установить задержку по умолчанию для всего остального? - person Roee Gavirel; 05.02.2017
comment
Нет, вам нужно просто связать несколько блоков retryWhen, каждый со своим компоновщиком, как если бы вы сцепили несколько блоков catch в императивном коде. - person Simon Baslé; 05.02.2017