Rx: Как получить последний элемент, даже если был вызван onError?

Я использую RxJava, и мне нужно сделать 2 вещи:

  • Получить последний элемент, испускаемый из Observable
  • Определите, был ли вызван onError или onCompleted

Я рассматривал использование last и lastOrDefault (на самом деле это то поведение, которое мне нужно), но мне не удалось обойти onError, скрывающий последний элемент. Я был бы в порядке, если бы Observable использовался дважды, один раз для получения значения last и один раз для получения статуса завершения, но до сих пор я смог сделать это, только создав свой собственный Observer:

public class CacheLastObserver<T> implements Observer<T> {

    private final AtomicReference<T> lastMessageReceived = new AtomicReference<>();
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    @Override
    public void onCompleted() {
        // Do nothing
    }

    @Override
    public void onError(Throwable e) {
        error.set(e);
    }

    @Override
    public void onNext(T message) {
        lastMessageReceived.set(message);
    }

    public Optional<T> getLastMessageReceived() {
        return Optional.ofNullable(lastMessageReceived.get());
    }

    public Optional<Throwable> getError() {
        return Optional.ofNullable(error.get());
    }
}

У меня нет проблем с созданием собственного Observer, но мне кажется, что Rx должен лучше соответствовать этому варианту использования «получить последний элемент, испускаемый до завершения». Любые идеи о том, как это сделать?


person cdeszaq    schedule 09.06.2016    source источник


Ответы (4)


Попробуй это:

source.materialize().buffer(2).last()

В случае ошибки последним выпуском будет список из двух элементов: последнее переданное значение, завернутое в виде Notification, и уведомление об ошибке. Без ошибки вторым пунктом будет уведомление о завершении.

Также обратите внимание, что если значение не выдается, результатом будет список из одного элемента, являющегося уведомлением терминала.

person Dave Moten    schedule 09.06.2016
comment
Есть ли способ получить последний отправленный элемент, а не последний полученный, это хороший подход, хотя и для получения последнего элемента при вызове onNext, но не из исходного Observable. (т.е. выброс, 1,2,3, сбой перед 4, тогда у меня должен быть сбой и элемент 4, в настоящее время вы получите 3 с помощью этого метода). - person desgraci; 15.11.2016
comment
для источника, выдающего 1,2,3, error, 4, противоречит контракту Observable и, следовательно, не поддерживается RxJava. Вам нужно будет подавить выдачу ошибки и выдать ее как эмиссию onNext, чтобы это произошло. - person Dave Moten; 15.11.2016
comment
Это не работает с наблюдаемыми с нечетным количеством элементов (уведомление о завершении/ошибке находится в одноэлементном списке). Даже source.materialize().buffer(2, 1).last() не решает, потому что последний буфер будет иметь длину = 1. - person maurocchi; 09.10.2017

Я решил с:

source.materialize().withPrevious().last()

где withPrevious (Котлин):

fun <T> Observable<T>.withPrevious(): Observable<Pair<T?, T>> =
    this.scan(Pair<T?, T?>(null, null)) { previous, current -> Pair(previous.second, current) }
        .skip(1)
        .map { it as Pair<T?, T> }
person maurocchi    schedule 09.10.2017

Пробовали ли вы onErrorResumeДалее здесь вы можете увидеть остальные операторы или операторы обработки ошибок https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

person paul    schedule 10.06.2016

Я использовал этот подход для решения вашей проблемы.

public class ExampleUnitTest {
    @Test
    public void testSample() throws Exception {
        Observable.just(1, 2, 3, 4, 5)
                .map(number -> {
                    if (number == 4)
                        throw new NullPointerException();
                    else
                        return number;
                })
                .onErrorResumeNext(t -> Observable.empty())
                .lastOrDefault(15)
                .subscribe(lastEmittedNumber -> System.out.println("onNext: " + lastEmittedNumber));
    }
}

Он будет излучать onNext: 3

Надеюсь, что это поможет.

person Rodrigo Henriques    schedule 10.06.2016