Вызов subject.onNext () внутри doOnSubscribe

Почему вызов subject.onNext(o) внутри doOnSubscribe не дает никакого эффекта, однако вызов subject.onComplete() приводит к завершению потока !?

final PublishSubject<Integer> subject = PublishSubject.create();

    final Observable<Integer> observable = subject.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(@NonNull Disposable disposable) throws Exception {
            System.out.println("disposable = [" + disposable + "]");
            subject.onNext(1);
            //or
            Observable.just(2, 3).subscribe(subject);
        }
    });

    observable.subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("d = [" + d.isDisposed() + "]");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("item = [" + integer + "]");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("e = [" + e + "]");
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });/*
    expected:
    disposable = [false]
    d = [false]
    item = 1
    item = 2
    item = 3
    onComplete
    but received :
    disposable = [false]
    d = [false]
    onComplete
    */

person M. Reza Nasirloo    schedule 06.04.2017    source источник


Ответы (1)


При подписке на Subject в 2.x, Disposable, представляющий соединение, пересекает цепочку onSubscribe() до того, как конкретный Observer станет видимым для onNext. Вы можете увидеть это, если вызовете hasObservers из onSubscribe, где он вернет false до тех пор, пока onSubscribe не вернется.

Это требуется протоколом Observable, так как одновременное выполнение onSubscribe и onNext запрещено, а onSubscribe должно произойти до onNext. Если это правило не соблюдается, параллельный вызов Subject.onNext будет выполняться раньше или даже одновременно с вызовом Observer.onSubscribe и обнаружит, вероятно, неподготовленного потребителя.

Поскольку PublishSubject не сохраняет никаких onNext вызовов, незаметные onNext элементы удаляются. В зависимости от варианта использования вместо этого следует использовать BehaviorSubject или yourSubject.startWith(initialValue).subscribe(), чтобы получить значение перед любым другим onNext из Subject.

person akarnokd    schedule 06.04.2017