Оператор share() не работает для Observable в Rxjava

У меня есть сценарий, в котором у нас есть эмиттер, который постоянно выдает такие данные.

fun subscribeForEvents(): Flowable<InkChannel> {
    return Flowable.create<InkChannel>({
        if (inkDevice.availableDeviceServices.contains(DeviceServiceType.EVENT_DEVICE_SERVICE)) {
            (inkDevice.getDeviceService(DeviceServiceType.EVENT_DEVICE_SERVICE) as EventDeviceService).subscribe(object : EventCallback {
                override fun onUserActionExpected(p0: UserAction?) {
                    it.onNext(InkChannel.UserActionEvent(p0))
                }

                override fun onEvent(p0: InkDeviceEvent?, p1: Any?) {
                    it.onNext(InkChannel.InkEvents<Any>(p0, p1))
                }

                override fun onUserActionCompleted(p0: UserAction?, p1: Boolean) {

                }
            }

            )
        }
    }, BackpressureStrategy.BUFFER).share()

}

теперь у меня есть сервис, который я запускаю при запуске приложения и слушаю его

  inkDeviceBus.subscribeForEvents()
                .filter { it -> (it as InkChannel.InkEvents<*>).event == InkDeviceEvent.STATUS_CHANGED }
                .map { it -> it as InkChannel.InkEvents<*> }
                .map { it -> it.value.toString() }
                .filter { value -> value == "CONNECTED" || value == "DISCONNECTED" }
                .map { it -> it == "CONNECTED" }
                .subscribeBy { b ->
                    if (b) stopSelf()
                }

У меня есть другая активность MainActivity, которая вызывается при запуске, где я наблюдаю то же событие. Теперь проблема в том, что только слушатель в службе получает события, а активность не получает никаких событий.

Теперь, когда я удаляю прослушиватель из службы, активность начинает получать события. Я использовал оператор share для обмена наблюдаемым, но, похоже, он не работает.


person Rajat Beck    schedule 24.11.2018    source источник


Ответы (1)


.share() влияет только на экземпляр, на котором он вызывается. Поскольку каждый вызов subscribeForEvents создает новый экземпляр, .share() не меняет поведение.

Вам нужно вызвать subscribeForEvents один раз, а затем использовать возвращенное значение, когда вы хотите получать события. Пока используется один и тот же объект, он будет использовать общий прослушиватель.

person Kiskae    schedule 24.11.2018