RxJava: наблюдать, подписываться и делать, наконец, переключение между потоками ввода-вывода и пользовательского интерфейса.

Я столкнулся с проблемой, когда мой наблюдаемый объект подписывается на поток ввода-вывода и наблюдается в основном потоке (UI) Android, но оператор doFinally запускается в потоке ввода-вывода, и его нужно запускать в потоке пользовательского интерфейса.

Вариант использования почти такой же, как этот средняя статья.

По сути, я хочу показать ProgressBar, когда Observable подписан, и скрыть ProgressBar, когда Observable завершается или завершается.

Я получаю сообщение об ошибке: java.lang.IllegalStateException: Текущий поток должен иметь зацикливатель!

Может ли кто-нибудь помочь мне переместить действие doFinally обратно в поток пользовательского интерфейса, в котором есть цикл? Или я упускаю какую-то другую информацию?

ИЗМЕНИТЬ Рабочий процесс варианта использования:

-> Активность запуска

-> инициализировать

-> выполнить наблюдаемый поток

-> Начать новую деятельность и завершить текущую деятельность

-> Новая деятельность

-> Начать исходную деятельность и закончить

-> повторить инициализацию

Большое тебе спасибо.

Подробности:

  • RxJava 2.0.7
  • RxAndroid 2.0.1
  • Android SDK мин. 14 и цель 25

Пример кода

listUseCase.execute(null)
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    getView().showLoading(true);
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    getView().showLoading(false);
                }
            })
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.main())
            .subscribe(
                    new Consumer<List<AccountEntity>>() {
                        @Override
                        public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                            getView().setAccounts(accountEntities);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            if (isViewAttached()) {
                                getView().showError(throwable.getMessage());
                            }
                        }
                    }
            );

Трассировки стека:

FATAL EXCEPTION: RxCachedThreadScheduler-1
  Process: com.example.android.demo.customerfirst.alpha, PID: 16685
  java.lang.IllegalStateException: The current thread must have a looper!
      at android.view.Choreographer$1.initialValue(Choreographer.java:96)
      at android.view.Choreographer$1.initialValue(Choreographer.java:91)
      at java.lang.ThreadLocal$Values.getAfterMiss(ThreadLocal.java:430)
      at java.lang.ThreadLocal.get(ThreadLocal.java:65)
      at android.view.Choreographer.getInstance(Choreographer.java:192)
      at android.animation.ValueAnimator$AnimationHandler.<init>(ValueAnimator.java:600)
      at android.animation.ValueAnimator$AnimationHandler.<init>(ValueAnimator.java:575)
      at android.animation.ValueAnimator.getOrCreateAnimationHandler(ValueAnimator.java:1366)
      at android.animation.ValueAnimator.end(ValueAnimator.java:998)
      at android.graphics.drawable.AnimatedVectorDrawable.stop(AnimatedVectorDrawable.java:439)
      at android.widget.ProgressBar.stopAnimation(ProgressBar.java:1523)
      at android.widget.ProgressBar.onVisibilityChanged(ProgressBar.java:1583)
      at android.view.View.dispatchVisibilityChanged(View.java:8643)
      at android.view.View.setFlags(View.java:9686)
      at android.view.View.setVisibility(View.java:6663)
      at android.widget.ProgressBar.setVisibility(ProgressBar.java:1563)
      at com.example.android.demo.customerfirst.featuresstore.list.ProductListActivity.showLoading(ProductListActivity.java:121)
      at com.example.android.demo.customerfirst.featuresstore.list.ProductListPresenterMediator$3.run(ProductListPresenterMediator.java:56)
      at io.reactivex.internal.operators.observable.ObservableDoFinally$DoFinallyObserver.runFinally(ObservableDoFinally.java:144)
      at io.reactivex.internal.operators.observable.ObservableDoFinally$DoFinallyObserver.onComplete(ObservableDoFinally.java:94)
      at io.reactivex.internal.observers.DisposableLambdaObserver.onComplete(DisposableLambdaObserver.java:73)
      at io.reactivex.internal.observers.DeferredScalarDisposable.complete(DeferredScalarDisposable.java:84)
      at io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:52)
      at io.reactivex.Observable.subscribe(Observable.java:10700)
      at io.reactivex.internal.operators.observable.ObservableDoOnLifecycle.subscribeActual(ObservableDoOnLifecycle.java:33)
      at io.reactivex.Observable.subscribe(Observable.java:10700)
      at io.reactivex.internal.operators.observable.ObservableDoFinally.subscribeActual(ObservableDoFinally.java:45)
      at io.reactivex.Observable.subscribe(Observable.java:10700)
      at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
      at io.reactivex.Scheduler$1.run(Scheduler.java:138)
      at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
      at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
      at java.util.concurrent.FutureTask.run(FutureTask.java:237)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
      at java.lang.Thread.run(Thread.java:818)

