Использование RxJS switchMap только для отказа от подписки на потоки с тем же URL-адресом запроса / полезной нагрузкой действия (эпики, наблюдаемые с помощью redux)

У меня есть интерфейс, в котором пользователь может инициировать вызовы одной и той же конечной точки, но с разными параметрами (в данном случае UUID). До сих пор я наслаждался поведением switchMap отмены моих HTTP-запросов в полете всякий раз, когда я отправляю новое действие redux с тем же типом, и в этом случае мне все еще нужно такое поведение, но только если запрошенный UUID нового действия (часть объекта действия) совпадает с уже выполняемым. Я не совсем уверен в правильном подходе к этому.

Например, после отправки нескольких действий одновременно, я надеюсь, что все действия с уникальными идентификаторами будут выполнены, но те, которые повторяют существующий и еще не завершенный идентификатор, отменит предыдущий запрос и займут его место.

ex:

store.dispatch({ type: "GET_SOME_DATA", uuid: "1" })
store.dispatch({ type: "GET_SOME_DATA", uuid: "2" })
store.dispatch({ type: "GET_SOME_DATA", uuid: "2" })
store.dispatch({ type: "GET_SOME_DATA", uuid: "3" })
store.dispatch({ type: "GET_SOME_DATA", uuid: "2" })
// Get results back for '1', then '3', then '2' assuming equal response times.
// Only the duplicate uuid calls were cancelled, even though all have the same 'type'

Я пробовал использовать .distinctUntilChanged((a, b) => a.uuid === b.uuid) для фильтрации ввода потока в .switchMap, просто чтобы посмотреть, что произойдет, но это просто ограничивает, какие действия достигают switchMap, и поведение отмены всех, кроме самого последнего вызова API, связанного с действием GET_SOME_DATA, все еще происходит.

const getDataEpic = (action$) =>
  action$.ofType(GET_SOME_DATA)
    .switchMap(({ uuid }) => // would be great if the switchMap would only cancel existing streams with same uuid
      ajax.getJSON(`/api/datastuff/${uuid}`)
        .map((data) => successAction(uuid, data.values))
        .catch((err) => Observable.of(
          errorAction(uuid),
          setNotificationAction((err.xhr.response && err.xhr.response.message) || 'That went wrong'),
        ))

На данный момент я использую mergeMap, но меня беспокоит, что это может вызвать такие проблемы, как я столкнулся с живым поиском, когда более старый запрос может разрешиться после самого последнего, в результате чего мое хранилище redux будет обновлено старыми данными поскольку mergeMap не отменяет наблюдаемые потоки, такие как switchMap ... есть ли способ просмотреть текущие запросы RxJS Ajax и отменить их с новым URL-адресом действия или лучшее решение, которое мне явно не хватает?

Ваше здоровье!

Изменить: мне интересно, будет ли изменение switchMap на mergeMap, а затем цепочка takeUntil и отмена других действий GET_SOME_DATA будет правильным подходом, или если это просто приведет к немедленной отмене всех запросов? Например

const getDataEpic = (action$) =>
  action$.ofType(GET_SOME_DATA)
    .mergeMap(({ uuid }) =>
      ajax.getJSON(`/api/datastuff/${uuid}`)
        .takeUntil(
          action$.ofType(GET_SOME_DATA).filter(laterAction => laterAction.uuid === uuid)
        )
        .map((data) => successAction(uuid, data.values))
        .catch((err) => Observable.of(
          errorAction(uuid),
          setNotificationAction((err.xhr.response && err.xhr.response.message) || 'That went wrong'),
    ))

Edit2: По-видимому, добавление takeUntil работает! Я не уверен, что он на 100% постоянно растет с точки зрения уместности, но мне бы хотелось получить отзывы. Я также хотел бы поддержать возможность отмены вручную, будет ли метод, описанный здесь, подходящей реализацией?

Edit3: Я думаю, что это моя последняя рабочая версия. Удалена деструктуризация действия Redux в mergeMap на тот случай, если кто-то, кто новичок в redux-observables, столкнется с этим:

const getDataEpic = (action$) =>
  action$.ofType(GET_SOME_DATA)
    .mergeMap((action) =>
      ajax.getJSON(`/api/datastuff/${action.uuid}`)
        .takeUntil(Observable.merge(
          action$.ofType(MANUALLY_CANCEL_GETTING_DATA)
            .filter((cancelAction) => cancelAction.uuid === action.uuid),
          action$.ofType(GET_SOME_DATA)
            .filter((laterAction) => laterAction.uuid === action.uuid),
        ))
        .map((data) => successAction(action.uuid, data.values))
        .catch((err) => Observable.of(
          errorAction(action.uuid),
          setNotificationAction((err.xhr.response && err.xhr.response.message) || 'That went wrong'),
    ))

И наблюдаемое поведение сети от быстрого щелчка по всему, что находится в поле зрения. Были обработаны только недублированные запросы id!

введите здесь описание изображения


person Shawn Stern    schedule 21.02.2018    source источник


Ответы (1)


Вы также можете использовать оператор groupBy для обработки потоков с одним и тем же uuid и применить полезное поведение switchMap к каждому потоку действий uuid:

action$.ofType(GET_SOME_DATA)
.groupBy(
    ({ uuid }) => uuid, // group all the actions by uuid
    x => x,
    group$ => group$.switchMap(_ => Observable.timer(5000)) // close existing streams if no event of a grouped action is emitted 5 seconds in a row (prevents memory leaks)
)
.mergeMap(actionsGroupedByUuid$ => 
    actionsGroupedByUuid$.switchMap(({ uuid }) => 
        ajax.getJSON(`/api/datastuff/${uuid}`)
            .map((data) => successAction(uuid, data.values))
            .catch((err) => Observable.of(
                errorAction(uuid),
                setNotificationAction((err.xhr.response && err.xhr.response.message) || 'That went wrong'),
            )) 
    )
);
person ZahiC    schedule 21.02.2018
comment
Никогда не знал об операторе groupBy, но семантически выглядит как идеальное «правильное» решение для этого примера, спасибо! Не могли бы вы помочь мне понять, почему нам нужно выполнить mergeMap вокруг switchMap вместо того, чтобы сразу передавать вывод groupBy в switchMap? Я также наткнулся на это на github после поиска по запросу 'groupBy + switchMap', и он соответствует! github.com/ReactiveX/rxjs/issues/2147#issuecomment-262008911 - person Shawn Stern; 23.02.2018
comment
MergeMap применяется ко всем группам. Вы хотите, чтобы все группы (индивидуальный UUID) работали параллельно, а затем внутри каждой группы сохранялась только последняя. Если вы примените switchMap сразу после groupBy, вы получите только одну группу (один UUID) вместо всех. - person Oliver; 23.02.2018