Лучшая практика обработки onError и продолжения обработки

Я новичок в RxJava, но я интегрирую его в проект, над которым работаю, чтобы помочь мне его изучить. Я столкнулся с вопросом о лучших практиках.

У меня есть вопрос о том, как обработать onError, чтобы предотвратить остановку обработки Observable.

Вот настройка:

У меня есть список идентификаторов пользователей, для каждого из которых я хотел бы выполнить 2 или более сетевых запроса. Если какой-либо из сетевых запросов для идентификатора пользователя завершается неудачей, этот идентификатор пользователя не будет обновлен и может быть пропущен. Это не должно препятствовать обработке других идентификаторов пользователей. У меня есть решение, но оно включает вложенные подписки (см. второй блок кода). Одна проблема, которую я вижу, заключается в том, что если каждый вызов терпит неудачу, нет способа закоротить и предотвратить попадание оставшихся в сетевой ресурс даже после обнаружения определенного порогового числа.

Есть лучший способ это сделать?

В традиционном коде:

List<String> results = new ArrayList<String>();
for (String userId : userIds) {
    try {
        String info = getInfo(userId);  // can throw an GetInfoException
        String otherInfo = getOtherInfo(userId);  // can throw an GetOtherInfoException
        results.add(info + ", " + otherInfo);
    } catch (GetInfoException e) {
        log.error(e);
    } catch (GetOtherInfoException e) {
        log.error(e);
    }
}

ПРОБЛЕМА:

Псевдокод:

userid -> network requests -> result 
1 -> a, b -> onNext(1[a ,b])
2 -> a, onError -> onError
3 -> a, b -> onNext(3[a, b])
4 -> a, b -> onNext(4[a, b])

Ниже приведен рабочий пример списка идентификаторов пользователей и для каждых 2 запросов информации. Если вы запустите его, вы увидите, что он потерпит неудачу (см. Ниже исходный код)

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() {

            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        });
        return observable;
    }

    public static void main(String[] args) {

        Observable<String> userIdObservable = getUserIds();
        Observable<String> t = userIdObservable.flatMap(new Func1<String, Observable<String>>() {

            public Observable<String> call(final String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ",t1, "3");
                return Observable.mergeDelayError(info1, info2);
            }
        });

        t.subscribe(new Action1<String>() {

            public void call(String t1) {
                System.out.println(t1);
            }
        }, new Action1<Throwable>() {

            public void call(Throwable t1) {
                t1.printStackTrace();
            }
        },
        new Action0(){

            public void call() {
                System.out.println("onComplete");
            }

        });
    }
}

Вывод:

1::: 1
2::: 1
2::: 2
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:32)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:266)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:210)
        at rx.operators.OperationMergeDelayError$2.onSubscribe(OperationMergeDelayError.java:77)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable.onSubscribe(OperationMergeDelayError.java:171)
        at rx.operators.OperationMergeDelayError$1.onSubscribe(OperationMergeDelayError.java:64)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationMap$MapObservable$1.onNext(OperationMap.java:105)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMap$MapObservable.onSubscribe(OperationMap.java:102)
        at rx.operators.OperationMap$2.onSubscribe(OperationMap.java:76)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)

Решение для вложенной подписки:

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() {

            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        });
        return observable;
    }

    public static void main(String[] args) {

        Observable<String> userIdObservable = getUserIds();
        userIdObservable.subscribe(new Action1<String>() {

            public void call(String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ", t1, "3");
                Observable.merge(info1, info2).subscribe(new Action1<String>() {

                    public void call(String t1) {
                        System.out.println(t1);
                    }
                }, new Action1<Throwable>() {

                    public void call(Throwable t1) {
                        t1.printStackTrace();
                    }
                },
                        new Action0() {

                            public void call() {
                                System.out.println("onComplete");
                            }

                        });
            }
        });
    }
}

Вывод:

1::: 1
2::: 1
onComplete
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:47)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:42)
        at rx.Observable$2.onNext(Observable.java:381)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:367)
        at TestMergeDelayError.main(TestMergeDelayError.java:42)
