RxJS массив Observables с дополнительными данными для сохранения

Краткое объяснение

Как обработать массив Observable (например, в forkJoin) с передачей некоторых данных для каждого Observable, которые мне нужно использовать в _2 _ & _ 3_?

const source = {animal: 'cat', fruit: 'apple', color: 'blue'}
const observables = Object.keys(source).map(key => [this.getDataFromApi(source[key]), key])

// resolve only observables[0][0], observables[0][1], observables[0][2] in magic way,
// but preserve  observables[1][0], observables[1][1], observables[1][2] for future use in pipe&map
const processedObservable = forkJoin(observables).pipe( // ???
  map(items => items.map(item => 'this is ' + item[0] + '(' + item[1] + ')')), // this doesn't work
  map(items => items.join(', '))
)

processedObservable.subscribe(text => console.log(text)) // subscribe only for test
// expected result: this is your cat (animal), this is your apple (fruit), this is your blue (color)

Длинное объяснение

У меня есть источник (массив или объект элементов). Мне нужно запросить каждый элемент в API, поэтому я получаю массив Observables. Затем я хочу обработать все полученные данные, поэтому я использую forkJoin и обрабатываю данные в pipe и нескольких map.

Я не могу обрабатывать данные в подписке напрямую.

Вот простой пример:

const source = ['cat', 'apple', 'blue']
const observables = source.map(item => this.getDataFromApi(item))
const processedObservable = forkJoin(observables).pipe(
  map(items => items.map(item => 'this is ' + item)),
  map(items => items.join(', '))
)
processedObservable.subscribe(text => console.log(text)) // test
// result: this is your cat, this is your apple, this is your blue

Но помимо данных элементов для запросов API у меня есть метаданные элементов, которые я должен использовать во время обработки в pipe & map.

Вот пример с репрезентативным источником, но здесь я не использую метаданные элементов (результат такой же, как и выше). Я проигнорировал метаданные:

const source = {animal: 'cat', fruit: 'apple', color: 'blue'}
const observables = Object.keys(source).map(key => this.getDataFromApi(source[key]))
const processedObservable = forkJoin(observables).pipe(
  map(items => items.map(item => 'this is ' + item)),
  map(items => items.join(', '))
)
processedObservable.subscribe(text => console.log(text)) // test
// result: this is your cat, this is your apple, this is your blue

Вот пример с репрезентативным источником, но здесь я проигнорировал ключи и вызовы API, но обработал метаданные элементов:

const source = {animal: 'cat', fruit: 'apple', color: 'blue'}
const observables = Object.keys(source).map(key => of(key))
const processedObservable = forkJoin(observables).pipe(
  map(items => items.map(item => '(' + item + ')')),
  map(items => items.join(', '))
)
processedObservable.subscribe(text => console.log(text)) // test
// result: (animal), (fruit), (color)

Я хочу получить такой результат:

// result: this is your cat (animal), this is your apple (fruit), this is your blue (color)

Примерно так в _14 _ & _ 15_:

  map(items => items.map(item => 'this is ' + item.apiValue + '(' + item.key + ')')),

or:

  map(items => items.map(item => 'this is ' + item[0] + '(' + item[1] + ')')),

Но я не знаю, как передать массив наблюдаемых и метаданных forkJoin, некоторые из этого массива наблюдаемых с метаданными:

const observables = Object.keys(source).map(key => [this.getDataFromApi(source[key]), key])

Может, стоит использовать другую функцию, например flatMap или switchMap?

Дополнительная информация

Метод getDataFromApi для моделирования вызовов API:

  getDataFromApi(item) {
    return of('your ' + item)
  }

person mkczyk    schedule 03.09.2020    source источник


Ответы (4)


Вы должны добавить свой первый map прямо там, где вы создаете отдельные наблюдаемые.

const source = { animal: "cat", fruit: "apple", color: "blue" };

const observables = Object.keys(source).map(key => getFromApi(source[key]).pipe(
  map(item => `this is ${item} (${key})`)
));

const processedObservable = forkJoin(observables).pipe(
  map(items => items.join(', '))
);

processedObservable.subscribe(console.log);

https://stackblitz.com/edit/rxjs-4x7hbv?file=index.ts

person frido    schedule 03.09.2020

Ниже приведен пример того, как это может быть реализовано:

from(Object.entries(source))
 .pipe(
   mergeMap(([key, value]) => getFromApi(value).pipe(
     map(item => `this is ${item} (${key})`), // <= key is being closured
   )),
   toArray(), // <= transform stream to array 
   map(item => item.join()), 

 )
 .subscribe(console.log);

Попробуйте запустить следующую демонстрацию:

const { from, of } = rxjs;
const { map, switchMap, toArray } = rxjs.operators;

function getFromApi(item) {
  return of('your ' + item)
}

const source = { animal: "cat", fruit: "apple", color: "blue" };


from(Object.entries(source))
  .pipe(
    mergeMap(([key, value]) => getFromApi(value).pipe(
      map(item => `this is ${item} (${key})`), // <= key is being closured
    )),
    toArray(), // <= transform stream to array 
    map(item => item.join()),
  )
  .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js"></script>

person Rafi Henig    schedule 03.09.2020
comment
Это не работает для асинхронных вызовов API. switchMap откажется от подписки на вызов API и подпишется на следующий вызов до завершения предыдущего. Вы увидите такое поведение, если добавите задержку к моделированному вызову API of('your ' + item).pipe(delay(0)). - person frido; 08.09.2020
comment
mergeMap не гарантирует, что объединенный наблюдаемый объект будет выдавать ответы api в том же порядке, в котором поступают элементы из источника. Если первый вызов API занимает больше времени, чем второй, объединенный наблюдаемый объект выдаст второй ответ до того, как будет отправлен первый ответ. Таким образом, порядок элементов в массиве, собранном toArray, может отличаться от порядка элементов в исходном массиве. - person frido; 08.09.2020

function getDataFromApi(item) {
  return of('your ' + item)
}

const source = { animal: 'cat', fruit: 'apple', color: 'blue' };

from(Object.keys(source)).pipe(
  switchMap(key => forkJoin([getDataFromApi(source[key]), of(key)])),
  map(([data, key]) => `this is ${data} (${key})`),
  toArray(),
  map(data => data.join(', ')),
).subscribe(console.log);

См .: https://stackblitz.com/edit/rxjs-1t4nlo?file=index.ts

person MoxxiManagarm    schedule 03.09.2020
comment
Это не работает для асинхронных вызовов API. switchMap откажется от подписки на вызов API и подпишется на следующий вызов до завершения предыдущего. Вы увидите такое поведение, если добавите задержку к моделированному вызову API of('your ' + item).pipe(delay(0)). - person frido; 08.09.2020

Попробуйте ниже, используя import { combineLatest } from 'rxjs'

 combineLatest(['cat', 'apple', 'blue'].map(item => this.getDataFromApi(item))).pipe(
    map(items => items.map(item => 'this is ' + item)),
    map(items => items.join(', '))
  )
  .subscribe({
    next:(res) => console.log(res)
  })

Пример в stackblitz

person Owen Kelvin    schedule 03.09.2020