RxJs: форма zip-оператора с потерями

Рассмотрите возможность использования оператора zip для объединения двух бесконечных Observable, один из которых испускает элементы в два раза чаще, чем другие.
Текущая реализация без потерь, то есть, если я продолжу излучать эти Observable в течение часа, а затем переключаюсь между их скоростью излучения, первый Observable в конечном итоге догонит другого.
Это вызовет взрыв памяти в какой-то момент по мере того, как буфер становится все больше и больше.
То же самое произойдет, если первый наблюдаемый будет генерировать элементы в течение нескольких часов, а второй - один элемент в конце.

Как добиться для этого оператора поведения с потерями? Я просто хочу излучать в любое время, когда получаю выбросы из обоих потоков, и меня не волнует, сколько выбросов из более быстрого потока я пропускаю.

Уточнения:

  • Основная проблема, которую я пытаюсь решить здесь, - это взрыв памяти из-за без потерь работы оператора zip.
  • Я хочу излучать в любое время, когда получаю выбросы из обоих потоков, даже если оба потока каждый раз генерируют одно и то же значение

Пример:

Stream1: 1 2    3 4    5 6 7                
Stream2:     10     20       30 40 50 60 70

Обычный zip даст следующий результат:

[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]

const Observable = Rx.Observable;
const Subject = Rx.Subject;


const s1 = new Subject();
const s2 = new Subject();

Observable.zip(s1,s2).subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30); 
 
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>

Результат, который я хотел бы получить:

[1, 10]
[3, 20]
[5, 30]

Объяснение:
Оператор zip с потерями - это zip с размером буфера 1. Это означает, что он сохранит только первый элемент из потока, который был сгенерирован первым, и потеряет все остальные (элементы, которые прибывают между первым элементом и первым выпуском из второго потока). В этом примере происходит следующее: stream1 излучает 1, zip с потерями "запоминает" его и игнорирует все элементы на stream1 до тех пор, пока stream2 не излучает. Первая эмиссия stream2 равна 10, поэтому stream1 проигрывает 2. После взаимной эмиссии (первая эмиссия с потерями zip) она начинается заново: «запомнить» 3, «потерять» 4, испустить [3,20]. Затем начните сначала: «запомнить» 5, «потерять» 6 и 7, испустить [5,30]. Затем начните сначала: «запомните» 40, «потеряйте» _28 _, _ 29 _, _ 30_ и ждите следующего пункта stream1.

Пример 2:

Stream1: 1 2 3 ... 100000000000
Stream2:                        a

Обычный оператор zip в этом случае взорвет память.
Я этого не хочу.

Резюме:
По сути, я ожидаю, что оператор zip с потерями запомнит только первое значение, выданное stream 1 после предыдущего взаимного выброса, и выдаст, когда stream 2 догонит stream 1. И повторить.


person JeB    schedule 06.10.2017    source источник
comment
Взгляните на combineLatest   -  person martin    schedule 06.10.2017
comment
combineLatest не делает то, что я хочу. Он излучает каждый раз, когда излучает один поток. Мне нужно, чтобы он излучал каждый раз оба потока. В основном мне нужен оператор zip с размером буфера 1.   -  person JeB    schedule 06.10.2017


Ответы (5)


Следующее даст вам желаемое поведение:

Observable.zip(s1.take(1), s2.take(1)).repeat()

В синтаксисе RxJs 5.5+ pipe:

