При написании задания синхронизации данных с RxJava я обнаружил странное поведение, которое не могу объяснить. Я новичок в RxJava и буду признателен за помощь.
Вкратце, моя работа довольно проста. У меня есть список идентификаторов элементов, я вызываю веб-сервис, чтобы получить каждый элемент по идентификатору, выполнить некоторую обработку и выполнить несколько вызовов для отправки данных в БД. Загрузка данных происходит быстрее, чем сохранение данных, поэтому я столкнулся с ошибками OutOfMemory.
Мой код в значительной степени выглядит как «неудачный» тест, но затем, выполнив некоторый тест, я понял, что удаление строки:
flatMap(dt -> Observable.just(dt))
Сделай так, чтоб это работало. Неудачный результат теста ясно показывает, что неиспользованные элементы накапливаются, и это приводит к OutOfMemory. Выходные данные рабочего теста показывают, что производитель всегда будет ждать потребителя, поэтому это никогда не приведет к OutOfMemory.
public static class DataStore {
public Integer myVal;
public byte[] myBigData;
public DataStore(Integer myVal) {
this.myVal = myVal;
this.myBigData = new byte[1000000];
}
}
@Test
public void working() {
int MAX_CONCURRENT_LOAD = 1;
int MAX_CONCURRENT_STORE = 2;
AtomicInteger nbUnconsumed = new AtomicInteger(0);
List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
Observable.from(ids)
.flatMap(this::produce, MAX_CONCURRENT_LOAD)
.doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
.flatMap(this::consume, MAX_CONCURRENT_STORE)
.doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
.toBlocking().forEach(s -> {});
logger.info("Finished");
}
@Test
public void failing() {
int MAX_CONCURRENT_LOAD = 1;
int MAX_CONCURRENT_STORE = 2;
AtomicInteger nbUnconsumed = new AtomicInteger(0);
List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
Observable.from(ids)
.flatMap(this::produce, MAX_CONCURRENT_LOAD)
.doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
.flatMap(dt -> Observable.just(dt))
.flatMap(this::consume, MAX_CONCURRENT_STORE)
.doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
.toBlocking().forEach(s -> {});
logger.info("Finished");
}
private Observable<DataStore> produce(final int value) {
return Observable.<DataStore>create(s -> {
try {
if (!s.isUnsubscribed()) {
Thread.sleep(200); //Here I synchronous call WS to retrieve data
s.onNext(new DataStore(value));
s.onCompleted();
}
} catch (Exception e) {
s.onError(e);
}
}).subscribeOn(Schedulers.io());
}
private Observable<Boolean> consume(DataStore value) {
return Observable.<Boolean>create(s -> {
try {
if (!s.isUnsubscribed()) {
Thread.sleep(1000); //Here I synchronous call DB to store data
s.onNext(true);
s.onCompleted();
}
} catch (Exception e) {
s.onNext(false);
s.onCompleted();
}
}).subscribeOn(Schedulers.io());
}
Чем объясняется такое поведение? Как я мог решить свой неудачный тест, не удаляя Observable.just(dt)), который в моем реальном случае является Observable.from(someListOfItme)