Испускать элементы в RxJava с интервалом, который зависит от самого испускаемого элемента

В RxJava для Android я хочу генерировать элементы с интервалом, который зависит от самого элемента: в Observable я извлекаю элемент из очереди, обрабатываю его и отправляю. В зависимости от типа элемента, я хочу настроить, через какое время будет выдан следующий элемент (замедлить или ускорить интервал).

Следующий код, предложенный @ a.bertucci здесь Создавать объекты для рисования в пользовательском интерфейсе через равные промежутки времени с помощью RxJava на Android демонстрирует, как создавать элементы через равные промежутки времени.

private void drawPath(final String chars) {
    Observable.zip(
        Observable.create(new Observable.OnSubscribe<Path>() {
            // all the drawing stuff here
            ...
        }),
        Observable.timer(0, 50, TimeUnit.MILLISECONDS),
        new Func2<Path, Long, Path>() {
            @Override
            public Path call(Path path, Long aLong) {
                return path;
            }
        }
    )
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    ...
}

Теперь мой вопрос: можно ли вообще изменить частоту излучения, пока наблюдаемый объект излучает, и какая предпочтительная реализация с использованием RxJava.


person Oliver Hausler    schedule 21.10.2014    source источник


Ответы (2)


Ты можешь использовать

 public final <U, V> Observable<T> delay(
            Func0<? extends Observable<U>> subscriptionDelay,
            Func1<? super T, ? extends Observable<V>> itemDelay)

or

public final <U> Observable<T> delay(Func1<? super T, ? extends Observable<U>> itemDelay)

Вы можете использовать itemDelay для управления скоростью.

person zsxwing    schedule 22.10.2014
comment
Разве это не сдвигает всю последовательность с определенной задержкой? Это не то, что мне нужно. Я не хочу менять последовательность. Я хочу, чтобы первый элемент был запущен немедленно. Затем (или до), испуская этот элемент, я хочу посмотреть на элемент, который я испускаю, и решить, когда должны быть отправлены следующие элементы (в зависимости от элемента). Затем выдается следующий элемент, и время снова изменяется для элемента, следующего за ним. - person Oliver Hausler; 22.10.2014

Согласно вашему комментарию, я думаю, вам следует создать для этого нового оператора.

Этот оператор примет функцию, которая вычислит задержку, чтобы применить к отправке вашего следующего элемента.

 Observable.range(1, 1000).lift(new ConfigurableDelay((item) -> 3 SECONDS)
                          .subscribe();

Вы можете попробовать что-то вроде этого:

public class ConfDelay {

public static void main(String[] args) {
    Observable.range(1, 1000).lift(new ConfigurableDelay(ConfDelay::delayPerItem, Schedulers.immediate()))
            .map((i) -> "|")
            .subscribe(System.out::print);
}


public static TimeConf delayPerItem(Object item) {
    long value = ((Integer) item).longValue();
    return new TimeConf(value * value, TimeUnit.MILLISECONDS);
}

private static class TimeConf {
    private final long time;
    private final TimeUnit unit;

    private TimeConf(final long time, final TimeUnit unit) {
        this.time = time;
        this.unit = unit;
    }
}

private static class ConfigurableDelay<T> implements Observable.Operator<T, T> {
    private final Func1<T, TimeConf> itemToTime;
    private final Scheduler scheduler;

    public ConfigurableDelay(final Func1<T, TimeConf> itemToTime) {
        this(itemToTime, Schedulers.computation());
    }

    public ConfigurableDelay(final Func1<T, TimeConf> itemToTime, final Scheduler scheulder) {
        this.itemToTime = itemToTime;
        this.scheduler = scheulder;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
        return new Subscriber<T>(subscriber) {

            private TimeConf nextTime = null;

            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }

            @Override
            public void onError(final Throwable e) {
                subscriber.onError(e);
            }

            @Override
            public void onNext(final T t) {
                TimeConf previousNextTime = nextTime;
                this.nextTime = itemToTime.call(t);
                if (previousNextTime == null) {
                    subscriber.onNext(t);
                } else {
                    scheduler.createWorker().schedule(() -> subscriber.onNext(t), previousNextTime.time, previousNextTime.unit);
                }
            }
        };
    }
}
}
person dwursteisen    schedule 24.10.2014
comment
Я думаю, что мой код можно заменить zip / flatmap следующим образом: obs.map (e - ›new Pair (e, 3 SECONDS)) .flatMap ((conf) -› Observable.just (conf.event) .delay (время конф.)). подписка (); - person dwursteisen; 24.10.2014