Как ввести задержку в группе RxJava, обрабатываемой параллельно?

Мой вариант использования — сгруппировать поток, запустить параллельную обработку некоторых групп и внутри каждой группы отложить обработку каждого элемента на постоянный интервал. Кажется, я не могу правильно понять задержку внутри группы, потому что вместо того, чтобы испускаться периодически, они испускаются почти мгновенно. Ниже приведен мой тестовый код с использованием RxJava 2.0.5:

@Slf4j
public class GroupsAndDelays {
    Function<Integer, Flowable<Integer>> remoteClient;
    int groupMemberDelaySeconds;
    int remoteCallTimeoutSeconds;
    int maxRetryCount;
    int retryDelaySeconds;
    Map<Long, List<Integer>> threadMap;
    Map<Long, List<Integer>> resultThreadMap;

    public ParallelFlowable<Integer> doStuff(Flowable<Integer> src,
                                             Function<Integer, Integer> groupByFn,
                                             Function<Integer, Flowable<Integer>> responseMapper) {
        return src
                .groupBy(groupByFn)
                .parallel(5).runOn(Schedulers.newThread())
                .map(g -> g.distinct().toList())
                .flatMap(i -> i.toFlowable())
                .flatMap(i -> {
                    log.debug("Processing group: {}.", i);
                    return Flowable.fromIterable(i)
                            .delay(groupMemberDelaySeconds, SECONDS);
                })
                .flatMap(i -> {
                    log.debug("Processing: {}.", i);
                    putInThreadMap(threadMap, i);
                    return remoteCall(i * 2, responseMapper);
                });
    }

    private Flowable<Integer> remoteCall(int i, Function<Integer, Flowable<Integer>> responseMapper) throws
            Exception {
        return remoteClient.apply(i)
                .timeout(remoteCallTimeoutSeconds, SECONDS)
                .retryWhen(t -> t.zipWith(Flowable.range(1, maxRetryCount), (ex, retryCount) -> retryCount)
                        .flatMap(retryCount -> Flowable.timer(retryCount * retryDelaySeconds, SECONDS)))
                .flatMap(result -> {
                    log.debug("Processing result: {}.", result);
                    putInThreadMap(resultThreadMap, result);
                    return responseMapper.apply(result);
                })
                .onErrorReturnItem(-1);
    }

    private void putInThreadMap(Map<Long, List<Integer>> map, int i) {
        map.merge(Thread.currentThread().getId(), singletonList(i), this::merge);
    }

    private List<Integer> merge(List<Integer> a, List<Integer> b) {
        return Stream.concat(a.stream(), b.stream()).collect(Collectors.toList());
    }
}

Вот тест Спока:

class GroupsAndDelaysSpec extends Specification {
    final int groupMemberDelaySeconds = 3
    final int remoteCallTimeoutSeconds = 3
    final int maxRetryCount = 2
    final int retryDelaySeconds = 2
    Function<Integer, Flowable<Integer>> remoteClient
    Function<Integer, Integer> groupByFn
    Function<Integer, Flowable<Integer>> responseMapper

    GroupsAndDelays groupsAndDelays

    final Flowable<Integer> src = Flowable.fromArray(
            1, 2, 3, 4, 5, 1, 2, 3, 4, 5,
            11, 12, 13, 14, 15, 11, 12, 13, 14, 15,
            21, 22, 23, 24, 25, 21, 22, 23, 24, 25,
            31, 32, 33, 34, 35, 31, 32, 33, 34, 35,
            41, 42, 43, 44, 45, 41, 42, 43, 44, 45
    )

    def setup() {
        remoteClient = Mock(Function)

        groupsAndDelays = new GroupsAndDelays()
        groupsAndDelays.groupMemberDelaySeconds = groupMemberDelaySeconds
        groupsAndDelays.remoteCallTimeoutSeconds = remoteCallTimeoutSeconds
        groupsAndDelays.maxRetryCount = maxRetryCount
        groupsAndDelays.retryDelaySeconds = retryDelaySeconds
        groupsAndDelays.remoteClient = remoteClient
        groupsAndDelays.threadMap = new ConcurrentHashMap<Long, List<Integer>>()
        groupsAndDelays.resultThreadMap = new ConcurrentHashMap<Long, List<Integer>>()

        groupByFn = Mock(Function)
        groupByFn.apply(_) >> { args -> args[0] % 10 }

        responseMapper = Mock(Function)
        responseMapper.apply(_) >> { args -> args[0] }
    }

