Как объединить Observable‹OnTextChangeEvent› с Observable‹OnCheckedChangeEvent›?

Я изучаю RxJava с использованием библиотеки RxAndroid, используя Retrofit для работы в сети и RetroLambda для использования лямбда-выражений Java8.

Приложение, которое я хочу создать, имеет следующие функции:

  • Разрешить пользователю вводить запрос к API Википедии
  • Подождите 1 секунду после ввода, пока не будет выполнен сетевой вызов.
  • Показывать индикатор выполнения, когда приложение «занято» получением результата
  • Индикатор прогресса запускается сразу после ввода, а не через 1 секунду, и заканчивается при получении результата или ошибки

У меня работает так:

// emit when text is changed
Observable<OnTextChangeEvent> textStream = WidgetObservable.text(mEditText);
Observable<OnTextChangeEvent> debouncedStream = textStream.debounce(1, TimeUnit.SECONDS); // Unchecked assignment

// start activity indicator immediately
textStream
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(s -> mProgressBar.setVisibility(View.VISIBLE));

debouncedStream
        .map(t -> wikiService.search(t.text().toString())) // query wikipedia
        .map(Object::toString)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(s -> {
            mTextView.setText(s == null ? "Error" : s);
            mProgressBar.setVisibility(View.GONE);
        });

Теперь я хочу добавить новый виджет, чтобы мой запрос в Википедии мог использовать другой язык. На данный момент я остановлюсь на коммутаторе, либо «en», либо «nl» в качестве префикса для URL-адреса википедии.

GUI перед взаимодействием с ним пользователя

Итак, я делаю новый Observable из коммутатора, он испускает OnCheckedChangeEvents.

Я думаю, мне нужно объединить этот Observable с textStream.

Когда переключатель щелкнут, в основном должны работать те же функции, но не совсем. Текущий запущенный запрос (если он вообще выполняется) устаревает, поскольку префикс URL-адреса изменится. Он должен подождать еще 1 секунду, а затем начать новый сетевой вызов.

Очевидно, что следующее не работает:

// emit when Switch is flipped
Observable<OnCheckedChangeEvent> languageSwitchStream = WidgetObservable.input(mLanguageSwitch);

// emit when text is changed
Observable<OnTextChangeEvent> textStream = WidgetObservable.text(mEditText);

// combine these 2, but they are using different types
Observable uiChangeStream = Observable.merge(textStream, languageSwitchStream);

Observable<OnTextChangeEvent> debouncedStream = uiChangeStream.debounce(1, TimeUnit.SECONDS); // Unchecked assignment

Я не могу просто объединить textStream и languageSwitchStream.

Таким образом, возникает вопрос: как мне подойти к этому, используя правильный Rx?

== РЕШЕНИЕ ===========================

// emit when Switch is flipped
Observable<OnCheckedChangeEvent> languageSwitchStream =
        WidgetObservable
                .input(mLanguageSwitch)
                .startWith(new OnCheckedChangeEvent() {
                    @Override
                    public CompoundButton view() {
                        return null;
                    }

                    @Override
                    public boolean value() {
                        return mLanguageSwitch.isChecked();
                    }
                });

// emit when text is changed
Observable<OnTextChangeEvent> textStream = WidgetObservable.text(mEditText);

Observable<OnTextChangeEvent> uiChangeStream = Observable
        .combineLatest(
                textStream,
                languageSwitchStream,
                (text, switchValue) -> text);

uiChangeStream
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(s -> mProgressBar.setVisibility(View.VISIBLE));

uiChangeStream
        .debounce(1, TimeUnit.SECONDS)
        .map(t -> wikiService.search(t.text().toString())) // query wikipedia
        .map(Object::toString)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(s -> {
            mTextView.setText(s == null ? "Error" : s);
            mProgressBar.setVisibility(View.GONE);
        });

Расширяя утвержденный ответ, я добавил .startsWith() к наблюдаемому коммутатору, иначе он будет ждать, пока он будет перевернут, прежде чем выдавать значение.


person xorgate    schedule 13.04.2015    source источник
comment
Должен ли он использовать RX? Потому что похоже, что вам просто нужен CheckListener для замены wikiService.   -  person inmyth    schedule 13.04.2015
comment
Почему именно вы ждете 1 секунду, прежде чем сделать сетевой запрос. Как вы пришли к этому ограничению?   -  person IgorGanapolsky    schedule 17.03.2016


Ответы (1)


Используйте CombineLatest и сделайте что-то вроде этого:

Observable<SearchParams> uiChangeStream = Observable.combineLatest(
    textStream, 
    languageSwitchStream,
    (text, switch) -> /* extract info from each and return search params */)
.map(searchParams -> wikiService.search(searchParams);

combineLatest() будет повторно выдавать последнее значение из другого Observable, когда один Observable выдает новое значение.

person Ross Hambrick    schedule 13.04.2015
comment
Может ли здесь быть полезен оператор zip? - person IgorGanapolsky; 17.03.2016