RxJava flatMap и странное поведение обратного давления

При написании задания синхронизации данных с RxJava я обнаружил странное поведение, которое не могу объяснить. Я новичок в RxJava и буду признателен за помощь.

Вкратце, моя работа довольно проста. У меня есть список идентификаторов элементов, я вызываю веб-сервис, чтобы получить каждый элемент по идентификатору, выполнить некоторую обработку и выполнить несколько вызовов для отправки данных в БД. Загрузка данных происходит быстрее, чем сохранение данных, поэтому я столкнулся с ошибками OutOfMemory.

Мой код в значительной степени выглядит как «неудачный» тест, но затем, выполнив некоторый тест, я понял, что удаление строки:

flatMap(dt -> Observable.just(dt))

Сделай так, чтоб это работало. Неудачный результат теста ясно показывает, что неиспользованные элементы накапливаются, и это приводит к OutOfMemory. Выходные данные рабочего теста показывают, что производитель всегда будет ждать потребителя, поэтому это никогда не приведет к OutOfMemory.

public static class DataStore {
    public Integer myVal;
    public byte[] myBigData;

    public DataStore(Integer myVal) {
        this.myVal = myVal;
        this.myBigData = new byte[1000000];
    }
}

@Test
public void working() {
    int MAX_CONCURRENT_LOAD = 1;
    int MAX_CONCURRENT_STORE = 2;

    AtomicInteger nbUnconsumed = new AtomicInteger(0);

    List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
    Observable.from(ids)
        .flatMap(this::produce, MAX_CONCURRENT_LOAD)
        .doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
        .flatMap(this::consume, MAX_CONCURRENT_STORE)
        .doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
        .toBlocking().forEach(s -> {});

    logger.info("Finished");
}

@Test
public void failing() {
    int MAX_CONCURRENT_LOAD = 1;
    int MAX_CONCURRENT_STORE = 2;

    AtomicInteger nbUnconsumed = new AtomicInteger(0);

    List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
    Observable.from(ids)
        .flatMap(this::produce, MAX_CONCURRENT_LOAD)
        .doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
        .flatMap(dt -> Observable.just(dt))
        .flatMap(this::consume, MAX_CONCURRENT_STORE)
        .doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
        .toBlocking().forEach(s -> {});

    logger.info("Finished");
}

private Observable<DataStore> produce(final int value) {
    return Observable.<DataStore>create(s -> {
        try {
            if (!s.isUnsubscribed()) {
                Thread.sleep(200); //Here I synchronous call WS to retrieve data
                s.onNext(new DataStore(value));
                s.onCompleted();
            }
        } catch (Exception e) {
            s.onError(e);
        }
    }).subscribeOn(Schedulers.io());
}

private Observable<Boolean> consume(DataStore value) {
    return Observable.<Boolean>create(s -> {
        try {
            if (!s.isUnsubscribed()) {
                Thread.sleep(1000); //Here I synchronous call DB to store data
                s.onNext(true);
                s.onCompleted();
            }
        } catch (Exception e) {
            s.onNext(false);
            s.onCompleted();
        }
    }).subscribeOn(Schedulers.io());
}

Чем объясняется такое поведение? Как я мог решить свой неудачный тест, не удаляя Observable.just(dt)), который в моем реальном случае является Observable.from(someListOfItme)


person benjamin.donze    schedule 10.02.2016    source источник


Ответы (1)


flatMap по умолчанию объединяет неограниченное количество источников, и, применив эту конкретную лямбду без параметра maxConcurrent, вы существенно освободили восходящий поток, который теперь может работать на полной скорости, подавляя внутренние буферы других операторов.

person akarnokd    schedule 10.02.2016
comment
Хорошо, я начинаю понимать. Так что я должен заменить его на flatMap(dt -> Observable.just(dt), SOME_PRODUCER_BUFFER_SIZE)? - person benjamin.donze; 10.02.2016
comment
Да, желательно MAX_CONCURRENT_LOAD - person akarnokd; 10.02.2016
comment
Да, но я думаю, что в моем случае мне было бы интересно иметь буфер производителя больше, чем максимальная одновременная нагрузка, чтобы, если иногда хранилище работало быстрее, у меня был буфер, чтобы каждый поток был занят, иначе потребителю придется ждать. Это имеет смысл, верно? - person benjamin.donze; 10.02.2016