Преобразовать/обернуть асинхронный слушатель в Observable (RxJava2)

Я хочу обернуть реального слушателя в объект Observable. Для начала вот тестовый кейс, с ним все нормально.

@Override
public void onCreate(@Nullable Bundle savedInstanceState) {
    getObservablePhoneState()
        // Run on a background thread
        .subscribeOn(Schedulers.io())
        // Be notified on the main thread
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(integer -> Log.i(TAG, "----- subscribe onNext = " + integer));
}

private Flowable<Integer> getObservablePhoneState() {
    return Flowable.create(emitter -> {

        Log.i(TAG, "Emitting 1");
        emitter.onNext(1);

        Log.i(TAG, "Emitting 2");
        emitter.onNext(2);

    }, BackpressureStrategy.BUFFER);
}



*** logcat ***
Emitting 1
Emitting 2
----- subscribe onNext = 1
----- subscribe onNext = 2

Этот код выдает ошибку:

private Flowable<Integer> getObservablePhoneState() {
    return Flowable.create(emitter -> {

        PhoneStateListener phoneStateListener = new PhoneStateListener() {
            @Override
            public void onCallStateChanged(int state, String incomingNumber) {
                Log.i(TAG, "onCallStateChanged = " + state);
                emitter.onNext(state);
            }
        };
        TelephonyManager telephonyManager = (TelephonyManager) getActivity().getSystemService(Context.TELEPHONY_SERVICE);
        telephonyManager.listen(phoneStateListener, PhoneStateListener.LISTEN_CALL_STATE);

    }, BackpressureStrategy.BUFFER);
}

*** logcat ***
io.reactivex.exceptions.OnErrorNotImplementedException: 
Attempt to read from field 'android.os.MessageQueue 
android.os.Looper.mQueue' on a null object reference

С Observable.create() та же ошибка. Возможно, это связано с тем, что RxJava2 не поддерживает выдачу нулевого значения. .

Как это сделать правильно?


person tim4dev    schedule 20.09.2017    source источник
comment
Возможно ли, что вы вызываете вещь перед onCreate() в своей деятельности? Я попробовал ваш код, и он отлично сработал для меня.   -  person James McCracken    schedule 21.09.2017


Ответы (1)


Вы должны удалить subscribeOn(Schedulers.io()), чтобы избежать создания PhoneStateListener в другом потоке, потому что под капотом пытается отправлять сообщения с помощью обработчика, mQueue которого имеет значение null. Просто позвони

getObservablePhoneState()
     .subscribe { integer -> Log.i("", "----- subscribe onNext = " + integer) }
person gyosida    schedule 21.09.2017
comment
Спасибо, ваш комментарий был полезен. код сработал. но... onNext встречается всего 1--3 раза, а дальше - ничего. Тестирую под Android 8, использую последние версии библиотек. Я опубликую полный код для тестов позже. Возможно, я выбрал слишком сложный Listener. - person tim4dev; 21.09.2017
comment
onNext вызывается 1-3 раза, но ваш слушатель вызывался больше раз? - person gyosida; 21.09.2017
comment
Listener мертв и (конечно) дальше Next. Андроид 4.2 - все ОК. Android 8 - :( Собственный прослушиватель работает нормально. Полный код здесь github .com/tim4dev/dirty_code/tree/master/ - person tim4dev; 21.09.2017
comment
Родной phoneStateListener() работает нормально везде. На Android 4.2 Observable Listener (Observable, Flowable) работал нормально. На Android 8.0 Observable Listener (Observable, Flowable) вызывался 2-3 раза и не работал. Шаги для воспроизведения проблемы: - установите PHONE_NUMBER на свой телефон (да, вы будете звонить самому себе) - запустите приложение на Android 8 - позвоните - нажмите разъединение (сняв трубку) - если вы сделаете это достаточно быстро - все будет в порядке - если слушать короткие гудки и ждать, Listener умирает и больше не реагирует. - person tim4dev; 21.09.2017
comment
Я не уверен, но пробовали ли вы передать другое состояние методу listen? проверьте журнал изменений для 8.0, API мог измениться - person gyosida; 22.09.2017
comment
Обходной путь (удалить прослушиватель, добавить опрос) - person tim4dev; 22.09.2017