RxJS: пакетные запросы и общий ответ

Представим, что у меня есть функция fetchUser, которая принимает параметр userId и возвращает наблюдаемое пользователя.

Поскольку я часто вызываю этот метод, я хочу пакетировать идентификаторы, чтобы вместо этого выполнить один запрос с несколькими идентификаторами!

Вот и начались мои неприятности ...

Я не могу найти решение, чтобы сделать это, не разделяя наблюдаемое между различными вызовами fetchUser.

import { Subject, from } from "rxjs"
import { bufferTime, mergeMap, map, toArray, filter, take, share } from "rxjs/operators"

const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
    map((userId) => ({ id: userId, name: "George" })),
    toArray(),
)

const userToFetch$ = new Subject<string>()

const fetchedUser$ = userToFetch$.pipe(
    bufferTime(1000),
    mergeMap((userIds) => functionThatSimulateAFetch(userIds)),
    share(),
)

const fetchUser = (userId: string) => {
    const observable = fetchedUser$.pipe(
        map((users) => users.find((user) => user.id === userId)),
        filter((user) => !!user),
        take(1),
    )
    userToFetch$.next(userId)
    return observable
}

Но это некрасиво и имеет несколько проблем:

  • Если я откажусь от подписки на наблюдаемое, возвращаемое fetchUser, до того, как таймер bufferTime завершится, это не предотвратит выборку пользователя.
  • Если я отписываюсь от всех наблюдаемых, возвращаемых fetchUser, до завершения выборки пакета, он не отменяет запрос.
  • Обработка ошибок сложнее
  • так далее

В общем: я не знаю, как решить проблемы, требующие совместного использования ресурсов с помощью RxJS. Продвинутый пример RxJS найти сложно.


person antoinestv    schedule 11.09.2018    source источник
comment
Вы говорите, что вам нужно получать пользователей партиями, и это то, что functionThatSimulateAFetch, но тогда у вас есть fetchUser = (userId: string) => {...}, то есть функция для выборки одного пользователя. Чего вы хотите достичь?   -  person Picci    schedule 12.09.2018
comment
Проблема заключается в следующем: я хочу получать по одному пользователю за раз, но, чтобы не выполнять слишком много вызовов API, я хочу пакетировать запросы (поэтому я использую одну конечную точку API, которая возвращает список пользователей для данного списка пользователей идентификаторы). Но это просто пример, я часто сталкиваюсь с подобными проблемами. Как только я делюсь наблюдаемым с помощью _1 _ / _ 2_, я больше не могу знать источник значений моего потока, и мне нужно передать контекст и т. Д. (Как в примере выше). Я знаю, что не правильно отношусь к проблеме, поэтому я задаю свой вопрос!   -  person antoinestv    schedule 12.09.2018


Ответы (4)


Я думаю, что @Biggy прав.

