RxJava кэширует последний элемент для будущих подписчиков

Я реализовал простой RxEventBus, который начинает генерировать события, даже если нет подписчиков. Я хочу кэшировать последнее отправленное событие, чтобы, если первый/следующий подписчик подписывается, он получал только один (последний) элемент.

Я создал тестовый класс, который описывает мою проблему:

public class RxBus {

ApplicationsRxEventBus applicationsRxEventBus;

public RxBus() {
    applicationsRxEventBus = new ApplicationsRxEventBus();
}

public static void main(String[] args) {
    RxBus rxBus = new RxBus();
    rxBus.start();
}

private void start() {
    ExecutorService executorService = Executors.newScheduledThreadPool(2);

    Runnable runnable0 = () -> {
        while (true) {
            long currentTime = System.currentTimeMillis();
            System.out.println("emiting: " + currentTime);
            applicationsRxEventBus.emit(new ApplicationsEvent(currentTime));
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };

    Runnable runnable1 = () -> applicationsRxEventBus
            .getBus()
            .subscribe(new Subscriber<ApplicationsEvent>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable throwable) {

                }

                @Override
                public void onNext(ApplicationsEvent applicationsEvent) {
                    System.out.println("runnable 1: " + applicationsEvent.number);
                }
            });

    Runnable runnable2 = () -> applicationsRxEventBus
            .getBus()
            .subscribe(new Subscriber<ApplicationsEvent>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable throwable) {

                }

                @Override
                public void onNext(ApplicationsEvent applicationsEvent) {
                    System.out.println("runnable 2: " + applicationsEvent.number);
                }
            });


    executorService.execute(runnable0);
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    executorService.execute(runnable1);
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    executorService.execute(runnable2);
}

private class ApplicationsRxEventBus {
    private final Subject<ApplicationsEvent, ApplicationsEvent> mRxBus;
    private final Observable<ApplicationsEvent> mBusObservable;

    public ApplicationsRxEventBus() {
        mRxBus = new SerializedSubject<>(BehaviorSubject.<ApplicationsEvent>create());
        mBusObservable = mRxBus.cache();
    }

    public void emit(ApplicationsEvent event) {
        mRxBus.onNext(event);
    }

    public Observable<ApplicationsEvent> getBus() {
        return mBusObservable;
    }
}
private class ApplicationsEvent {
    long number;

    public ApplicationsEvent(long number) {
        this.number = number;
    }
}
}

runnable0 генерирует события, даже если нет подписчиков. runnable1 подписывается через 3 секунды и получает последний элемент (и это нормально). Но runnable2 подписывается через 3 секунды после runnable1 и получает все элементы, которые получил runnable1. Мне нужен только последний элемент для runnable2. Я пробовал кэшировать события в RxBus:

private class ApplicationsRxEventBus {
    private final Subject<ApplicationsEvent, ApplicationsEvent> mRxBus;
    private final Observable<ApplicationsEvent> mBusObservable;

    private ApplicationsEvent event;

    public ApplicationsRxEventBus() {
        mRxBus = new SerializedSubject<>(BehaviorSubject.<ApplicationsEvent>create());
        mBusObservable = mRxBus;
    }

    public void emit(ApplicationsEvent event) {
        this.event = event;
        mRxBus.onNext(event);
    }

    public Observable<ApplicationsEvent> getBus() {
        return mBusObservable.doOnSubscribe(() -> emit(event));
    }
}

Но проблема в том, что когда runnable2 подписывается, runnable1 получает событие дважды:

emiting: 1447183225122
runnable 1: 1447183225122
runnable 1: 1447183225122
runnable 2: 1447183225122
emiting: 1447183225627
runnable 1: 1447183225627
runnable 2: 1447183225627

Я уверен, что для этого есть оператор RxJava. Как этого добиться?


person Bresiu    schedule 10.11.2015    source источник


Ответы (2)


Ваш ApplicationsRxEventBus выполняет дополнительную работу, повторно отправляя сохраненное событие всякий раз, когда вы подписываетесь, в дополнение ко всем кэшированным событиям.

Вам нужен только один BehaviorSubject + toSerialized, так как он будет удерживать самое последнее событие и самостоятельно пересылать его подписчикам.

person akarnokd    schedule 10.11.2015
comment
Это решило мою проблему. Я также читал о «ReplaySubject» с «createWithSize (1)», кажется, что оба дают одинаковые результаты. - person Bresiu; 11.11.2015

Вы используете неправильный интерфейс. Когда вы подписываетесь на холодный Observable, вы получаете все его события. Сначала вам нужно превратить его в hot Observable. Это делается путем создания ConnectableObservable из вашего Observable с использованием его publish. Затем ваши наблюдатели вызывают connect, чтобы начать получать события.

Вы также можете узнать больше об этом в разделе Hot AndColdObservables. учебника.

person Thomas    schedule 10.11.2015
comment
Что если Observable не знает о количестве подписчиков и времени их подписки. И ситуация, когда Observable генерит событие, когда подписчиков нет, и после этого подписывается один Subscriber. Он должен получить последнее событие. Я бы хотел, чтобы RxBus был разделен. Я читал о горячих и холодных наблюдаемых. Я нашел ReplaySubject. @akarnokd BehaviourSubject просто делает свое дело. - person Bresiu; 11.11.2015
comment
Я бы все же сделал шину ConnectableObservable. К этому вы можете подключить ReplaySubject размером 1 для кэширования последнего события, как вы сказали. - person Thomas; 12.11.2015