rxjava — Как обрабатывать исключения слияния, не прерывая весь процесс

Я создал два наблюдаемых объекта.
Один из них выдает исключение.

obs1 = Observable.from(new Integer[]{1, 2, 3, 4, 5, 6});

obs2 = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override public void call(Subscriber<? super Integer> subscriber) {
        boolean b = getObj().equals(""); // this throws an exception
        System.out.println("1");
    }
});

Теперь я вызываю их, используя

Observable.merge(obs2, obs1)
          .subscribe(new Observer<Integer>() {
                @Override public void onCompleted() {
                    System.out.println("onCompleted");
                }

                @Override public void onError(Throwable throwable) {
                    System.out.println("onError");
                }

                @Override public void onNext(Integer integer) {
                    System.out.println("onNext - " + integer);
                }
            });

Теперь я не хочу, чтобы мой процесс полностью останавливался при возникновении исключения -
я хочу обработать его, и я хочу, чтобы obs1 продолжал свою работу.

Я пытался написать его с помощью onErrorResumeNext(), onExceptionResumeNext(), doOnError(), но ничего не помогло - obs1 не запустился.

Как я могу обработать исключение, не останавливая обработку других наблюдаемых?


person Bick    schedule 10.11.2014    source источник
comment
Почему бы не использовать obs2.onErrorResumeNext(e -> Observable.empty())?   -  person zsxwing    schedule 10.11.2014
comment
Это не запускает obs1   -  person Bick    schedule 11.11.2014
comment
Итак, вы хотите игнорировать все ошибки obs1 и obs2? Однако проблема в том, что если obs1 является наблюдаемой с хорошим поведением, она завершится после выдачи onError.   -  person zsxwing    schedule 11.11.2014


Ответы (2)


Похоже, вам нужно mergeDelayError.

person akarnokd    schedule 10.03.2015
comment
Это не дает ответа на вопрос. Чтобы подвергнуть критике или запросить разъяснения у автора, оставьте комментарий под его публикацией. - person nempoBu4; 11.03.2015
comment
@ nempoBu4 откуда ты знаешь, что это не ответ? У OP есть два потока, один из которых выдает ошибку, и он все еще хочет получать значения от другого. Фактический оператор в RxJava для этой цели — reactivex. io/RxJava/javadoc/rx/ - person akarnokd; 11.03.2015
comment
«Похоже» означает, что вы пытаетесь угадать, а не ответить, поэтому ваш ответ отмечен как Low Quality. Но ваш комментарий лучше подходит для ответа. И было бы здорово, если бы вы добавили несколько примеров в свой ответ. - person nempoBu4; 11.03.2015

Проблема в вашем подписчике, который сломан. Вы должны поймать свое исключение и вызвать onError. В противном случае вы нарушили контракт rx.

пример :

    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3, 4, 5, 6));

    Observable<Integer> obs2 = Observable.create((Subscriber<? super Integer> subscriber) -> {
        subscriber.onError(new NullPointerException());
    });

    Observable.merge(obs2.onErrorResumeNext((e) -> Observable.empty()), obs1)
            .subscribe(new Observer<Integer>() {
                @Override public void onCompleted() {
                    System.out.println("onCompleted");
                }

                @Override public void onError(Throwable throwable) {
                    System.out.println("onError");
                }

                @Override public void onNext(Integer integer) {
                    System.out.println("onNext - " + integer);
                }
            });

поэтому, если вы замените свой код obs2 этим, он должен работать так, как вы ожидали:

obs2 = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override public void call(Subscriber<? super Integer> subscriber) {
        try {

            boolean b = getObj().equals(""); // this throws an exception
            System.out.println("1");
        } catch(Exception ex) {
            subscriber.onError(ex);
        }
    }
});
person dwursteisen    schedule 25.11.2014