onCompleted вызывается перед обработкой RxJava concatMap

У меня есть две наблюдаемые, и я использовал concatDelayError для последовательной обработки.

Моя проблема в том, что onNext и onCompleted вызываются заблаговременно до обработки. Как я узнаю, что вся обработка завершена с помощью concatDelayError?

Псевдокод:

public Observable<Integer> concat(){
     int x = 10;
     int y = 20;


        Observable obx = Observable.create(emitter -> {
                    try {
                        int x = doSomeThing();
                        emitter.onNext(x);
                        emitter.onCompleted();
                    } catch (SQLiteException e) {
                        emitter.onError(e);
                    }
                }, Emitter.BackpressureMode.BUFFER);   
        Observable oby = Observable.create(emitter -> {
                    try {
                        int y = doSomeThing();
                        emitter.onNext(y);
                        emitter.onCompleted();
                    } catch (SQLiteException e) {
                        emitter.onError(e);
                    }
                }, Emitter.BackpressureMode.BUFFER);       
        Observable concated =  Observable.concatDelayError(ob1,ob2)
                 .compose(applySchedulers())
                 .replay().autoConnect();  

  }

    //somewhere else

    concat().subscribe(mReplaySubject);


    //somewhere else

     mReplaySubject.subscribe(new Observer<Integer>() {
                        @Override
                        public void onCompleted() {
                            launchActivity(SplashActivity.this, HomeActivity.class);
                            SplashActivity.this.finish();
                        }    
                        @Override
                        public void onError(Throwable e) {
                            e.printStackTrace();
                        }    
                        @Override
                        public void onNext(Integer value) {

                        }
                    });

Я использую answer() с autoConnect() и подписываюсь на ReplySubject, так как мне нужно поделиться одной подпиской.


person krupal.agile    schedule 10.01.2018    source источник
comment
Вы уверены, что ваш Observer.onCompleted() вызывается несколько раз для одной подписки?   -  person akarnokd    schedule 10.01.2018
comment
Пожалуйста, добавьте журналирование, применив .doOnEach() как к исходным наблюдаемым, так и к конкатенированному. Иначе действительно непонятно.   -  person Maxim Volgin    schedule 10.01.2018
comment
@akarnokd Я использую ReplaySubject. Я изменил вопрос. Не могли бы вы ответить на вопрос?   -  person krupal.agile    schedule 10.01.2018
comment
@akarnokd Я вошел в систему, и он не звонит несколько раз, но звонит раньше, даже до того, как обработка была завершена.   -  person krupal.agile    schedule 10.01.2018
comment
Вы пробовали подписаться на concated напрямую без ReplaySubject? replay().autoConnect() уже выполняет кэширование за вас.   -  person akarnokd    schedule 10.01.2018
comment
@akarnokd не будет каждый раз вызывать doOnSubscribe()?   -  person krupal.agile    schedule 10.01.2018
comment
@akarnokd Я хочу выполнить некоторую обработку, когда он подписывается только в первый раз, тогда как я могу это сделать?   -  person krupal.agile    schedule 10.01.2018
comment
Пожалуйста, предоставьте полный автономный код, с которым у вас возникли проблемы, поскольку код, который вы указали, не соответствует вашим комментариям.   -  person akarnokd    schedule 10.01.2018
comment
@akarnokd Я отредактировал вопрос. Я хочу инициализировать эти x и y только при первой подписке. Как я могу это сделать?   -  person krupal.agile    schedule 10.01.2018
comment
@akarnokd Это всего лишь пример инициализации. На самом деле я очищаю несколько массивов и т.д.   -  person krupal.agile    schedule 10.01.2018


Ответы (1)


Вы можете использовать doOnSubscribe перед replay().autoConnect(), но вам также нужен способ убедиться, что возвращаемый Observable установлен не более одного раза:

final class OnceObservable {

    final AtomicReference<Observable<Integer>> ref = new AtomicReference<>();

    int x;
    int y;

    Observable<Integer> concat() {
        Observable<Integer> obs = ref.get();
        if (obs != null) {
            return;
        }

        obs = Observable.concatDelayError(
            Observable.fromCallable(() -> doSomething()),
            Observable.fromCallable(() -> doSomethingElse())
        )
        .doOnSubscribe(() -> {
            x = 10;
            y = 20;
        })
        .replay().autoConnect();

        if (!ref.compareAndSet(null, obs)) {
            obs = ref.get();
        }
        return obs;
    }
}
person akarnokd    schedule 10.01.2018
comment
Последний вопрос: .replay().autoConnect(); автоматически делать кэширование подписки? - person krupal.agile; 10.01.2018
comment
все еще он вызывает onCompleted() рано - person krupal.agile; 10.01.2018
comment
Какая подписка? Кроме того, как обнаружить ранний onCompleted? - person akarnokd; 10.01.2018