Объединение двух разных наблюдаемых

Имейте следующий фрагмент:

 Log.d("#######", Thread.currentThread().getName());
    RxSearchView.queryTextChangeEvents(searchView)
            .debounce(400, TimeUnit.MILLISECONDS,Schedulers.newThread())
            .flatMap(new Func1<SearchViewQueryTextEvent, Observable<GifsData>>() {
                @Override
                public Observable<GifsData> call(SearchViewQueryTextEvent txtChangeEvt) {
                    return RestWebClient.get().getSearchedGifs(txtChangeEvt.queryText().toString(),"dcJmzC");
                }
            })
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<GifsData>() {
                @Override
                public void onCompleted() {
                    Log.d("#######","onCompleted searchGifs");
                }

                @Override
                public void onError(Throwable e) {
                    Log.d("#######",e.toString());
                }

                @Override
                public void onNext(GifsData gifsData) {
                   mainFragmentPresenterInterface.displaySearchedGifsList(gifsData);
                }
            });
}

Независимо от того, что я пытаюсь, я продолжаю получать следующую ошибку:

java.lang.IllegalStateException: Must be called from the main thread. Was: Thread[RxNewThreadScheduler-2,5,main]

Вероятно, потратил на это около часа ... Не смог понять, в чем проблема. Даже пытался сопоставить мой фрагмент со следующей ссылкой:

Объединение наблюдаемого RxTextView и модифицированного наблюдаемого

Неудачно. Может ли кто-нибудь указать, что здесь не так?

Спасибо.


person user2511882    schedule 28.06.2016    source источник
comment
Удалите эту строку .subscribeOn(Schedulers.newThread()), она работает правильно. Причина в том, что вы получаете доступ к представлениям в фоновом потоке. Или вы можете получить текст поиска за пределами Observable, чтобы представление не было доступно в фоновом потоке.   -  person Sagar Trehan    schedule 28.06.2016
comment
но я наблюдаю за этим в основном потоке, где манипулируют представлениями, не так ли?   -  person user2511882    schedule 28.06.2016
comment
Я не вижу исключения... но onNext тоже не запускается   -  person user2511882    schedule 28.06.2016
comment
Вы наблюдаете результат в основном потоке, но подписываетесь на фоновый поток, поэтому вы получаете это исключение.   -  person Sagar Trehan    schedule 28.06.2016
comment
Означает ли это, что все сетевые вызовы в приведенном выше фрагменте НЕ выполняются в фоновом потоке, если я избавлюсь от subscribeOn   -  person user2511882    schedule 28.06.2016
comment
Пожалуйста, ознакомьтесь с этим примером github.com/kaushikgopal/RxJava-Android-Samples/blob/master/app/ это поможет вам исправить это   -  person Sagar Trehan    schedule 28.06.2016
comment
Я уже пробовал этот пример. Это работает, только если я регистрируюсь или показываю тост. Но не при попытке обновить список.   -  person user2511882    schedule 28.06.2016
comment
Если вы не укажете .subscribeOn(Schedulers.newThread()), то ваш наблюдаемый будет выполняться в потоке по умолчанию, который в вашем случае, я думаю, является основным потоком   -  person Sagar Trehan    schedule 28.06.2016
comment
Что не рекомендуется. Следовательно, у меня была подписка на место там   -  person user2511882    schedule 28.06.2016


Ответы (2)


Причина ошибки: вы подписываетесь на результат в фоновом потоке и получаете доступ к просмотру в потоке в фоновом потоке. Здесь я вызвал фоновый планировщик RestWebClient.get().getSearchedGifs(txtChangeEvt.queryText().toString(),"dcJmzC").subscribeOn(Schedulers.newThread());on. Пожалуйста, попробуйте это, это сработает для вас:

RxSearchView.queryTextChangeEvents(mSearchView)
            .debounce(400, TimeUnit.MILLISECONDS)
            .flatMap(new Func1<SearchViewQueryTextEvent, Observable<String>>() {
                @Override
                public Observable<String> call(SearchViewQueryTextEvent txtChangeEvt) {
                    return Observable.just(txtChangeEvt.queryText().toString()).subscribeOn(AndroidSchedulers.mainThread());
                }
            })
            .flatMap(new Func1<GifsData, Observable<String>>() {
                @Override
                public Observable<GifsData> call(String txtChangeEvt) {
                    return RestWebClient.get().getSearchedGifs(txtChangeEvt,"dcJmzC").subscribeOn(Schedulers.newThread());
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<GifsData>() {
                @Override
                public void onCompleted() {
                    Log.d("#######","onCompleted searchGifs");
                }

                @Override
                public void onError(Throwable e) {
                    Log.d("#######",e.toString());
                }

                @Override
                public void onNext(GifsData gifsData) {
                    Log.d("#######", gifsData);
                }
            });

Дайте мне знать, если это поможет

person Sagar Trehan    schedule 28.06.2016
comment
У вас есть две подписки на здесь. Разве это не вызовет проблемы. Насколько я понимаю, к потоку данных применяется только первый subscribeOn. В этом случае вызов getSearchedGifs будет выполняться в mainThread. Разве это не так? - person user2511882; 28.06.2016
comment
Вы можете подписаться на стримы на разных подписчиков. Совершенно правильно иметь разных подписчиков на потоки на разных подписчиках. Надеюсь, что кто-то еще прыгнет в это, чтобы ответить на ваш комментарий - person Sagar Trehan; 28.06.2016
comment
Я нашел следующую ссылку о том, почему несколько subscribeOn работает с плоскими наблюдаемыми. Однако объяснение все еще запутанно для меня. Я поддержу ответ, поскольку кажется, что он в правильном направлении, но буду держать его открытым до тех пор, пока кто-нибудь не сможет объяснить, почему subscribeOn работает с плоским отображением. Ссылка: groups.google.com/forum/#!topic/rxjava/XXJJPhn8PHQ. - person user2511882; 28.06.2016

Оператор debounce по умолчанию использует планировщик computation, вам нужно изменить его на основной поток (потому что вы работаете с UI только на основном).

Следующее, что нужно сделать, это запланировать выполнение сетевого запроса на планировщике io. (сейчас мы используем только один subscribeOn).

И снова наблюдение за результатами в основном потоке для взаимодействия с пользовательским интерфейсом.

RxSearchView.queryTextChangeEvents(searchView)
  .debounce(400, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
  .flatMap(new Func1<SearchViewQueryTextEvent, Observable<GifsData>>() {
    @Override
    public Observable<GifsData> call(SearchViewQueryTextEvent txtChangeEvt) {
      return RestWebClient.get()
        .getSearchedGifs(txtChangeEvt.queryText().toString(),"dcJmzC")
        .subscribeOn(Schedulers.io());
    }
  })
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Observer<GifsData>() {
    @Override
    public void onCompleted() {
      Log.d("#######","onCompleted searchGifs");
    }

    @Override
    public void onError(Throwable e) {
      Log.d("#######",e.toString());
    }

    @Override
    public void onNext(GifsData gifsData) {
      mainFragmentPresenterInterface.displaySearchedGifsList(gifsData);
    }
});
person marwinXXII    schedule 28.06.2016