1::: 3
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:47)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:42)
        at rx.Observable$2.onNext(Observable.java:381)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:367)
        at TestMergeDelayError.main(TestMergeDelayError.java:42)
1::: 4
2::: 4
onComplete
1::: 5
2::: 5
onComplete
1::: 6
2::: 6
onComplete

Как вы можете видеть, только отдельные идентификаторы пользователей, для которых произошел сбой, остановили свою индивидуальную обработку, но остальные идентификаторы пользователей были обработаны.

Просто ищу совет, посмотрите, имеет ли смысл это решение, а если нет, то какова наилучшая практика.

Спасибо, Алекс


person Alex Beggs    schedule 12.03.2014    source источник
comment
Я думаю, что вам не хватает некоторых типов. Вам действительно не нужен Observable строки; вам нужен наблюдаемый элемент (успешный элемент | неуспешный элемент), и вы хотите сделать что-то интересное с успехами и что-то еще с этими неудачами.   -  person James Moore    schedule 01.04.2016
comment
Это был очень упрощенный пример   -  person Alex Beggs    schedule 01.04.2016


Ответы (4)


Лучше всего использовать mergeDelayError( ), которые объединяют несколько Observables в один, позволяя free Observables для продолжения до распространения ошибок.

mergeDelayError ведет себя так же, как merge. Исключением является случай, когда одно из объединенных Observable завершается уведомлением onError. Если это произойдет при слиянии, объединенный Observable немедленно выдаст уведомление об ошибке и завершится. mergeDelayError, с другой стороны, будет откладывать сообщение об ошибке до тех пор, пока не даст другим не вызывающим ошибки Observables, которые он объединяет, шанс закончить испускание своих элементов, и он сам выпустит их и завершится только с уведомление onError, когда все остальные объединенные Observable завершились.

person Morteza Rastgoo    schedule 24.11.2015
comment
что я не хочу, чтобы наблюдаемое когда-либо заканчивалось? Я не думаю, что этот ответ является первоначальным вопросом, он предполагает, что все наблюдаемые события завершаются, и что можно отложить ошибку. - person Daniele Segato; 20.04.2016
comment
Согласитесь с Даниэлем, это объяснение функции mergeDelayError(), а не ответ на вопрос. - person Tomas Bartalos; 05.01.2017
comment
это не лучшая практика, а всего лишь один из способов обработки, другой здесь medium.com/@jagsaund/ - person Marian Paździoch; 17.10.2017
comment
@MarianPaździoch: Когда я ответил на вопрос, не было таких простых вариантов! Итак, если вы посмотрите на время публикации вашей ссылки на medium.com, вы увидите разницу в датах! В любом случае, спасибо за уведомление! - person Morteza Rastgoo; 22.10.2017

Поскольку вы хотите проигнорировать ошибку, вы можете попробовать onErrorResumeNext(Observable.<String>empty());. Например,

Observable<String> info1 = getInfo("1::: ", t1, "2").onErrorResumeNext(Observable.<String>empty());
Observable<String> info2 = getInfo("2::: ", t1, "3").onErrorResumeNext(Observable.<String>empty());
return Observable.merge(info1, info2);
person zsxwing    schedule 12.03.2014
comment
и если я не хочу игнорировать ошибку (он же = обрабатывать в обратном вызове onError), но продолжать обрабатывать следующие события? - person Daniele Segato; 20.04.2016
comment
^ Затем снова подпишитесь, в onError - person Abdul Wasae; 17.11.2017

Как новичок в Rx, я также искал простой ответ для отдельной обработки исключений и продолжения обработки следующего события, но не смог найти ответы на вопросы @Daniele Segato. Вот одно решение, где у вас нет контроля:

В приведенных выше примерах предполагается, что у вас есть контроль над наблюдаемыми, т. е. один из способов — отложить ошибки до конца, используя mergeDelayError ИЛИ вернуть известное пустое событие Observable для каждого события как Observable отдельно, используя слияние.

Если это ошибка исходного события, вы можете использовать lift для создания другого наблюдаемого, который в основном изящно обрабатывает значение текущего наблюдаемого. Класс SimpleErrorEmitter имитирует неограниченный поток, который иногда может дать сбой.

