Я пытаюсь реализовать шаблон обновления состояния избыточности с помощью RXJava.
val subject=PublishSubject.create()
val subject1=PublishSubject.create()
// multiple threads posting
// on subject and subject1 here. Concurrently
subject.mergeWith(subject1)
.scan(
getInitState(),
{state, event ->
// state update here
}
)
.subscribe({state ->
// use state here
})
Как видите, я использую оператор scan
для сохранения состояния.
Как я могу быть уверен, что обновления состояния происходят последовательно, даже если несколько потоков создают события?
Есть ли какой-то механизм в операторе scan
, который заставляет события стоять в некоторой очереди, ожидая завершения функции обновления текущего состояния?
Что я сделал:
Я успешно реализовал этот шаблон в среде Android. Это очень просто, потому что если вы всегда выполняете обновление состояния в
AndroidSchedulers.mainThread()
И сделайте объект состояния неизменяемым, и вы гарантированно получите атомарное и последовательное обновление состояния. Но что произойдет, если у вас нет специального планировщика для обновления состояния? Что делать, если вы не на Android?
Что я исследовал:
Я прочитал исходный код для оператора
scan
, и здесь нет ожидающей "очереди". Простое обновление состояния и эмиссияЯ также прочитал исходный код SerializedSubject. Действительно существует очередь ожидания, которая сериализует выбросы. Но что произойдет, если у меня будет два предмета? Сериализация их обоих не означает, что они не мешают друг другу.