RxJava: как исправить ошибки в операторе flatMap

У меня есть EditText, где пользователь вводит поисковый запрос, и я хочу выполнять мгновенный поиск на моем сервере, когда пользователь что-то вводит.

Я пытаюсь сделать это с RxJava следующим образом:

RxTextView.textChanges(editQuery) // I'm using RxBinding for listening to text changes
    .flatMap(new Func1<CharSequence, Observable<UserPublic[]>>() {
        @Override
        public Observable<UserPublic[]> call(CharSequence query) {
            return api.searchUsers(query); // I'm using Retrofit 1.9 for network calls. searchUsers returns an Observable<UserPublic[]>
        }
    })
    .subscribe(Observers.create(
        new Action1<UserPublic[]>() {
            @Override
            public void call(UserPublic[] userPublics) {
                processResult(userPublics);
            }
        })
        , new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                processError(throwable);
            }
    });

проблема в том, что если сетевой вызов обнаруживает ошибку, все наблюдаемое останавливается. Поэтому, когда пользователь продолжает печатать, ничего не происходит.

Как я могу изменить этот код так, чтобы:

  1. Когда возникает проблема с сетью, вызывается processError
  2. Но когда пользователь продолжает набирать текст, продолжают поступать новые сетевые вызовы (что снова приводит к processResult / processError).

person netimen    schedule 10.11.2015    source источник


Ответы (2)


Используйте оператор retryWhen() и прикрепите его к наблюдаемой цепочке перед подпиской. Обратите внимание, что аргумент retryWhen() - это функция, которая принимает Observable<Throwable> и возвращает Observable<?>. Не имеет значения возвращаемый тип, поскольку оператор использует результат onNext() для инициирования повторной попытки и результаты onError() или onCompleted() для завершения цепочки.

Вот наивное приложение, которое ждет 5 секунд и пытается снова:

observable
  .retryWhen( errorObservable -> errorObservable.delay( 5, TimeUnit.SECONDS ) )
  .subscribe();

Вот менее наивная операция, которая повторяет попытку по истечении времени ожидания и завершается неудачно, если возникает IOException:

observable
  .retryWhen( errorObservable -> errorObservable.flatMap( throwable -> { // (1)
       if ( throwable instanceof IOException ) {
         return Observable.error( throwable ); // (2)
       }
       return Observable.just(1); // (3)
     } )
  .subscribe();
  1. Использование flatMap() позволяет отложить решение до тех пор, пока вы не узнаете, с какой ошибкой имеете дело.
  2. Возвращаемый наблюдаемый объект вызывает указанную ошибку или что-то еще, что вы хотите лучше описать своей проблемой.
  3. Предоставление наблюдаемой, которая просто onNext(), указывает оператору retryWhen() повторно подписаться на исходную наблюдаемую.
person Bob Dalgleish    schedule 03.08.2017

Я могу предложить вам взглянуть на страницу операторов обработки ошибок в Документация RxJava. Поэкспериментируйте с разными операторами, чтобы выбрать тот, который лучше всего подходит для вашего варианта использования. Я думаю, вам, вероятно, следует использовать какое-то значение по умолчанию, возвращаемое onErrorResumeNext(), например возвращает new UserPublic[0]. Вызов processError здесь будет проблематичным, особенно если он связан с пользовательским интерфейсом, поскольку вы, вероятно, все еще обрабатываете в фоновом потоке.

person Egor    schedule 10.11.2015