Observable.create(new SimpleErrorEmitter())
        // transform errors to write to error stream
        .lift(new SuppressError<Integer>(System.err::println))
        .doOnNext(System.out::println)  // and everything else to console
        .subscribe();


class SimpleErrorEmitter implements OnSubscribe<Integer> {
@Override
public void call(Subscriber<? super Integer> subscriber) {
    subscriber.onNext(1);
    subscriber.onNext(2);

    subscriber.onError(new FooException());

    subscriber.onNext(3);
    subscriber.onNext(4);

    subscriber.onCompleted();
}

class SuppressError<T> implements Operator<T, T> {
final Action1<Throwable> onError;
public SuppressError(Action1<Throwable> onError) {
    this.onError = onError;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> t1) {
    return new Subscriber<T>(t1) {
        @Override
        public void onNext(T t) {
            t1.onNext(t);
        }
        @Override
        public void onError(Throwable e) { // handle errors using a separate function
            onError.call(e);
        }
        @Override
        public void onCompleted() {
            t1.onCompleted();
        }
    };
}

Если это ошибка обработки подписчика, которую можно попробовать/отловить и продолжить изящно

    Observable<Integer> justInts = justStrs.map((str) -> {
        try {
            return Integer.parseInt(str);
        } catch (NumberFormatException e) {
            return null;
        }
    });

Я все еще пытаюсь найти простой способ просто повторить или отложить попытку неудачного события и продолжить со следующего.

    Observable<String> justStrs = Observable
            .just("1", "2", "three", "4", "5")  // or an unbounded stream
            // both these retrying from beginning 
            // when you delay or retry, if they are of known exception type
            .retryWhen(ex -> ex.flatMap(eachex -> {
                // for example, if it is a socket or timeout type of exception, try delaying it or retrying it
                if (eachex instanceof RuntimeException) {
                    return Observable.timer(1L, TimeUnit.MICROSECONDS, Schedulers.immediate());
                }
                return Observable.error(eachex);
            }))
            // or simply retry 2 times
            .retry(2) // if it is the source problem, attempt retry
            .doOnError((ex) -> System.err.println("On Error:" + ex));

Ссылка: https://groups.google.com/forum/#!topic/rxjava/trm2n6S4FSc

person kisna    schedule 28.10.2016

Глядя на источник Observable.flatMap:

return merge(map(func));

Если вы хотите, чтобы обрабатывались все возможные идентификаторы пользователей, вы можете использовать модифицированную версию flatMap:

Observable.mergeDelayError(userIdObservable.map(userInfoFunc))

Далее, если вы скажете:

Если какой-либо из сетевых запросов для идентификатора пользователя завершается неудачей, этот идентификатор пользователя не будет обновлен и может быть пропущен.

Тогда не используйте:

return Observable.mergeDelayError(info1, info2);

Потому что это приведет к тому, что и информация1, и информация2 будут запрошены, даже если один из них выйдет из строя.

Скорее иди с:

return Observable.merge(info1, info2);

Когда информация1 и информация2 подписаны на один и тот же поток, они будут выполняться последовательно, поэтому в случае сбоя информации1 информация2 никогда не будет запрошена. Поскольку информация1 и информация2 ограничены вводом-выводом, я предполагаю, что вы хотите запускать их параллельно:

getInfo("1::: ", t1, "2").subscribeOn(Schedulers.io());
getInfo("2::: ",t1, "3").subscribeOn(Schedulers.io());

Это должно значительно ускорить вашу обработку

Весь код:

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        return Observable.create(new OnSubscribeFunc<String>() {

            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        })
        .subscribeOn(Schedulers.io());
    }

    public static void main(String[] args) {

        Observable<String> userIdObservable = getUserIds();
        Observable<String> t = Observable.mergeDelayError(userIdObservable.map(new Func1<String, Observable<String>>() {

            public Observable<String> call(final String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ",t1, "3");
                return Observable.merge(info1, info2);
            }
        }));
        //rest is the same
    }
}
person Tomas Bartalos    schedule 05.01.2017