То, что у вас есть, хорошо, но как и все RxJS, но дьявол кроется в деталях.
Проблемы
switchMap
ing
mergeMap((userIds) => functionThatSimulateAFetch(userIds)),
Здесь вы сначала ошибаетесь. Используя здесь карту слияния, вы лишаете возможности отличить «поток запросов» от «потока, возвращенного одним запросом»:
- Вы делаете практически невозможным отказаться от подписки на индивидуальный запрос (отменить его)
- Вы делаете невозможным обработку ошибок
- Он упадет, если ваше внутреннее наблюдаемое испускает более одного раза.
Скорее, вы хотите испускать отдельные BatchEvent
через обычный map
(создавая наблюдаемое или наблюдаемое) и _5 _ / _ 6_ те, которые после фильтрации.
- Побочные эффекты при создании наблюдаемого и испускаемого перед подпиской
userToFetch$.next(userId)
return observable
Не делай этого. Наблюдаемое само по себе на самом деле ничего не делает. Это «план» последовательности действий, которые должны произойти , когда вы подпишетесь на нее. Поступая так, вы создадите только пакетное действие для наблюдаемого создания, но вы облажаетесь, если получите несколько или отложенных подписок.
Скорее, вы хотите создать наблюдаемое из defer
, которое будет излучать в userToFetch$
при каждой подписке.
Даже в этом случае вы захотите подписаться на свой наблюдаемый перед отправкой на userToFetch
: если вы не подписаны, ваш наблюдаемый объект не слушает тему, и событие будет потеряно. Вы можете сделать это в наблюдаемом, подобном отложенному.
Решение
Коротко и не сильно отличается от вашего кода, но структурируйте его так.
const BUFFER_TIME = 1000;
type BatchEvent = { keys: Set<string>, values: Observable<Users> };
/** The incoming keys */
const keySubject = new Subject<string>();
const requests: Observable<{ keys: Set<string>, values: Observable<Users> }> =
this.keySubject.asObservable().pipe(
bufferTime(BUFFER_TIME),
map(keys => this.fetchBatch(keys)),
share(),
);
/** Returns a single User from an ID. Batches the request */
function get(userId: string): Observable<User> {
console.log("Creating observable for:", userId);
// The money observable. See "defer":
// triggers a new subject event on subscription
const observable = new Observable<BatchEvent>(observer => {
this.requests.subscribe(observer);
// Emit *after* the subscription
this.keySubject.next(userId);
});
return observable.pipe(
first(v => v.keys.has(userId)),
// There is only 1 item, so any *Map will do here
switchMap(v => v.values),
map(v => v[userId]),
);
}
function fetchBatch(args: string[]): BatchEvent {
const keys = new Set(args); // Do not batch duplicates
const values = this.userService.get(Array.from(keys)).pipe(
share(),
);
return { keys, values };
}
Это именно то, о чем вы просили, в том числе:
- Ошибки передаются получателям пакетного вызова, но никому другому
- Если все отписываются от пакета, наблюдаемое отменяется.
- Если все отписываются от пакета до того, как запрос запущен, он никогда не срабатывает.
- Наблюдаемый объект ведет себя как HttpClient: подписка на наблюдаемый объект запускает новый (пакетный) запрос данных. Абоненты могут использовать трубку
shareReplay
или что-то еще. Так что никаких сюрпризов.
Вот рабочая демонстрация stackblitz Angular: https://stackblitz.com/edit/angular-rxjs-batch-request
В частности, обратите внимание на поведение, когда вы «переключаете» отображение: вы заметите, что повторная подписка на существующие наблюдаемые вызовет новые пакетные запросы, и что эти запросы будут отменены (или полностью не сработают), если вы повторно переключитесь достаточно быстро. .
Пример использования
В нашем проекте мы используем это для таблиц Angular, где каждая строка должна отдельно получать дополнительные данные для рендеринга. Это позволяет нам:
- разбивать все запросы на «одну страницу» без каких-либо специальных знаний о разбиении на страницы
- Потенциально получить сразу несколько страниц, если пользователь быстро перемещается по страницам.
- повторно использовать существующие результаты, даже если размер страницы изменился
Ограничения
Я бы не стал добавлять к этому фрагменты или ограничение скорости. Поскольку наблюдаемый источник - тупой bufferTime
, вы сталкиваетесь с проблемами:
- «Разделение на части» произойдет до дедупликации. Таким образом, если у вас есть 100 запросов для одного userId, вы получите несколько запросов с одним элементом.
- Если вы установите ограничение по скорости, вы не сможете проверить свою очередь. Таким образом, у вас может получиться очень длинная очередь, содержащая несколько одинаковых запросов.
Однако это пессимистическая точка зрения. Исправить это означало бы полностью завершить работу с помощью механизма очереди / пакетной обработки с отслеживанием состояния, который на порядок сложнее.
person
Paul Féraud
schedule
07.08.2019
functionThatSimulateAFetch
, но тогда у вас естьfetchUser = (userId: string) => {...}
, то есть функция для выборки одного пользователя. Чего вы хотите достичь? - person Picci   schedule 12.09.2018