RxJs испускают после задержки или следующего события в потоке

У меня есть пары событий: _1 _ / _ 2_ / etc и _3 _ / _ 4_ / etc. Я бы хотел следующее:

  • when an add1 is emitted on the stream
    • if DELAY transpires with no new add* emissions
      • emit remove1
    • if add* is emitted
      • emit remove1 for add1 immediately
      • испустить remove* для add* после DELAY

Это должно продолжаться для всех выбросов add* в потоке.

Вот тест, который я написал с использованием мраморного тестирования RxJS для этого случая:

import test from 'tape'
import { set, lensPath } from 'ramda'
import { TestScheduler } from 'rxjs/testing'
import hideAfterDelay from '../another/file'
import { actionCreators } from '../another/dir'

const prefix = 'epics -> notifications'

test(`${prefix} -> hideAfterDelay`, t => {
  t.plan(1)

  const scheduler = new TestScheduler(t.deepEqual)

  const actionMap = {
    a: createAddAction('hello!'),
    b: createAddAction('goodbye!'),
    x: actionCreators.notifications.remove('hello!'),
    y: actionCreators.notifications.remove('goodbye!')
  }

  scheduler.run(({ cold, expectObservable }) => {
    const actionStream = cold('a-------a-b-a------', actionMap)
    const expected =          '-----x-----x-y----x'
    const actual = hideAfterDelay(5)(actionStream)

    expectObservable(actual).toBe(expected, actionMap)
  })
})

function createAddAction (name) {
  const action = actionCreators.notifications.add(name)
  const lens = lensPath(['payload', 'id'])

  return set(lens, name, action)
}

Я думаю, что тест является репрезентативным для описанного выше поведения, которого я хочу.

Как я могу написать это наблюдаемое? Я пробовал использовать timer и race, но мне не удалось заставить это работать ...

Это эпопея с использованием redux-observable, кстати.

Использование RxJS v6


person neezer    schedule 30.04.2018    source источник


Ответы (1)


Хорошо, я думаю, что получил рабочее решение, использующее закрытие и немного изменив мое тестовое утверждение.

Во-первых, ожидаемая мраморная диаграмма должна выглядеть так

//   input:    a-------a-b-a------
// - expected: -----x-----x-y----x
// + expected: -----x----x-y----x
//
// Note above that the middle x and y emit at the same time as new
// `add*` actions on the source stream instead of one frame later

С этим небольшим изменением - которое все еще кажется совместимым с моим описанием в вопросе - я смог сдать тест со следующим:

import { of, timer, empty } from 'rxjs'
import { switchMap, mapTo, tap, merge } from 'rxjs/operators'
import { ofType } from '../operators'
import actionTypes from '../../actionTypes/notifications'
import { actionCreators } from '../..'

export default (delay = 3000) => actionStream => {
  let immediateRemove

  return actionStream.pipe(
    ofType(actionTypes.ADD),
    switchMap(action => {
      let obs = empty()

      if (immediateRemove) {
        obs = of(immediateRemove)
      }

      const remove = actionCreators.notifications.remove(action.payload.id)

      immediateRemove = remove

      return obs.pipe(
        merge(
          timer(delay).pipe(
            tap(() => {
              immediateRemove = null
            }),
            mapTo(remove)
          )
        )
      )
    })
  )
}

Я понятия не имею, лучший или правильный способ решить эту проблему, но уверен, что это правильный способ.

person neezer    schedule 01.05.2018