Так я понимаю проблему и то, чего вы хотите достичь

  1. В вашем приложении есть разные места, куда вы хотите привлекать пользователей.
  2. Вы не хотите запускать запрос на выборку все время, скорее вы хотите буферизовать их и отправлять их через определенный интервал времени, скажем, 1 секунду
  3. Вы хотите отменить определенный буфер и избежать того, что в течение этого 1-секундного интервала запускается запрос на выборку группы пользователей.
  4. В то же время, если кто-то, назовем это Код в позиции X, запросил пользователя, а всего через несколько миллисекунд кто-то еще, то есть Код в позиции Y отменяет весь пакет запросов, затем код в позиции X должен получить какой-то ответ, скажем, null
  5. Более того, вы можете попросить получить пользователя, а затем передумать, если в течение интервала времени буфера, и избежать выборки этого пользователя (я далеко не уверен, что это действительно то, что вы хотите, но это как-то вытекает из вашего вопроса

Если все это правда, то вам, вероятно, нужен какой-то механизм очередей, как предлагал Багги.

Тогда может быть множество реализаций такого механизма.

person Picci    schedule 12.09.2018

То, что у вас есть, хорошо, но как и все RxJS, но дьявол кроется в деталях.

Проблемы

  1. switchMaping
        mergeMap((userIds) => functionThatSimulateAFetch(userIds)),

Здесь вы сначала ошибаетесь. Используя здесь карту слияния, вы лишаете возможности отличить «поток запросов» от «потока, возвращенного одним запросом»:

  • Вы делаете практически невозможным отказаться от подписки на индивидуальный запрос (отменить его)
  • Вы делаете невозможным обработку ошибок
  • Он упадет, если ваше внутреннее наблюдаемое испускает более одного раза.

Скорее, вы хотите испускать отдельные BatchEvent через обычный map (создавая наблюдаемое или наблюдаемое) и _5 _ / _ 6_ те, которые после фильтрации.

  1. Побочные эффекты при создании наблюдаемого и испускаемого перед подпиской
    userToFetch$.next(userId)
    return observable

Не делай этого. Наблюдаемое само по себе на самом деле ничего не делает. Это «план» последовательности действий, которые должны произойти , когда вы подпишетесь на нее. Поступая так, вы создадите только пакетное действие для наблюдаемого создания, но вы облажаетесь, если получите несколько или отложенных подписок.

Скорее, вы хотите создать наблюдаемое из defer, которое будет излучать в userToFetch$ при каждой подписке.

Даже в этом случае вы захотите подписаться на свой наблюдаемый перед отправкой на userToFetch: если вы не подписаны, ваш наблюдаемый объект не слушает тему, и событие будет потеряно. Вы можете сделать это в наблюдаемом, подобном отложенному.

Решение

Коротко и не сильно отличается от вашего кода, но структурируйте его так.

const BUFFER_TIME = 1000;

type BatchEvent = { keys: Set<string>, values: Observable<Users> };

/** The incoming keys */
const keySubject = new Subject<string>();

const requests: Observable<{ keys: Set<string>, values: Observable<Users> }> =
  this.keySubject.asObservable().pipe(
    bufferTime(BUFFER_TIME),
    map(keys => this.fetchBatch(keys)),
    share(),
  );

/** Returns a single User from an ID. Batches the request */
function get(userId: string): Observable<User> {
  console.log("Creating observable for:", userId);
  // The money observable. See "defer":
  // triggers a new subject event on subscription
  const observable = new Observable<BatchEvent>(observer => {
    this.requests.subscribe(observer);
    // Emit *after* the subscription
    this.keySubject.next(userId);
  });
  return observable.pipe(
    first(v => v.keys.has(userId)),
    // There is only 1 item, so any *Map will do here
    switchMap(v => v.values),
    map(v => v[userId]),
  );
}

function fetchBatch(args: string[]): BatchEvent {
  const keys = new Set(args); // Do not batch duplicates
  const values = this.userService.get(Array.from(keys)).pipe(
    share(),
  );
  return { keys, values };
}

Это именно то, о чем вы просили, в том числе:

  • Ошибки передаются получателям пакетного вызова, но никому другому
  • Если все отписываются от пакета, наблюдаемое отменяется.
  • Если все отписываются от пакета до того, как запрос запущен, он никогда не срабатывает.
  • Наблюдаемый объект ведет себя как HttpClient: подписка на наблюдаемый объект запускает новый (пакетный) запрос данных. Абоненты могут использовать трубку shareReplay или что-то еще. Так что никаких сюрпризов.

Вот рабочая демонстрация stackblitz Angular: https://stackblitz.com/edit/angular-rxjs-batch-request

В частности, обратите внимание на поведение, когда вы «переключаете» отображение: вы заметите, что повторная подписка на существующие наблюдаемые вызовет новые пакетные запросы, и что эти запросы будут отменены (или полностью не сработают), если вы повторно переключитесь достаточно быстро. .

Пример использования

В нашем проекте мы используем это для таблиц Angular, где каждая строка должна отдельно получать дополнительные данные для рендеринга. Это позволяет нам:

  • разбивать все запросы на «одну страницу» без каких-либо специальных знаний о разбиении на страницы
  • Потенциально получить сразу несколько страниц, если пользователь быстро перемещается по страницам.
  • повторно использовать существующие результаты, даже если размер страницы изменился

Ограничения

Я бы не стал добавлять к этому фрагменты или ограничение скорости. Поскольку наблюдаемый источник - тупой bufferTime, вы сталкиваетесь с проблемами:

  • «Разделение на части» произойдет до дедупликации. Таким образом, если у вас есть 100 запросов для одного userId, вы получите несколько запросов с одним элементом.
  • Если вы установите ограничение по скорости, вы не сможете проверить свою очередь. Таким образом, у вас может получиться очень длинная очередь, содержащая несколько одинаковых запросов.

Однако это пессимистическая точка зрения. Исправить это означало бы полностью завершить работу с помощью механизма очереди / пакетной обработки с отслеживанием состояния, который на порядок сложнее.

person Paul Féraud    schedule 07.08.2019

Не уверен, что это лучший способ решить эту проблему (по крайней мере, нужны тесты), но я постараюсь объяснить свою точку зрения.

У нас есть 2 queue: для ожидающих и для запросов функций.
result, чтобы помочь подписчикам доставить ответ / ошибку.
Какой-то рабочий, основанный на каком-то расписании, берет задачу из очереди, чтобы выполнить запрос.

Если я откажусь от подписки на наблюдаемый объект, возвращаемый fetchUser, до завершения таймера bufferTime, это не предотвратит выборку пользователя.

Отказ от подписки на fetchUser очистит request queue, а worker ничего не сделает.

Если я отписываюсь от всех наблюдаемых, возвращаемых fetchUser, до завершения выборки пакета, запрос не отменяется.

Подписка на работника until isNothingRemain$

const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(

  map((userId) => ({ id: userId, name: "George" })),
  toArray(),
  tap(() => console.log('API_CALL', userIds)),
  delay(200),
)

class Queue {
  queue$ = new BehaviorSubject(new Map());

  private get currentQueue() {
    return new Map(this.queue$.getValue());
  }

  add(...ids) {
    const newMap = ids.reduce((acc, id) => {
      acc.set(id, (acc.get(id) || 0) + 1);
      return acc;
    }, this.currentQueue);
    this.queue$.next(newMap);
  };

  addMap(idmap: Map<any, any>) {

    const newMap = (Array.from(idmap.keys()))
      .reduce((acc, id) => {
        acc.set(id, (acc.get(id) || 0) + idmap.get(id));
        return acc;
      }, this.currentQueue);
    this.queue$.next(newMap);
  }

  remove(...ids) {
    const newMap = ids.reduce((acc, id) => {
      acc.get(id) > 1 ? acc.set(id, acc.get(id) - 1) : acc.delete(id);
      return acc;
    }, this.currentQueue)
    this.queue$.next(newMap);
  };

  removeMap(idmap: Map<any, any>) {
    const newMap = (Array.from(idmap.keys()))
      .reduce((acc, id) => {
        acc.get(id) > idmap.get(id) ? acc.set(id, acc.get(id) - idmap.get(id)) : acc.delete(id);
        return acc;
      }, this.currentQueue)
    this.queue$.next(newMap);
  };

  has(id) {
    return this.queue$.getValue().has(id);
  }

  asObservable() {
    return this.queue$.asObservable();
  }
}

class Result {
  result$ = new BehaviorSubject({ ids: new Map(), isError: null, value: null });
  select(id) {
    return this.result$.pipe(
      filter(({ ids }) => ids.has(id)),
      switchMap(({ isError, value }) => isError ? throwError(value) : of(value.find(x => x.id === id)))
    )
  }
  add({ isError, value, ids }) {
    this.result$.next({ ids, isError, value });
  }

  clear(){
    this.result$.next({ ids: new Map(), isError: null, value: null });
  }
}

const result = new Result();
const queueToSend = new Queue();
const queuePending = new Queue();
const doRequest = new Subject();

const fetchUser = (id: string) => {
  return Observable.create(observer => {
    queueToSend.add(id);
    doRequest.next();

    const subscription = result
      .select(id)
      .pipe(take(1))
      .subscribe(observer);

    // cleanup queue after got response or unsubscribe
    return () => {
      (queueToSend.has(id) ? queueToSend : queuePending).remove(id);
      subscription.unsubscribe();
    }
  })
}


// some kind of worker that take task from queue and send requests
doRequest.asObservable().pipe(
  auditTime(1000),
  // clear outdated results
  tap(()=>result.clear()),
  withLatestFrom(queueToSend.asObservable()),
  map(([_, queue]) => queue),
  filter(ids => !!ids.size),
  mergeMap(ids => {
    // abort the request if it have no subscribers
    const isNothingRemain$ = combineLatest(queueToSend.asObservable(), queuePending.asObservable()).pipe(
      map(([queueToSendIds, queuePendingIds]) => Array.from(ids.keys()).some(k => queueToSendIds.has(k) || queuePendingIds.has(k))),
      filter(hasSameKey => !hasSameKey)
    )

    // prevent to request the same ids if previous requst is not complete
    queueToSend.removeMap(ids);
    queuePending.addMap(ids);
    return functionThatSimulateAFetch(Array.from(ids.keys())).pipe(
      map(res => ({ isErorr: false, value: res, ids })),
      takeUntil(isNothingRemain$),
      catchError(error => of({ isError: true, value: error, ids }))
    )
  }),
).subscribe(res => result.add(res))




fetchUser('1').subscribe(console.log);

const subs = fetchUser('2').subscribe(console.log);
subs.unsubscribe();

fetchUser('3').subscribe(console.log);



setTimeout(() => {
  const subs1 = fetchUser('10').subscribe(console.log);
  subs1.unsubscribe();

  const subs2 = fetchUser('11').subscribe(console.log);
  subs2.unsubscribe();
}, 2000)


setTimeout(() => {
  const subs1 = fetchUser('20').subscribe(console.log);
  subs1.unsubscribe();

  const subs21 = fetchUser('20').subscribe(console.log);
  const subs22 = fetchUser('20').subscribe(console.log);
}, 4000)


// API_CALL
// ["1", "3"]
// {id: "1", name: "George"}
// {id: "3", name: "George"}
// API_CALL
// ["20"]
// {id: "20", name: "George"}
// {id: "20", name: "George"}

пример stackblitz

person Buggy    schedule 12.09.2018
comment
Спасибо за ответ, может сработает! К сожалению, это кажется слишком сложным и больше похоже на взлом, чем на общий способ решения такого рода проблем. - person antoinestv; 12.09.2018
comment
Да, давайте вместе подождем лучшего решения! ) - person Buggy; 12.09.2018

