onNext запустить еще один Observable

Есть ли более чистый способ сделать следующее?

У меня есть Android-наблюдатель, который отклоняет запросы. onNext вызывает вторую наблюдаемую.

{// when creating the android activity
searchTextEmitterSubject = PublishSubject.create();
subscription = AndroidObservable.bindActivity(this, Observable.switchOnNext(searchTextEmitterSubject))
                .debounce(100, TimeUnit.MILLISECONDS, Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d("a",s);
                        pa.doSearch(s);
                    }
                });
}

@OnTextChanged(R.id.ed)
public void onTextEntered(CharSequence charsEntered) {
    searchTextEmitterSubject.onNext(getASearchObservableFor(charsEntered.toString()));
}


private Observable<String> getASearchObservableFor(final String searchText) {
    return Observable.create( (Subscriber<? super String> subscriber) ->
            subscriber.onNext(searchText)).subscribeOn(Schedulers.io());
}

doSearch создает второй наблюдаемый объект:

public void doSearch(String string) {
    AlbumEndpoint albumEndpoint = API.getRestAdapter().create(AlbumEndpoint.class);
    Observable<Point> observable = albumEndpoint.searchPoint(string);
    mAdapterDataSubscription = AndroidObservable.bindActivity((Activity) getContext(), observable)
            .subscribe(mAdapterDataObserver);

}

private Observer<Point> mAdapterDataObserver = new Observer<Point>() {

    @Override
    public void onCompleted() {
        mAdapterDataSubscription.unsubscribe();
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onNext(Point point) {
        List<Point.Result> results = point.getResults();
        mData.addAll(results);
        notifyDataSetChanged();
    }

};

Оно работает. Но есть ли способ «слить» два потока в один либо для улучшения читаемости, либо для создания наиболее оптимального кода?


Изменить: для полноты картины и для всех, кто интересуется будущим, мне удалось сократить код до:

Observable<EditText> searchTextObservable = ViewObservable.text(ed);
searchTextObservable.debounce(100, TimeUnit.MILLISECONDS)
        .flatMap(editText -> {
            String string = editText.getText().toString();
            AlbumEndpoint albumEndpoint = getRestAdapter().create(AlbumEndpoint.class);
            return albumEndpoint.searchPoint(string);
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(mAdapterDataObserver);

person Diolor    schedule 14.10.2014    source источник
comment
Мудрость: вы привели пример с ошибкой в ​​использовании Retrofit: сервис API, созданный RestAdapter, и сам адаптер должен быть Singletone, поскольку Джейк говорит. Таким образом, создание нового экземпляра при каждом отказе — пустая трата времени.   -  person Dmitry Gryazin    schedule 03.02.2015


Ответы (1)


Несколько вещей:

Я не уверен, почему вы каждый раз переносите события изменения текста в новый Observable вместо того, чтобы сразу передавать их своему субъекту:

Итак, вместо:

@OnTextChanged(R.id.ed)
public void onTextEntered(CharSequence charsEntered) {
    searchTextEmitterSubject.onNext(getASearchObservableFor(charsEntered.toString()));
}

попробуй это:

@OnTextChanged(R.id.ed)
public void onTextEntered(CharSequence charsEntered) {
    searchTextSubject.onNext(charsEntered.toString());
}

Конечно, тогда у вас будет PublishSubject<String> вместо PublishSubject<Observable<String>>.

Но вы можете пропустить onNextSwitch и просто debounce свою тему и продолжить оттуда.

Чтобы еще больше упростить ситуацию, вы можете просто использовать ViewObservables из пакета Android rxjava. Я не использовал их в последнее время, но это должно работать так:

Observable<OnTextChangeEvent> searchTextObservable = ViewObservable.text(tvSearch);

Затем OnTextChangeEvent будет выдаваться каждый раз, когда вызывается afterTextChanged базового TextWatcher (в ButterKnife по умолчанию используется onTextChanged, так что это может быть тонкой разницей).

В случае, если я упустил какую-то важную причину для переноса каждого изменения текста в Observable, это также может быть достигнуто более простым способом:

@OnTextChanged(R.id.ed)
public void onTextEntered(CharSequence charsEntered) {
    searchTextEmitterSubject.onNext(
        Observable.just(charsEntered.toString())
    );
}

Обратите внимание, что я также опустил subscribeOn(Schedulers.io()) — я никогда не чувствовал необходимости перемещать прослушиватели событий из потока пользовательского интерфейса в какой-либо другой поток. Я что-то упустил здесь?

Наконец, вы можете просто использовать flatMap для фактического выполнения поиска по каждому поисковому запросу (здесь я использую свой собственный Observable<OnTextChangeEvent>):

searchTextObservable
.debounce(100, TimeUnit.MILLISECONDS)
.flatMap(new Func1<OnTextChangeEvent, Observable<Point>>() {

    @Override
    public Observable<Point> call(OnTextChangeEvent event) {
        final String string = event.text.toString();
        AlbumEndpoint albumEndpoint = API.getRestAdapter().create(AlbumEndpoint.class);
        Observable<Point> pointObservable = albumEndpoint.searchPoint(string);
        return AndroidObservable.bindActivity((Activity) getContext(), pointObservable);
    }

})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(mAdapterDataObserver);

Возможно, вам придется переместить ваш pointObservable в поток ввода-вывода, если альбомКонечная точка еще не сделала этого.

Надеюсь, это поможет!

EDIT: Извините, я забыл объяснить, что такое OnTextChangeEvent. Это часть rx-android — вы можете посмотреть исходный код здесь:

https://github.com/ReactiveX/RxAndroid/blob/0.x/src/main/java/rx/android/events/OnTextChangeEvent.java

Это просто POJO, который содержит ссылку на измененный EditText, а также текущее содержимое этого EditText. Последнее — это то, что я использовал в файле flatMap.

person david.mihola    schedule 15.10.2014
comment
Дэвид, твой ответ очень полезен. Спасибо. У меня все еще есть некоторые проблемы с пониманием OnTextChangeEvent, которое, поскольку tvSearch является <T extends TextView>, означает, что в нашем случае OnTextChangeEvent, вероятно, является расширенным EditText. Итак, что я сделал (не сработало): pastebin.com/cdgdJY6k [редактировать: только что понял, что searchTextSubject не ограничен до searchTextObservable, я стараюсь больше] - person Diolor; 16.10.2014
comment
Спасибо, Дэвид. Мы делали почти одно и то же. - person Diolor; 16.10.2014