zip(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.zip(s1.take(1), s2.take(1)).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>

Объяснение:

  • Оператор repeat (в его текущей реализации) повторно подписывается на наблюдаемый источник по завершении последнего, т.е. в данном конкретном случае он повторно подписывается на zip при каждой взаимной эмиссии.
  • zip объединяет два наблюдаемых объекта и ждет, пока они оба излучают. combineLatest тоже подойдет, это не имеет значения из-за take(1)
  • take(1) на самом деле заботится о взрыве памяти и определяет поведение с потерями

Если вы хотите брать последнее значение вместо первого из каждого потока при взаимной эмиссии, используйте это:

Observable.combineLatest(s1, s2).take(1).repeat()

В синтаксисе RxJs 5.5+ pipe:

combineLatest(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.combineLatest(s1,s2).take(1).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>

person JeB    schedule 08.10.2017
comment
Обратите внимание, что ваш ответ не испускает пару [7, 40], как указано в вашем вопросе. Почему это? - person Richard Matsen; 08.10.2017
comment
Правильно, вы правы, он не должен выдавать пару [7, 40]. Если оператор передает первые элементы во взаимной передаче, пара для 40 еще не в потоке в этот момент. Исправлю на примере. - person JeB; 08.10.2017
comment
Чем [7,40] отличается от других пар? - person Richard Matsen; 08.10.2017
comment
Оператор zip с потерями - это zip с размером буфера 1. Это означает, что он сохранит только первый элемент из потока, который был отправлен первым, и потеряет все остальные (элементы, которые прибывают между первым элементом и первым выпуском из второго потока). В этом примере происходит следующее: stream1 излучает 1, zip с потерями запоминает и игнорирует все элементы на stream1 до тех пор, пока stream2 не излучает. Первая эмиссия stream2 - 10, поэтому stream1 проигрывает 2. После взаимной эмиссии (первая эмиссия с потерями zip) она начинается заново ... - person JeB; 08.10.2017
comment
... он запомнит 3, потеряет 4, испустит [3,20]. Затем начните сначала: запомните 5, потеряйте 6 и 7, испустите [5,30]. Затем начните сначала: запомните 40, потеряйте _9 _, _ 10 _, _ 11_ и ждите следующего пункта на stream1. Я что-то упустил? - person JeB; 08.10.2017
comment
Спасибо, это все объясняет. Было бы полезно добавить это к вопросу. - person Richard Matsen; 08.10.2017
comment
Разве синтаксис RxJS 5.5+ для взятия последнего не должен быть CombineLatest (s1, s2) .pipe (take (1), repeat ())? Взять по одному из каждого внешнего наблюдаемого будет иметь тот же эффект, что и один с застежкой-молнией. Поэтому я думаю, что в обоих случаях можно использовать combLatest. См. Следующее. stackblitz.com/edit/rxjs-4jae8r - person Trevor Karjanis; 07.07.2020

Это дает последовательность [0, 2] [1, 5] [2, 8] [3, 12] ...

const interval1 = Rx.Observable.interval(1000)
const interval2 = Rx.Observable.interval(300)

const combined = Rx.Observable.combineLatest(interval1, interval2);
const fresh = combined.scan((acc, x) => { 
    return x[0] === acc[0] || x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only

fresh.subscribe(console.log);

с возможно меньшим количеством операторов. Не уверен, насколько это эффективно.
CodePen

Для обновления №3

Тогда вам понадобится ключ для каждого исходного элемента.

// Simulated sources according to latest spec provided (update #3)
const source1 = Rx.Observable.from(['x','y','z'])
const source2 = Rx.Observable.from(['a','a','b','b','c'])

// Create keys for sources
let key1 = 0
let key2 = 0
const keyed1 = source1.map(x => [x, key1++])
const keyed2 = source2.map(x => [x, key2++])

const combined = Rx.Observable
  .combineLatest(keyed1, keyed2)
  .map(([keyed1, keyed2]) => [...keyed1, ...keyed2]) // to simplify scan below
combined.subscribe(console.log) // not the output, for illustration only
console.log('-------------------------------------')

const fresh = combined.scan((acc, x) => { 
    return x[1] === acc[1] || x[3] === acc[3] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only

const dekeyed = fresh
  .map(keyed => { return [keyed[0], keyed[2]] })
dekeyed.subscribe(console.log); // required output

Это производит

["x", "a"]  
["y", "a"]  
["z", "b"]  

CodePen (обновите страницу CodePen после открытия консоли для лучшего отображения)

person Richard Matsen    schedule 06.10.2017
comment
Думаю, это решение лучше моего. - person martin; 06.10.2017
comment
но вдохновленный вашим skipUntil () - person Richard Matsen; 06.10.2017
comment
Это решает совсем другую проблему. Ваше решение не будет генерировать каждый раз, когда генерируются оба потока. Он будет генерировать каждый раз, когда оба потока испустили значения, отличные от предыдущих. В моем случае я не хочу distinctUntilChanged. Я хочу выдавать значения, даже если они одинаковы. Давайте просто предположим, что оба Observable каждый раз испускают 1. Первый испустил 10 раз, а второй 5 раз. Я хотел бы получить 5 выбросов, не запоминая 5 избыточных выбросов. Надеюсь, теперь все ясно. - person JeB; 06.10.2017
comment
Добавил пояснение по этому поводу в вопрос. - person JeB; 06.10.2017
comment
Да, теперь (после того, как вы добавили ключ) он заработает. Единственная проблема в том, что он решает проблему для оператора forkJoin (взять последний), а не для zip (взять первый). Он также становится довольно перегруженным по сравнению с ответом @martin. Я обновил вопрос конкретными примерами. - person JeB; 08.10.2017

Я думаю, что следующее должно всегда принимать последнее значение из каждого источника Observable.

const source1 = Observable.interval(1000).publish();
const source2 = Observable.interval(300).publish();

source1.connect();
source2.connect();

Observable.defer(() => Observable.forkJoin(
        source1.takeUntil(source2.skipUntil(source1)),
        source2.takeUntil(source1.skipUntil(source2))
    ))
    .take(1)
    .repeat()
    .subscribe(console.log);

Живая демонстрация: http://jsbin.com/vawewew/11/edit?js,console

Это печатает:

[ 0, 2 ]
[ 1, 5 ]
[ 2, 8 ]
[ 3, 12 ]
[ 4, 15 ]
[ 5, 18 ]

Возможно, вам придется превратить source1 и source2 в горячие наблюдаемые объекты, если они еще не были.

Редактировать:

Основная часть - source1.takeUntil(source2.skipUntil(source1)). Принимает значения от source1 до source2. Но в то же время он будет игнорировать source1, пока source2 не выдаст хотя бы одно значение :).

forkJoin() Observable ожидает завершения работы обоих источников, запоминая последнее излучение от каждого из них.

Затем мы хотим повторить процесс и поэтому используем take(1) для завершения цепочки и .repeat() для немедленной повторной подписки.

person martin    schedule 06.10.2017
comment
Это непросто! Спасибо! Но не всегда ли он будет излучать первое комбинированное излучение? Я имею в виду, что takeUntil будет принимать только первое значение каждого из потоков, а defer будет сбрасываться при каждом новом подписчике. Но как насчет одного абонента, который хочет получить все комбинированные выбросы? И зачем нам здесь take(1) и repeat? Не создаст ли это бесконечный поток первого комбинированного излучения? - person JeB; 06.10.2017
comment
Смотрите редактирование. Если вы не хотите начинать излучение с самого начала, вам нужно превратить источники в горячие Observables. - person martin; 06.10.2017
comment
Теперь почти ясно. Затем мы хотим повторить процесс, поэтому мы используем take(1) для завершения цепочки и .repeat() для немедленной повторной подписки - это также объясняет оператор defer. Но почему repeat немедленно повторно подпишется? Согласно документации: Создает наблюдаемую последовательность, которая повторяет данный элемент указанное количество раз, используя указанный планировщик для отправки сообщений наблюдателя. Как происходит повторная подписка? - person JeB; 06.10.2017
comment
Признаюсь, это действительно сбивающее с толку описание. Кажется, что он повторяет предыдущие выбросы, но это не то, что он делает. Фактически, он просто повторно подписывается на свой источник Observable после получения complete уведомления. Вы можете увидеть это здесь github.com/ReactiveX/rxjs / blob / master / src / operator / - person martin; 06.10.2017
comment
Что ж, либо описание должно быть исправлено, либо оно зависит от реализации, и вы на самом деле не можете предполагать, что это работает таким образом. Спасибо за объяснение, я многому научился из ваших ответов. - person JeB; 06.10.2017
comment
как насчет defer(zip).take(1).repeat? Не будет ли то же самое с меньшим количеством операторов? - person JeB; 06.10.2017
comment
@meltedspark Это не так. Он всегда будет выдавать только первый элемент из каждого источника Observable. С forkJoin он излучает последний. - person martin; 06.10.2017
comment
Верно. Но если единственное, что меня волнует, это факт выбросов (я просто хочу знать, когда оба потока испускаются), zip выполнил бы свою работу, не так ли? - person JeB; 06.10.2017
comment
Я думаю, что будет, но я не тестировал - person martin; 06.10.2017
comment
Если придерживаться исходного вопроса, в котором говорится, что оператор zip с потерями, не будет ли defer(zip).take(1).repeat правильным решением? А ваше решение - это форма forkJoin с потерями? - person JeB; 06.10.2017
comment
Я думаю, это было бы - person martin; 06.10.2017
comment
Поскольку я не хочу брать на себя ответственность за ваш ответ, я предлагаю вам отредактировать его до версии zip, отметив при этом, что правильная версия для forkJoin - это та, которую вы предложили, и я приму ваш ответ. В противном случае мне придется отвечать zip версией, хотя решение нашло именно вы. - person JeB; 06.10.2017
comment
Я обновил вопрос конкретными примерами, пожалуйста, посмотрите, можете ли вы обновить свой ответ, чтобы я мог его принять. - person JeB; 08.10.2017
comment
Я немного изменил ваше решение, чтобы оно соответствовало примеру, описанному в вопросе: jsbin.com/zurezubejo / edit? js, консоль. Это работает, но похоже, что это одна эмиссия позади - если бы я ожидал получить последнее значение от каждого потока при взаимной эмиссии, я ожидал бы получить [2,10], [4,20], [7,30], но на самом деле я получаю просто [2,10], [4,20]. Я думаю, что я придумал более простое решение, которое работает так, как ожидалось (с использованием combineLatest и take(1) внутри defer). Ознакомьтесь с моим ответом и дайте мне знать, если вы считаете, что произошла ошибка. - person JeB; 08.10.2017
comment
Еще кое-что. Кажется, вам здесь не нужен defer. take(1) завершает наблюдаемое и repeat повторно подписывается. Проверил в своем решении, работает без defer, нет причин, по которым ваш не работает. - person JeB; 10.10.2017

Вы упоминаете размер буфера 1 и задаетесь вопросом, поможет ли это заархивировать два ReplaySubject с размером буфера 1?

person Richard Matsen    schedule 06.10.2017
comment
Я не могу делать предположений о том, что это за Subject производитель. Я могу, конечно, заключить исходный код Observables в ReplaySubject, но это совершенно не поможет. Цель ReplaySubject совершенно иная: по сути, он передает любому наблюдателю все элементы, которые были отправлены источником Observable, независимо от того, когда наблюдатель подписывается. - person JeB; 06.10.2017
comment
«по сути, он передает любому наблюдателю все элементы, которые были отправлены источником Observable» - до предела размера буфера, не так ли? Так что я думаю, что скорость архивирования будет такой же, как у самого медленного источника. Я бы проверил это для вас, но боюсь, что для меня уже поздний вечер. - person Richard Matsen; 06.10.2017

Я добавляю еще один ответ для ясности, поскольку он идет после принятого ответа (но основывается на моем предыдущем ответе).

Простите меня, если я неправильно понял, но я ожидал, что решение будет обрабатывать переключение уровней выбросов:

затем я переключаюсь между их скоростью излучения,

Поставляемый тест не переключает скорость излучения до тех пор, пока после не остановится первый поток,

Stream1: 1 2    3 4    5 6 7                 
Stream2:     10     20    30 40 50 60 70

поэтому я попробовал еще один тест

Stream1: 1 2      3 4     5 6
Stream2:    10 20    30 40   50 60

Тестовые данные для этого потока

s1.next(1); s1.next(2); s2.next(10); s2.next(20); s1.next(3); s1.next(4);
s2.next(30); s2.next(40); s1.next(5); s1.next(6);  s2.next(50); s2.next(60);

Насколько я понимаю, принятый ответ не проходит этот тест.
Он выводит

[1, 10]
[3, 20]
[4, 30]
[5, 40]
[6, 50]

тогда как я ожидал увидеть

[1, 10]
[3, 30]
[5, 50]

если оператор должен быть симметричным (коммутативным?)

Улучшение моего предыдущего ответа

Это решение построено на основе базовых операторов, поэтому его легче понять. Я не могу говорить об его эффективности, возможно, проверим это в другой итерации.

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

const tagged1 = s1.map(x=>[x,1])
const tagged2 = s2.map(x=>[x,2])
const merged = tagged1.merge(tagged2)
const fresh = merged.scan((acc, x) => { 
    return x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only
const dekeyed = fresh.map(keyed => keyed[0])
const paired = dekeyed.pairwise()
let index = 0
const sequenced = paired.map(x=>[x,index++])
const alternates = sequenced.filter(x => x[1] % 2 === 0)
const deindexed = alternates.map(x=>x[0])

или в более компактной форме, если желательно

let index = 0
const output = 
  s1.map(x=>[x,1]).merge(s2.map(x=>[x,2])) // key by stream id
  .scan((acc, x) => { 
    return x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged()       //fresh ones only
  .map(keyed => keyed[0])       // de-key
  .pairwise()                   // pair
  .map(x=>[x,index++])          // add a sequence no
  .filter(x => x[1] % 2 === 0)  // take even sequence
  .map(x=>x[0])                 // deindex

Для тестирования CodePen (обновите страницу CodePen после открытия консоли для лучшего отображения)

person Richard Matsen    schedule 11.10.2017
comment
Спасибо за потраченное время, но переключение между уровнями выбросов упоминается в вопросе только для того, чтобы подчеркнуть текущее поведение без потерь. Это упомянуто в разделе, где я описываю проблему, а не вопрос. Я думал, что сформулировал довольно четко, в чем именно заключается мой вопрос, не так ли? Примеры и пояснения недостаточно понятны? - person JeB; 12.10.2017
comment
Также было бы полезно, если бы вы объяснили, почему вы ожидаете увидеть этот результат (как и я в примере в вопросе). Потому что на данный момент первый вывод кажется гораздо более логичным, чем второй. - person JeB; 12.10.2017
comment
Дело в том, что с потерями zip должно генерироваться каждый раз, когда оба потока испускают, не запоминая элементы между ними. В вашем примере первый раз два потока испускают, когда первый испускает 1, теряет 2, а второй испускает 10. Это нормально. Но тогда в следующий раз оба излучают, когда второй излучает 20, а первый излучает 3. После каждого взаимного излучения он начинает ждать следующего взаимного излучения. Или я не понял твою точку зрения? - person JeB; 12.10.2017
comment
Что ж, это ваше определение проблемы, поэтому я не собираюсь спорить с ним слишком :). - person Richard Matsen; 12.10.2017
comment
Мое главное восприятие состоит в том, что (в случае моего нового теста), если stream1 испускает 1 и отбрасывает 2, то для симметрии, когда stream2 берет на себя инициативу, он будет делать то же самое (испускать его первым и отбрасывать все последующие, пока не испускает другой поток. - person Richard Matsen; 12.10.2017
comment
Я вижу, вы говорите, что 10 было «использовано», поэтому теперь 20 находится в очереди, ожидая следующего элемента stream1. Это также справедливо, но тогда не должна ли логика диктовать, что 4 должно быть в выбросе, поскольку 3 было «использовано», поэтому 4 теперь находится в очереди, ожидая следующего элемента stream2? - person Richard Matsen; 12.10.2017
comment
Исправление последнего - я вижу, вы говорите, что 10 было «использовано», поэтому теперь 20 находится в очереди, ожидая следующего элемента stream1. Это справедливо, но это означает, что stream2 не ведет себя точно так же, как stream1, с точки зрения элементов, которые он отбрасывает. - person Richard Matsen; 12.10.2017
comment
Я не уверен, что вы имеете в виду, когда говорите, что stream2 не ведет себя точно так же, как stream1. В чем разница? Как по мне, он ведет себя как обычный zip, но без функции памяти. Он срабатывает каждый раз, когда генерируются оба потока, а затем запускается заново. Вероятно, более простой способ понять, чего я хочу, - это сбросить память обоих потоков после взаимной эмиссии. - person JeB; 12.10.2017