Schedulers.io создает сотни RxCachedThreadSchedulers

Я использую RxJava в своем приложении для Android, и он несколько раз сталкивался с OutOfMemoryError. Я проверил это с помощью диспетчера устройств и только что заметил, что у меня более 200 потоков, большинство из которых находятся в состоянии ожидания, и обычно это RxCachedThreadSchedulers. Ошибка OOMError возникает из-за слишком большого количества потоков. Я также заметил, что если я нажимаю кнопку, которая вызывает службу, получаю токен и кэширую его, количество потоков увеличивается на 5!

Итак, я погуглил и обнаружил, что Schedulers.io может создавать неограниченное количество потоков. Когда я заменяю каждый Schedulers.io на Schedulers.computation, проблема исчезает, но это не имеет смысла, поскольку я использую Schedulers.io так, как предполагается.

Итак, как я могу использовать Schedulers.io и убедиться, что он не создает слишком много потоков?

Обновить

Я отписываюсь так:

    final Scheduler.Worker worker = Schedulers.io().createWorker();
    worker.schedule(new Action0() {
        @Override
        public void call() {
            long last = lastServerCommunication.getMillis();
            LongPreference pref = new LongPreference(mSharedPreferences, PREF_KEY_LAST_SERVER_COMMUNICATION);
            pref.set(last);
            worker.unsubscribe();
        }
    });

Обновление №2

Я обычно использую Schedulers.io, например:

public Observable<Scenario> load() {
    return Observable
            .create(new Observable.OnSubscribe<Scenario>() {
                @Override
                public void call(Subscriber<? super Scenario> subscriber) {
                    try {
                        Scenario scenario = mGson.fromJson(mSharedPreferences.getString("SCENARIO", null), Scenario.class);
                        subscriber.onNext(scenario);
                        subscriber.onCompleted();
                    } catch (Exception e) {
                        subscriber.onError(new Throwable());
                    }
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

И:

    mSomeSubscription = mSomeManager.readFromDatabase()
            .subscribeOn(Schedulers.io())
            .subscribe(new Observer<List<SomeEntry>>() {
                @Override
                public void onCompleted() { }

                @Override
                public void onError(Throwable e) {
                    // some logging
                }

                @Override
                public void onNext(List<SomeEntry> Entries) {
                    // Some action
                }
            });

person szidijani    schedule 08.07.2015    source источник
comment
Эти варианты использования не подвержены утечкам. Вы можете проверить, используете ли вы последнюю версию RxJava, потому что зависимость RxAndroid имеет тенденцию отставать: вам нужно вручную переопределить ее, если это необходимо.   -  person akarnokd    schedule 09.07.2015


Ответы (3)


Хорошо, я нашел причину. См. Описание, Обновление №2:

return Observable.create(new Observable.OnSubscribe<Something>() {
            @Override
            public void call(Subscriber<? super Something> subscriber) {
                try {
                  // Some action
                    subscriber.onNext(scenario);
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(new Throwable());
                }
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

Когда вы создаете такие холодные наблюдаемые последовательности, вы должны убедиться, что вызываете onCompleted для подписчиков, см. Выше subscriber.onCompleted();. Что ж, его не было в некоторых местах кода, поэтому были сгенерированы потоки io.

Большое спасибо акарнокд за помощь!

person szidijani    schedule 09.07.2015

Если вы используете Schedulers.io().createWorker(), вы должны unsubscribe() Worker после того, как закончите. Обычные операторы RxJava не должны пропускать рабочие процессы и, следовательно, потоки.

person akarnokd    schedule 08.07.2015
comment
Я использую отмену подписки, когда создаю воркеры, но потоки по-прежнему генерируются и не освобождаются. Вот почему я думаю, что нормальный .subscribeOn(Schedulers.io()) будет пропускать потоки. - person szidijani; 09.07.2015
comment
Какова ваша цепочка операторов и код, по которому вы отказываетесь от подписки? - person akarnokd; 09.07.2015
comment
Обновлено описание - person szidijani; 09.07.2015
comment
Я предполагаю, что действие вылетает с некоторым исключением RuntimeException перед вызовом отмены подписки. Добавьте try-catch-finally, чтобы увидеть, произойдет ли это. - person akarnokd; 09.07.2015
comment
К сожалению, этого не происходит. Утечка от штатных операторов. Но я не знаю, что с ними делать, потому что это холодные наблюдаемые объекты, где я использую это, и он выполняет onCompleted, который автоматически отписывается, верно? - person szidijani; 09.07.2015
comment
Что это за операторы? Не могли бы вы создать модульный тест, демонстрирующий утечки. Вы используете unsafeSubscribe() где-нибудь в своем коде? - person akarnokd; 09.07.2015
comment
Я нигде не использую unsafeSubscribe(), так что проблема не в этом. - person szidijani; 09.07.2015
comment
Обновлено описание для регулярного использования. - person szidijani; 09.07.2015

Замечательно, если вы не вызываете метод onComplete, тогда поток (созданный Schedulers.io()) будет ждать и существует для тех холодных наблюдаемых.

person aolphn    schedule 15.07.2016