OnNext не вызывается для наблюдаемого при использовании combLatest и take

Я пытаюсь заставить подписку автоматически отменять подписку, когда она выдает элемент. Базовая наблюдаемая создается следующим образом.

public static Observable<RxBleConnection> setupConnection(RxBleDevice device, PublishSubject<Void> disconnectTrigger) {
    return device
            .establishConnection(false)
            .takeUntil(disconnectTrigger)
            .retry(3)
            .retryWhen(o -> o.delay(RETRY_DELAY, TimeUnit.MILLISECONDS))
            .compose(new ConnectionSharingAdapter());
}

Затем я пытаюсь объединить три операции чтения в файл ProgramModel.

private void readCharacteristics(Action1<ProgramModel> onReadSuccess) {
    mConnectionObservable
            .flatMap(rxBleConnection ->
                    // combines the following three observables into a single observable that is
                    // emitted in onNext of the subscribe
                    Observable.combineLatest(
                            rxBleConnection.readCharacteristic(UUID_SERIAL_NUMBER),
                            rxBleConnection.readCharacteristic(UUID_MACHINE_TYPE),
                            rxBleConnection.readCharacteristic(UUID_CHARACTERISTIC),
                            ProgramModel::new))
            .observeOn(AndroidSchedulers.mainThread())
            .take(1)
            .subscribe(programModel -> {
                programModel.trimSerial();
                onReadSuccess.call(programModel);
            }, BleUtil::logError);
}

Так что теоретически после того, как модель программы пройдет через oNext подписки, подписка будет отменена. По какой-то причине операция зависает, и onNext и onError никогда не вызываются. Если я удалю take(1), это будет работать нормально, но я не хочу иметь дело с удержанием ссылки на подписку и отменой подписки вручную. Кто-нибудь знает, что я делаю неправильно или почему onNext не вызывается?


person David Carek    schedule 07.12.2017    source источник
comment
Что такое ConnectionSharingAdapter. Производит ли индивидуум readCharacteristics какие-либо предметы? Поместите doOnNext в разные места, чтобы увидеть, где исчезают события.   -  person akarnokd    schedule 07.12.2017
comment
Адаптер общего доступа к соединению является частью RxAndroidBle. Поскольку каждый раз, когда вызывается подписка, создается соединение с устройством, поэтому адаптер общего доступа в основном гарантирует, что только один из них активен в каждый момент времени. попробую doOnNext   -  person David Carek    schedule 07.12.2017


Ответы (1)


Мне нужно было вызвать take(1) до flatMap, а также после него. Этот пост как бы объясняет это Чтение нескольких характеристик с Android устройство с использованием библиотеки RxAndroidBle

person David Carek    schedule 07.12.2017
comment
Насколько я знаю RxJava, этого не должно быть, и код из исходного сообщения должен работать так, как ожидалось — должно быть вызвано одно из действий подписки. Не могли бы вы добавить RxBleLog.setLogLevel(RxBleLog.VERBOSE) и больше журналирования, если называется часть, где вы создаете Observable.combineLatest? Должна быть какая-то причина поведения, которое вы испытываете. - person Dariusz Seweryn; 07.12.2017