Rxswift map + concat параллельно

Этот Observable выполняет следующие

  • Учитывая наблюдаемый источник
  • мы используем карту для выполнения некоторой асинхронной работы
  • мы используем concat, чтобы вернуть результат асинхронной работы, чтобы

Следующее возвращает желаемый результат, но я хотел бы начать асинхронную работу параллельно.

Какой правильный способ сделать это с Rx?

import RxSwift

func delay(time: Int, closure: () -> Void) {
  dispatch_after(
    dispatch_time(DISPATCH_TIME_NOW, Int64(time * Int(NSEC_PER_SEC))),
    dispatch_get_main_queue(), closure)
}

func doAsyncWork(value: Int, desc: String, time: Int) -> Observable<Int> {
  return Observable.create() { (observer) -> Disposable in
    print(desc)
    delay(time) {
      observer.onNext(value)
      observer.onCompleted()
    }
    return NopDisposable.instance
  }
}

let seq = Observable
  .of(1, 2, 3, 4, 5)
  .map { (n) -> Observable<Int> in
    return doAsyncWork(n,
      desc: "start \(n) - wait \(5 - n)",
      time: 6 - n
    )
  }
  .concat()

let sharedSeq = seq.shareReplay(0)
sharedSeq.subscribeNext { print("=> \($0)") }
sharedSeq.subscribeCompleted { print("=> completed") }

Эта продукция

//start 1 - wait 4
// => 1
//start 2 - wait 3
// => 2
//start 3 - wait 2
// => 3
//start 4 - wait 1
// => 4
//start 5 - wait 0
// => 5

Желаемый результат будет

//start 1 - wait 4
//start 2 - wait 3
//start 3 - wait 2
//start 4 - wait 1
//start 5 - wait 0
// => 1
// => 2
// => 3
// => 4
// => 5

person Pierre    schedule 10.06.2016    source источник
comment
Вы говорите, что хотите запустить Observables параллельно, но ваш желаемый результат, похоже, не согласуется. Если все они запускаются без задержки, то не должен ли порядок быть 5, 4, 3, 2, 1, поскольку 5 ждет 0 секунд, а 4 ждет 1 секунду и т. Д.?   -  person solidcell    schedule 12.06.2016
comment
Если вы обновите свое сообщение, добавив в него тег swift, ваш вопрос и ответы будут выделены синтаксисом.   -  person solidcell    schedule 12.06.2016
comment
Извините, @solidcell, вопрос был довольно расплывчатым, я отредактировал, надеюсь, теперь он стал яснее. Моей целью было вернуть результат асинхронной работы по порядку, но запускать каждую асинхронную работу параллельно. Если соблюдение порядка не имело значения, использование плоской карты, как вы предлагали, сработало бы отлично   -  person Pierre    schedule 12.06.2016


Ответы (4)


Кажется, это работает, но не уверен, что это лучший ответ

import RxSwift

func delay(time: Int, closure: () -> Void) {
  dispatch_after(
    dispatch_time(DISPATCH_TIME_NOW, Int64(time * Int(NSEC_PER_SEC))),
    dispatch_get_main_queue(), closure)
}

func doAsyncWork(value: Int, desc: String, time: Int) -> Observable<Int> {
  return Observable.create() { (observer) -> Disposable in
    print(desc)
    delay(time) {
      observer.onNext(value)
      observer.onCompleted()
    }
    return NopDisposable.instance
  }
}

let seq = Observable
  .of(1, 2, 3, 4, 5)
  .map { (n) -> Observable<Int> in
    let o = doAsyncWork(n,
      desc: "start \(n) - wait \(5 - n)",
      time: 6 - n
    ).shareReplay(1)
    o.subscribe()
    return o.asObservable()
  }
  .concat()

let sharedSeq = seq.shareReplay(0)
sharedSeq.subscribeNext { print("=> \($0)") }
sharedSeq.subscribeCompleted { print("=> completed") }
person Pierre    schedule 11.06.2016
comment
Да, это то, что я предлагаю вам сделать. Как вы узнали, код в вашем вопросе не создавал горячее наблюдаемое, а только холодное наблюдаемое. Итак, подписавшись сразу, наблюдаемое начнет свою работу. Другой вариант вашего решения - использовать Subject, который уже является горячим наблюдаемым. Тогда вам не понадобится o.subscribe. - person solidcell; 13.06.2016

Ваш "желаемый результат", кажется, не согласуется с вашим желанием иметь Observables запускаться "параллельно", но задерживать их элементы так, что "5" не имеет задержки, "4" имеет задержку в 1 секунду, "3" имеет 2 секунды. задержка и т. д.

Я думаю, вы ищете такой результат:

start 1 - wait 4
start 2 - wait 3
start 3 - wait 2
start 4 - wait 1
start 5 - wait 0
5
4
3
2
1

Вот что вы можете использовать для этого:

Observable.range(start: 1, count: 5)
    .flatMap { n -> Observable<Int> in
        let waitInterval = 5 - n
        print("start \(n) - wait \(waitInterval)")
        return Observable.just(n)
            .delaySubscription(RxTimeInterval(waitInterval), scheduler: MainScheduler.instance)
    }
    .subscribeNext { i in
        print(i)
    }
    .addDisposableTo(disposeBag)

Если вы имели в виду что-то другое, вы, вероятно, могли бы легко настроить этот фрагмент для достижения своей цели.

person solidcell    schedule 12.06.2016

Это не поможет вам прямо сейчас, но, возможно, поможет другим в будущем.

Оператор, которого вы ищете, называется concatMap. Однако на данный момент его нет в RxSwift.

В настоящее время существует закрытый PR для этого здесь.

person solidcell    schedule 13.06.2016

Причина, по которой это не работает так, как вы ожидали, заключается в том, что concat подписывается на исходные наблюдаемые по одному, ожидая завершения первого, прежде чем он подпишется на второй, и так далее.

В RxJava есть concatEager, который делает то, что вы хотите - подписывайтесь на все источники с самого начала, сохраняя при этом порядок. Но, похоже, не в Swift.

Что вы можете сделать, так это заархивировать каждый элемент с его индексом, flatMap, отсортировать по индексу и разархивировать.

person Alexander Torstling    schedule 04.04.2017