person JTT    schedule 28.04.2017    source источник


Ответы (3)


Проблема возникла из-за того, что я не удалял подписку, когда действие было завершено/уничтожено.

Теперь каждое действие/представление сообщает ведущему, когда они остановлены или уничтожены, и ведущий избавляется от подписки.

Кажется, это решило мою проблему.

 @Override
public void initialize() {
    if (!isViewAttached()) {
        throw new ViewNotAttachedException();
    }
    disposable = listUseCase.execute(null)
            .subscribeOn(schedulerProvider.io()) // Move subscribe on here
            .observeOn(schedulerProvider.main()) // Change threads here
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    getView().showLoading(true); // This should be on the main thread also
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    getView().showLoading(false);
                }
            })
            .subscribe(
                    new Consumer<List<AccountEntity>>() {
                        @Override
                        public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                            getView().setAccounts(accountEntities);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            if (isViewAttached()) {
                                getView().showError(throwable.getMessage());
                            }
                        }
                    }
            );
}

@Override
public void dispose() {
    if (disposable != null) {
        disposable.dispose();
    }
}
person JTT    schedule 28.04.2017

Все, что вам нужно сделать, это переместить observeOn вверх по цепочке. Метод observeOn изменяет поток, в котором вызываются onNext, onError и onCompleted, который внутренне определяет, как работают операции и побочные эффекты (через подъем)

listUseCase.execute(null)
            .subscribeOn(schedulerProvider.io()) // Move subscribe on here
            .observeOn(schedulerProvider.main()) // Change threads here
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    getView().showLoading(true); // This should be on the main thread also
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    getView().showLoading(false);
                }
            })

            .subscribe(
                    new Consumer<List<AccountEntity>>() {
                        @Override
                        public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                            getView().setAccounts(accountEntities);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            if (isViewAttached()) {
                                getView().showError(throwable.getMessage());
                            }
                        }
                    }
            );
person cyroxis    schedule 28.04.2017
comment
Проблема все еще возникает с изменением. Я добавлю больше информации в исходный билет, так как, возможно, я делаю что-то не так за пределами этого наблюдаемого варианта использования. - person JTT; 28.04.2017
comment
Посмотрите на трассировку стека ошибок (или опубликуйте ее), она должна сказать вам, где какая строка вызывает неправильный поток. - person cyroxis; 28.04.2017
comment
@cryoxis Я опубликовал трассировку стека. Я чувствую, что действие возвращается в утилизированном потоке, у которого нет петлителя. Я не выполняю никаких действий по удалению, поэтому я пойду и обновлю свои действия/представления и их докладчики, чтобы избавиться от наблюдаемых, когда они остановлены/приостановлены/уничтожены, и вернусь к вам. Спасибо за понимание подъемного механизма. Я отвечу, как дела. - person JTT; 28.04.2017
comment
@JTT Ты не ответил. :) - person milosmns; 13.03.2018
comment
@milosmns Я опубликовал свой собственный ответ о моей проблеме и ее решении. Разве это не правильно? - person JTT; 15.03.2018
comment
@JTT Я не видел, чтобы ответ был и твоим, ой. Хорошо, спасибо, разбираюсь. - person milosmns; 20.03.2018

Может быть, уже слишком поздно. Но проверьте документацию здесь про doFinally говорят явно"

Вызывает указанное действие после того, как этот Observable сигнализирует onError или onCompleted или удаляется нисходящим потоком.

Это означает, что doFinally он будет вызван в любом случае, и не гарантируется, что у вас есть допустимый контекст.

Я не использую doFinally для этого. Я всегда устанавливаю загрузку (false) onNext или onError. Не красиво, но эффектно.

person egonzal    schedule 07.01.2019
comment
Да. Нет никакой гарантии, что doFinally будет вызываться в потоке, указанном в observeOn. Я лично предпочитаю для таких случаев doAfterTerminate, который описан в документах, чтобы он вызывался после того, как этот Single вызывает либо onSuccess, либо onError, что идеально подходит для меня при работе с LiveData. - person ivan8m8; 04.09.2019