К вашему сведению, я попытался создать общую пакетную очередь задач, используя ответы @buggy и @picci:

import { Observable, Subject, BehaviorSubject, from, timer } from "rxjs"
import { catchError, share, mergeMap, map, filter, takeUntil, take, bufferTime, timeout, concatMap } from "rxjs/operators"

export interface Task<TInput> {
    uid: number
    input: TInput
}

interface ErroredTask<TInput> extends Task<TInput> {
    error: any
}

interface SucceededTask<TInput, TOutput> extends Task<TInput> {
    output: TOutput
}

export type FinishedTask<TInput, TOutput> = ErroredTask<TInput> | SucceededTask<TInput, TOutput>

const taskErrored = <TInput, TOutput>(
    taskFinished: FinishedTask<TInput, TOutput>,
): taskFinished is ErroredTask<TInput> => !!(taskFinished as ErroredTask<TInput>).error

type BatchedWorker<TInput, TOutput> = (tasks: Array<Task<TInput>>) => Observable<FinishedTask<TInput, TOutput>>

export const createSimpleBatchedWorker = <TInput, TOutput>(
    work: (inputs: TInput[]) => Observable<TOutput[]>,
    workTimeout: number,
): BatchedWorker<TInput, TOutput> => (
    tasks: Array<Task<TInput>>,
) => work(
    tasks.map((task) => task.input),
).pipe(
    mergeMap((outputs) => from(tasks.map((task, index) => ({
        ...task,
        output: outputs[index],
    })))),
    timeout(workTimeout),
    catchError((error) => from(tasks.map((task) => ({
        ...task,
        error,
    })))),
)

