Обеспечьте последовательное обновление состояния при использовании оператора сканирования RXJava.

Я пытаюсь реализовать шаблон обновления состояния избыточности с помощью RXJava.

val subject=PublishSubject.create()
val subject1=PublishSubject.create()

// multiple threads posting 
// on subject and subject1 here. Concurrently 


subject.mergeWith(subject1)
       .scan(
             getInitState(),
             {state, event ->
               // state update here 
             }
         )
        .subscribe({state ->
          // use state here
        })

Как видите, я использую оператор scan для сохранения состояния.

Как я могу быть уверен, что обновления состояния происходят последовательно, даже если несколько потоков создают события?

Есть ли какой-то механизм в операторе scan, который заставляет события стоять в некоторой очереди, ожидая завершения функции обновления текущего состояния?

Что я сделал:

Я успешно реализовал этот шаблон в среде Android. Это очень просто, потому что если вы всегда выполняете обновление состояния в

AndroidSchedulers.mainThread()

И сделайте объект состояния неизменяемым, и вы гарантированно получите атомарное и последовательное обновление состояния. Но что произойдет, если у вас нет специального планировщика для обновления состояния? Что делать, если вы не на Android?

Что я исследовал:

  • Я прочитал исходный код для оператора scan, и здесь нет ожидающей "очереди". Простое обновление состояния и эмиссия

  • Я также прочитал исходный код SerializedSubject. Действительно существует очередь ожидания, которая сериализует выбросы. Но что произойдет, если у меня будет два предмета? Сериализация их обоих не означает, что они не мешают друг другу.


person Rezo Shalikashvili    schedule 23.02.2020    source источник


Ответы (1)


Чтобы принудительно выполнить выполнение в одном потоке, вы можете явно создать планировщик одного потока для замены AndroidSchedulers.mainThread():

val singleThreadScheduler = Schedulers.single()

Даже если события генерируются в других потоках, вы можете убедиться, что обрабатываете их только в своем единственном потоке, используя observeOn:

subject.mergeWith(subject1)
   .observeOn(singleThreadScheduler)
   .scan(
         getInitState(),
         {state, event ->
           // state update here 
         }
     )
    .subscribe({state ->
      // use state here
    })  

Разница между observeOn и subscribeOn может быть довольно запутанной, и регистрация идентификатора потока может быть полезна для проверки того, что все выполняется в ожидаемом потоке.

http://reactivex.io/documentation/scheduler.html

person tonicsoft    schedule 23.02.2020
comment
Спасибо за ответ. Он точно рабочий. Но что бы я делал без планировщика? Например, если я попытаюсь использовать тот же подход в JavaScript? - person Rezo Shalikashvili; 24.02.2020
comment
не знаком с rx-js, но я бы предположил, что он все равно однопоточный. Конечно, я бы ожидал, что любая js-библиотека по умолчанию будет однопоточной. - person tonicsoft; 24.02.2020