Что за неожиданное поведение Observable RxJS с асинхронными функциями и toPromise?

Когда я использую только метод подписки, он работает правдоподобно, но с этим кодом - я не понимаю результат.

const Observable = require("rxjs").Observable;
let i = 0;
const x = new Observable((o) => {
    setInterval(() => o.next(++i), 1000);
});

(async () => {
    while (true) {
        try {
            console.log("loop");
            console.log("value", await x.toPromise());
        } catch (e) {
            console.log(e);
        }
    }
})();
x.subscribe((value) => {
    console.log("subscribe", value);
});

Результат этого кода:

loop
subscribe 2
subscribe 4
subscribe 6
subscribe 8
subscribe 10
subscribe 12
subscribe 14

Что случилось?

То же самое работает и с этим вариантом использования toPromise

function a() {
    x.toPromise().then((value) => {
        console.log("promise", value);
        a();
    }).catch((e) => {
        console.log("error", value);
    });
}
a();

person arvitaly    schedule 12.05.2018    source источник


Ответы (1)


toPromise() выполняется для Observable после его завершения. Поскольку ваш наблюдаемый никогда не завершается, он не выполняется. Используйте take(1), чтобы заставить его выдавать значение до завершения наблюдаемого.

const Observable = require("rxjs").Observable;
let i = 0;
const x = new Observable((o) => {
    setInterval(() => o.next(++i), 1000);
});

(async () => {
    while (true) {
        try {
            console.log("loop");
            console.log("value", await x.take(1).toPromise());//here
        } catch (e) {
            console.log(e);
        }
    }
})();
x.subscribe((value) => {
    console.log("subscribe", value);
});

Выход:

loop
subscribe 2
value 1
loop
subscribe 4
value 5
loop
subscribe 7
value 9
loop
subscribe 11
value 14

Что касается значений:

take() завершится, как только будет выдано хотя бы одно значение, независимо от того, завершится ли исходное наблюдаемое. Так что это действительно зависит от того, какое значение выдает наблюдаемый объект при следующем вызове toPromise().

person Suraj Rao    schedule 12.05.2018
comment
take() завершится независимо от того, завершится ли исходное наблюдаемое. Так что это действительно зависит от того, какое значение выдает наблюдаемый объект при следующем вызове toPromise. - person Suraj Rao; 12.05.2018
comment
@arvitaly «Не очевидно» — это то, чему вы можете противостоять, прочитав документацию. Он четко описывает, как он себя ведет. - person Ingo Bürk; 12.05.2018
comment
@SurajRao Нет, source.take(1) не гарантируется. Он завершается только в том случае, если источник излучает хотя бы один раз. Кроме того, он не завершается, если источник ошибается перед выбросом. - person Ingo Bürk; 12.05.2018