export const createBatchedTaskQueue = <TInput, TOutput>(
    worker: BatchedWorker<TInput, TOutput>,
    concurrencyLimit: number = 1,
    batchTimeout: number = 0,
    maxBatchSize: number = Number.POSITIVE_INFINITY,
) => {
    const taskSubject = new Subject<Task<TInput>>()
    const cancelTaskSubject = new BehaviorSubject<Set<number>>(new Set())
    const cancelTask = (task: Task<TInput>) => {
        const cancelledUids = cancelTaskSubject.getValue()
        const newCancelledUids = new Set(cancelledUids)
        newCancelledUids.add(task.uid)
        cancelTaskSubject.next(newCancelledUids)
    }
    const output$: Observable<FinishedTask<TInput, TOutput>> = taskSubject.pipe(
        bufferTime(batchTimeout, undefined, maxBatchSize),
        map((tasks) => {
          const cancelledUids = cancelTaskSubject.getValue()
          return tasks.filter((task) => !cancelledUids.has(task.uid))
        }),
        filter((tasks) => tasks.length > 0),
        mergeMap(
            (tasks) => worker(tasks).pipe(
                takeUntil(cancelTaskSubject.pipe(
                    filter((uids) => {
                        for (const task of tasks) {
                            if (!uids.has(task.uid)) {
                                return false
                            }
                        }
                        return true
                    }),
                )),
            ),
            undefined,
            concurrencyLimit,
        ),
        share(),
    )
    let nextUid = 0
    return (input$: Observable<TInput>): Observable<TOutput> => input$.pipe(
        concatMap((input) => new Observable<TOutput>((observer) => {
            const task = {
                uid: nextUid++,
                input,
            }
            const subscription = output$.pipe(
                filter((taskFinished) => taskFinished.uid === task.uid),
                take(1),
                map((taskFinished) => {
                    if (taskErrored(taskFinished)) {
                        throw taskFinished.error
                    }
                    return taskFinished.output
                }),
            ).subscribe(observer)
            subscription.add(
                timer(0).subscribe(() => taskSubject.next(task)),
            )
            return () => {
                subscription.unsubscribe()
                cancelTask(task)
            }
        })),
    )
}

В нашем примере:

import { from } from "rxjs"
import { map, toArray } from "rxjs/operators"
import { createBatchedTaskQueue, createSimpleBatchedWorker } from "mmr/components/rxjs/batched-task-queue"

const functionThatSimulateAFetch = (userIds: string[]) => from(userIds).pipe(
    map((userId) => ({ id: userId, name: "George" })),
    toArray(),
)

const userFetchQueue = createBatchedTaskQueue(
    createSimpleBatchedWorker(
        functionThatSimulateAFetch,
        10000,
    ),
)

const fetchUser = (userId: string) => {
    return from(userId).pipe(
        userFetchQueue,
    )
}

Я открыт для любых предложений по улучшению

person antoinestv    schedule 28.09.2018