RxJava2: как избежать исключения InterruptibleException после удаления подписчика?

У меня есть наблюдаемое:

public Observable<List<Conversation>> getConversationListObservable() {
    return Observable.create(emitter -> {
        List<Conversation> conversations = networkApi.getConversations();
        for (Conversation conversation : conversations) {
            if (emitter.isDisposed()) return;  // this will cause subscription to terminate.
            List<User> users = networkApi.getUserList(conversation.getId());
            conversation.setUsers(users);
        }
        emitter.onNext(conversations);
        emitter.onComplete();
    });
}

который используется в классе androidx.lifecycle.ViewModel:

public class ConversationViewModel extends ViewModel {
    private CompositeDisposable disposables = new CompositeDisposable();
    ....
    public void fetchConversationList(){
        disposables.add(repository.getConversationListObservable()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe(this::setConversations, this::onError));
    }
}

Когда я ухожу от экрана со списком Conversation, этот наблюдаемый объект удаляется, но у меня есть предупреждение в logcat от RxJavaPlugins.setErrorHandler, помещенного в мой Application класс:

W/RxJavaPlugins.setErrorHandler - Undeliverable exception received, not sure what to do: java.lang.InterruptedException

по networkApi.getUserList звонку. Похоже, когда я делаю этот сетевой вызов, мой подписчик не удаляется в начале сетевого вызова и уже удален, когда я получаю ответ на вызов. Есть ли способ не получить InterruptedException в RxJavaPlugins.setErrorHandler, кроме как удалить этот плагин из класса Application?

P.S .: Трассировка стека выглядит так:

2019-11-24 23:28:00.152 18051-18343/com.example W/RxJavaPlugins.setErrorHandler - Undeliverable exception received, not sure what to do: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1036)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1327)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at com.example.client.Client.sendRequest(Client.scala:61)
    at com.example.ui.Repository.provideClient(Repository.java:205)
    at com.example.ui.Repository.fetchUser(Repository.java:981)
    at com.example.ui.Repository.fetchUserWithRole(Repository.java:987)
    at com.example.ui.Repository.access$2400(Repository.java:95)
    at com.example.ui.Repository$11.lambda$createCall$1$Repository$11(Repository.java:912)
    at com.example.ui.-$$Lambda$Repository$11$8ZJhKkqn7hg2E6f5A5NBX1EeUPY.subscribe(lambda)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:12267)
    at io.reactivex.internal.operators.observable.ObservableOnErrorNext.subscribeActual(ObservableOnErrorNext.java:38)
    at io.reactivex.Observable.subscribe(Observable.java:12267)
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
    at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
    at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
    at java.lang.Thread.run(Thread.java:761)

Это трассировка стека из реального кода, но я упростил код для настоящего вопроса. Client.sendRequest в этой трассировке стека соответствует networkApi.getUserList в упрощенном примере.


person isabsent    schedule 24.11.2019    source источник
comment
Можете ли вы распечатать трассировку стека этого Undeliverable exception received?   -  person Pavel Poley    schedule 24.11.2019
comment
Вопрос был обновлен трассировкой стека.   -  person isabsent    schedule 24.11.2019
comment
Когда вы не переходите на другой экран, нет вообще никаких исключений?   -  person Pavel Poley    schedule 24.11.2019
comment
Никаких исключений (не исключений - только предупреждений). Если я жду достаточно времени, чтобы загрузить всех пользователей во все разговоры, у меня не будет предупреждений в трассировке стека после перехода на другой экран.   -  person isabsent    schedule 24.11.2019
comment
Итак, если подписчик не удален в начале сетевого вызова и уже удален при получении ответа на вызов, всегда будет выброшено исключение, и выбор - это только место, где я могу его поймать?   -  person isabsent    schedule 24.11.2019
comment
да, и в качестве поощрения из официальных документов: если библиотека / код уже сделали это, недоставленные исключения InterruptedExceptions должны прекратиться. Если этот шаблон ранее не использовался, мы рекомендуем обновить соответствующий код / ​​библиотеку. Также в комментарии к перехвату InterruptedExceptions в коде setErrorHandler: хорошо, некоторый блокирующий код был прерван вызовом dispose   -  person Gustavo    schedule 24.11.2019