RxJs Observable завершается несколько раз

Ниже приведен короткий фрагмент реактивного кода (RxJs).

let subj = new Rx.Subject();
let chain = subj
    .switchMap(v => Rx.Observable.of(10*v).do(vv => console.log("Switch map", vv)))
    .share()
    .take(1);


function subscribe(){
  chain.subscribe(v => console.log("Next", v),
                  err => console.log("Error",err),
                  () => console.log("Completed"));
  chain.subscribe(v => console.log("Next2", v),
                  err => console.log("Error2",err),
                  () => console.log("Completed2"));
  subj.next(Math.random());
}

subscribe();
subscribe();
subscribe();
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

Согласно документации chain это Observable, который должен печатать испускаемое значение * 10 (switchMap ), при печати только один раз, независимо от количества имеющихся у него подписок (share), сделайте это только для первого выданного значения, а затем завершите.

Первые две пули работают нормально, а последняя - нет. Вот результат, который я получаю:

Switch map 9.022491050934722
Next 9.022491050934722
Completed
Next2 9.022491050934722
Completed2
Switch map 9.172999425126836
Next 9.172999425126836
Completed
Next2 9.172999425126836
Completed2
Switch map 6.168790337405257
Next 6.168790337405257
Completed
Next2 6.168790337405257
Completed2

Как видите, chain выполняется несколько раз.
Что позволяет выполнить одно и то же Observable несколько раз?


person JeB    schedule 29.01.2017    source источник


Ответы (1)


share - это ярлык для комбинации publish и refCount, это означает, что поток является «горячим» только до тех пор, пока есть хотя бы 1 подписчик, поэтому после завершения потока все активные подписчики автоматически отписываются, что, в свою очередь, сбрасывает stream, так как тогда подписчиков 0. Также: вы должны поставить take(1) перед share, поскольку любая следующая операция влияет на горячее состояние.

Как сделать поток "истинно" общедоступным / горячим, независимым от каких-либо подписчиков: Используйте publish и connect поток:

let subj = new Rx.Subject();
let chain = subj
    .switchMap(v => Rx.Observable.of(10*v).do(vv => console.log("Switch map", vv)))
    .take(1)
    .publish();
chain.connect();

function subscribe(){
  chain.subscribe(v => console.log("Next", v),
                  err => console.log("Error",err),
                  () => console.log("Completed"));
  chain.subscribe(v => console.log("Next2", v),
                  err => console.log("Error2",err),
                  () => console.log("Completed2"));
  subj.next(Math.random());
}

subscribe();
subscribe();
subscribe();
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

person olsn    schedule 29.01.2017
comment
Итак, вы говорите, что каждый Observable, который возвращается из share, сбрасывается и переоценивается после его завершения? - person JeB; 29.01.2017
comment
Не автоматически, а как только появится новый подписчик. - person olsn; 29.01.2017
comment
Это все объясняет. Хотелось бы, чтобы это было в документации. Я не получил take после share части. Не могли бы вы уточнить, пожалуйста? - person JeB; 29.01.2017
comment
Это в документации: пока есть хотя бы один подписчик, этот Observable будет подписан и будет передавать данные. Когда все подписчики отписались, он откажется от подписки на источник Observable - и, конечно же, один из основных принципов Rx: когда вы отказываетесь от подписки, поток останавливается, когда вы подписываетесь, поток начинается с 0. - Имейте в виду, что в вашем примере вы не завершаете тему, а просто выполняете подпоток - если бы вы выполнили Subject, то он вел бы себя больше, чем вы ожидали (хотя по совершенно другим причинам и не из-за доли) - person olsn; 29.01.2017
comment
В нем говорится, что когда все подписчики отписались, он откажется от подписки на источник Observable, но не указывает, что все подписчики отписываются автоматически по завершении потока. Кроме того, я не знал, что вы можете повторно использовать поток, который уже был завершен, подписавшись на него снова. Где я могу прочитать об этом? Кроме того, не могли бы вы подробнее рассказать о Вы должны поставить take(1) перед share, поскольку любая последующая операция влияет на горячее состояние. Спасибо! - person JeB; 29.01.2017
comment
Это один из основных принципов, вы, вероятно, могли бы найти в Google вступление RxJS - всякий раз, когда поток (ЛЮБОЙ поток) завершается или выдает ошибку, любой подписчик автоматически отписывается - что касается порядка: share возвращает общий поток, но если вы добавите оператор, вы получите новый поток, который используется только до точки share-operator - с take(n) это не слишком большая разница, хотя, если бы был switchMap, например после совместного использования вы увидите много нежелательных вещей, поэтому практическое правило должно заключаться в том, чтобы помещать любого разделяющего оператора последним. - person olsn; 29.01.2017