Я пытаюсь заставить подписку автоматически отменять подписку, когда она выдает элемент. Базовая наблюдаемая создается следующим образом.
public static Observable<RxBleConnection> setupConnection(RxBleDevice device, PublishSubject<Void> disconnectTrigger) {
return device
.establishConnection(false)
.takeUntil(disconnectTrigger)
.retry(3)
.retryWhen(o -> o.delay(RETRY_DELAY, TimeUnit.MILLISECONDS))
.compose(new ConnectionSharingAdapter());
}
Затем я пытаюсь объединить три операции чтения в файл ProgramModel
.
private void readCharacteristics(Action1<ProgramModel> onReadSuccess) {
mConnectionObservable
.flatMap(rxBleConnection ->
// combines the following three observables into a single observable that is
// emitted in onNext of the subscribe
Observable.combineLatest(
rxBleConnection.readCharacteristic(UUID_SERIAL_NUMBER),
rxBleConnection.readCharacteristic(UUID_MACHINE_TYPE),
rxBleConnection.readCharacteristic(UUID_CHARACTERISTIC),
ProgramModel::new))
.observeOn(AndroidSchedulers.mainThread())
.take(1)
.subscribe(programModel -> {
programModel.trimSerial();
onReadSuccess.call(programModel);
}, BleUtil::logError);
}
Так что теоретически после того, как модель программы пройдет через oNext
подписки, подписка будет отменена. По какой-то причине операция зависает, и onNext
и onError
никогда не вызываются. Если я удалю take(1)
, это будет работать нормально, но я не хочу иметь дело с удержанием ссылки на подписку и отменой подписки вручную. Кто-нибудь знает, что я делаю неправильно или почему onNext
не вызывается?
ConnectionSharingAdapter
. Производит ли индивидуумreadCharacteristics
какие-либо предметы? ПоместитеdoOnNext
в разные места, чтобы увидеть, где исчезают события. - person akarnokd   schedule 07.12.2017RxAndroidBle
. Поскольку каждый раз, когда вызывается подписка, создается соединение с устройством, поэтому адаптер общего доступа в основном гарантирует, что только один из них активен в каждый момент времени. попробуюdoOnNext
- person David Carek   schedule 07.12.2017