    def cleanup() {
        println("Thread map: ${groupsAndDelays.threadMap}")
        println("Result thread map: ${groupsAndDelays.resultThreadMap}")

        assert groupsAndDelays.threadMap.size() == 5
        assert groupsAndDelays.threadMap.findAll { k, v -> v.size() == 5 }.size() == 5
    }

    def "each group executes on a separate thread"() {
        setup:
        remoteClient.apply(_) >> { args -> Flowable.just(args[0]) }

        when:
        groupsAndDelays.doStuff(src, groupByFn, responseMapper)
                .sequential()
                .toList()
                .blockingGet()

        then:
        true
    }
}

Пример запуска:

2017-02-04 00:49:19.430 [RxNewThreadScheduler-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$1 - Processing group: [3, 13, 23, 33, 43].
2017-02-04 00:49:19.430 [RxNewThreadScheduler-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$1 - Processing group: [1, 11, 21, 31, 41].
2017-02-04 00:49:19.430 [RxNewThreadScheduler-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$1 - Processing group: [5, 15, 25, 35, 45].
2017-02-04 00:49:19.430 [RxNewThreadScheduler-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$1 - Processing group: [2, 12, 22, 32, 42].
2017-02-04 00:49:19.430 [RxNewThreadScheduler-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$1 - Processing group: [4, 14, 24, 34, 44].
2017-02-04 00:49:22.443 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 2.
2017-02-04 00:49:22.443 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 1.
2017-02-04 00:49:22.443 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 5.
2017-02-04 00:49:22.443 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 4.
2017-02-04 00:49:22.443 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 3.
2017-02-04 00:49:22.456 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 10.
2017-02-04 00:49:22.456 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 2.
2017-02-04 00:49:22.456 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 8.
2017-02-04 00:49:22.456 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 6.
2017-02-04 00:49:22.456 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 4.
2017-02-04 00:49:22.459 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 13.
2017-02-04 00:49:22.459 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 14.
2017-02-04 00:49:22.459 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 11.
2017-02-04 00:49:22.459 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 15.
2017-02-04 00:49:22.459 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 12.
2017-02-04 00:49:22.466 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 26.
2017-02-04 00:49:22.466 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 30.
2017-02-04 00:49:22.466 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 24.
2017-02-04 00:49:22.466 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 22.
2017-02-04 00:49:22.466 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 28.
2017-02-04 00:49:22.466 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 23.
2017-02-04 00:49:22.467 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 25.
2017-02-04 00:49:22.467 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 22.
2017-02-04 00:49:22.467 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 21.
2017-02-04 00:49:22.467 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 24.
2017-02-04 00:49:22.467 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 46.
2017-02-04 00:49:22.467 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 50.
2017-02-04 00:49:22.467 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 44.
2017-02-04 00:49:22.468 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 42.
2017-02-04 00:49:22.468 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 48.
2017-02-04 00:49:22.468 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 33.
2017-02-04 00:49:22.468 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 35.
2017-02-04 00:49:22.468 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 32.
2017-02-04 00:49:22.468 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 31.
2017-02-04 00:49:22.468 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 34.
2017-02-04 00:49:22.469 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 66.
2017-02-04 00:49:22.469 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 62.
2017-02-04 00:49:22.469 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 68.
2017-02-04 00:49:22.469 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 64.
2017-02-04 00:49:22.469 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 70.
2017-02-04 00:49:22.470 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 43.
2017-02-04 00:49:22.470 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 44.
2017-02-04 00:49:22.470 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 41.
2017-02-04 00:49:22.470 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 42.
2017-02-04 00:49:22.470 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 45.
2017-02-04 00:49:22.470 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 86.
2017-02-04 00:49:22.470 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 88.
2017-02-04 00:49:22.470 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 82.
2017-02-04 00:49:22.470 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 84.
2017-02-04 00:49:22.470 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 90.
Thread map: [20:[3, 13, 23, 33, 43], 21:[2, 12, 22, 32, 42], 22:[5, 15, 25, 35, 45], 23:[4, 14, 24, 34, 44], 24:[1, 11, 21, 31, 41]]
Result thread map: [20:[6, 26, 46, 66, 86], 21:[4, 24, 44, 64, 84], 22:[10, 30, 50, 70, 90], 23:[8, 28, 48, 68, 88], 24:[2, 22, 42, 62, 82]]

Process finished with exit code 0

Изменить:

Бонусные баллы, если вы также можете показать, как это сделать в project Reactor.

Редактировать 2: решение с использованием проекта Reactor находится здесь.


person Abhijit Sarkar    schedule 04.02.2017    source источник


Ответы (3)


Библиотека RxJava 2 Extensions содержит spanout оператор.

Замените delay() на

compose(FlowableTransformers.spanout(
    groupMemberDelaySeconds, groupMemberDelaySeconds, SECONDS))
person akarnokd    schedule 04.02.2017
comment
Почему вы говорите, что Давид? Я также пробую проект Reactor (см. редактирование). - person Abhijit Sarkar; 05.02.2017
comment
Как вы думаете, кто сделал большую часть RxJava 2 и значительную часть Reactor 3? - person akarnokd; 05.02.2017
comment
Поверьте мне, я понимаю чувства, был там, сделал это. Я не умаляю вашего вклада в эти проекты. Но я бы хотел, чтобы больше людей интересовались поддержкой кода, а не один человек становился узким местом. Разве разработка RxJava1 не замедлилась после ухода Бена Кристенсена? - person Abhijit Sarkar; 05.02.2017
comment
В RxJava не было узких мест в разработке, только управленческие до того, как я возглавил ее в июне прошлого года. - person akarnokd; 05.02.2017
comment
Я думаю, вы ищете оператора Flux#delayElements в реакторе? (3.0.5.BUILD-SNAPSHOT, раньше вызывалась просто задержка в 3.0.4 ‹) - person smaldini; 05.02.2017
comment
@smaldini У меня большие проблемы с его работой. Есть ли у Project Reactor канал Gitter для обсуждений? - person Abhijit Sarkar; 06.02.2017

Я предполагаю, что вы хотите вставить задержки между выбросами из итерации, которая возвращается в этом flatMap:

.flatMap(i -> {
   log.debug("Processing group: {}.", i);
       return Flowable.fromIterable(i)
           .delay(groupMemberDelaySeconds, SECONDS);
})

В этом случае вы неверно истолковали оператор delay. Он просто сдвигает выбросы на указанное время. Чтобы вставить задержку между каждым выбросом, вы можете заархивировать ее с помощью interval observable

.flatMap(i -> {
   log.debug("Processing group: {}.", i);
       return Flowable.fromIterable(i)
           .zipWith(Flowable.interval(groupMemberDelaySeconds, SECONDS), (item, time) -> item)
})

Однако вы должны понимать, что этот подход действителен только тогда, когда вы можете быть уверены, что ваш наблюдаемый объект всегда производить чаще, чем указанный интервал, в противном случае вы можете в конечном итоге буферизовать выбросы из интервала, и это будет означать мгновенный выброс из желаемого наблюдаемого, как только они поступят для следующих нескольких элементов, в зависимости от количества событий, которые буферизуются из интервал наблюдаемый. Конечно, есть способы обойти это, но этот намного проще, и когда вы работаете с Iterable, вы можете быть уверены (в пределах разумного), что этого не произойдет.

person koperko    schedule 04.02.2017
comment
Вы, наверное, правы, я изменил свой комментарий. С тех пор я перешел на использование проекта Reactor вместо RxJava2 и не смог разместить ваш комментарий с кодом. Я отвечу после того, как попробовал это. Я добавил редактирование в свой вопрос, спрашивая, как это сделать в проекте Reactor. - person Abhijit Sarkar; 04.02.2017

Вы можете попробовать код ниже. Ключевым моментом является использование zipWith в сочетании с interval, что обеспечивает выпуск всех элементов в зависимости от времени.

public static void main(String[] args) {
    Observable<Integer> o1 = Observable.range(1, 3);
    System.out.println("Without delay");
    o1.subscribe(v -> System.out.println(v));

    System.out.println("With delay");
    o1.zipWith(Observable.interval(1, TimeUnit.SECONDS), (a, b)->a ).subscribe(a->System.out.println(a));
    Observable.timer(5, TimeUnit.SECONDS).toBlocking().subscribe();
}
person Pavan Kumar    schedule 04.02.2017
comment
Где groupBy и параллельная обработка их групп? - person Abhijit Sarkar; 04.02.2017
comment
Ну, это не полное решение, а пример реализации, чтобы ввести нужную вам задержку. Исходя из этого, должно быть адаптировано полное решение проблемы. - person Pavan Kumar; 05.02.2017