Рассмотрите возможность использования оператора zip для объединения двух бесконечных Observable, один из которых испускает элементы в два раза чаще, чем другие.
Текущая реализация без потерь, то есть, если я продолжу излучать эти Observable в течение часа, а затем переключаюсь между их скоростью излучения, первый Observable в конечном итоге догонит другого.
Это вызовет взрыв памяти в какой-то момент по мере того, как буфер становится все больше и больше.
То же самое произойдет, если первый наблюдаемый будет генерировать элементы в течение нескольких часов, а второй - один элемент в конце.
Как добиться для этого оператора поведения с потерями? Я просто хочу излучать в любое время, когда получаю выбросы из обоих потоков, и меня не волнует, сколько выбросов из более быстрого потока я пропускаю.
Уточнения:
- Основная проблема, которую я пытаюсь решить здесь, - это взрыв памяти из-за без потерь работы оператора
zip
. - Я хочу излучать в любое время, когда получаю выбросы из обоих потоков, даже если оба потока каждый раз генерируют одно и то же значение
Пример:
Stream1: 1 2 3 4 5 6 7
Stream2: 10 20 30 40 50 60 70
Обычный zip
даст следующий результат:
[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]
const Observable = Rx.Observable;
const Subject = Rx.Subject;
const s1 = new Subject();
const s2 = new Subject();
Observable.zip(s1,s2).subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>
Результат, который я хотел бы получить:
[1, 10]
[3, 20]
[5, 30]
Объяснение:
Оператор zip
с потерями - это zip
с размером буфера 1
. Это означает, что он сохранит только первый элемент из потока, который был сгенерирован первым, и потеряет все остальные (элементы, которые прибывают между первым элементом и первым выпуском из второго потока). В этом примере происходит следующее: stream1
излучает 1
, zip с потерями "запоминает" его и игнорирует все элементы на stream1
до тех пор, пока stream2
не излучает. Первая эмиссия stream2
равна 10
, поэтому stream1
проигрывает 2
. После взаимной эмиссии (первая эмиссия с потерями zip
) она начинается заново: «запомнить» 3
, «потерять» 4
, испустить [3,20]
. Затем начните сначала: «запомнить» 5
, «потерять» 6
и 7
, испустить [5,30]
. Затем начните сначала: «запомните» 40
, «потеряйте» _28 _, _ 29 _, _ 30_ и ждите следующего пункта stream1
.
Пример 2:
Stream1: 1 2 3 ... 100000000000
Stream2: a
Обычный оператор zip
в этом случае взорвет память.
Я этого не хочу.
Резюме:
По сути, я ожидаю, что оператор zip
с потерями запомнит только первое значение, выданное stream 1
после предыдущего взаимного выброса, и выдаст, когда stream 2
догонит stream 1
. И повторить.
combineLatest
- person martin   schedule 06.10.2017combineLatest
не делает то, что я хочу. Он излучает каждый раз, когда излучает один поток. Мне нужно, чтобы он излучал каждый раз оба потока. В основном мне нужен операторzip
с размером буфера 1. - person JeB   schedule 06.10.2017