RxJava (Android) — есть ли вред в сохранении планировщика?

У меня есть синглтон, который содержит ссылку на мой объект базы данных. Что я хотел бы сделать, так это ограничить любую операцию базы данных одним потоком ввода-вывода.

Сначала я попробовал следующее:

class SQLSingleton{

 ...

 public Observable<MyObject> query(final String id){

   return Observable.fromCallable(() -> {

       //database operations here
   })
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread());
 }

 ...
}

Проблема с этим подходом заключается в том, что Schedulers.io() может предоставить другой поток из своего пула для каждого подписчика, который подписывается на мой наблюдаемый выше (и я этого не хочу, я хочу, чтобы код выполнялся в том же потоке во все времена).

Затем я перешел к подходу к самостоятельному планировщику:

class SQLSingleton{

 private final Scheduler schedulerIODatabase;

 public SQLSingleton(){
      schedulerIODatabase = Schedulers.newThread();
 }

 public Observable<MyObject> query(final String id){

   return Observable.fromCallable(() -> {

       //database operations here
   })
   .subscribeOn(schedulerIODatabase)
   .observeOn(AndroidSchedulers.mainThread());
 }

 ...
}

Поскольку я новичок в RxJava (и, похоже, в нем много подводных камней), я спрашиваю: Есть ли какой-либо вред в сохранении этого объекта планировщика (имейте в виду, что SQLSingleton является синглтоном, поэтому что Планировщик и будет)?


person Bitcoin Cash - ADA enthusiast    schedule 28.12.2016    source источник


Ответы (1)


schedulerIODatabase = Schedulers.newThread();

Это не имеет никакого эффекта, потому что newThread выдает новый поток каждый раз, когда он применяется с subscribeOn, аналогично io(), но без повторного использования потока.

Вы не можете закрепить и повторно использовать определенный поток RxJava в RxJava 1.x, но вы можете сделать это в RxJava 2 с его компонентом библиотеки расширений: Общий планировщик.

В RxJava 1 вы должны предоставить свой собственный однопоточный ExecutorService для Schedulers.from, который затем будет использовать этот единственный Executor для всех вызовов рабочих процессов. Обратите внимание, что вам нужно вручную управлять жизненным циклом вашего ExecutorService и выключать его, когда ваше приложение должно завершить работу:

ExecutorService exec = Executors.newSingleThreadedExecutor();

Scheduler singleScheduler = Schedulers.from(exec);

Observable.fromCallable(() -> {
   //database operations here
})
.subscribeOn(singleScheduler)
.observeOn(AndroidSchedulers.mainThread());

// ...

exec.shutdown();
person akarnokd    schedule 28.12.2016
comment
Спасибо за отличный ответ @akarnokd. Не могли бы вы уточнить, как именно мне вручную управлять жизненным циклом вашего ExecutorService и отключать его, когда ваше приложение должно завершиться? Должен ли я сделать это в методе onDestroy моего объекта приложения? У меня сложилось впечатление, что мне не нужно об этом заботиться, поскольку эти объекты являются одиночками и, следовательно, будут правильно собраны, если приложение закроется. - person Bitcoin Cash - ADA enthusiast; 30.12.2016
comment
Я не знаю, отключает ли Android ExecutorServices автоматически или нет, этого точно не происходит в настольных Java-приложениях. Пара onCreate/onDestroy звучит разумно. - person akarnokd; 30.12.2016