RxJava: как интерактивно обрабатывать Flowable из консоли

Я создал Flowable (RxJava v3), который анализирует файл. Я бы хотел, чтобы он поддерживал обратное давление. Это важно, потому что файлы могут быть довольно большими, и я не хочу, чтобы они загружались в память сразу. Вот моя первая попытка:

public Flowable<Byte> asFlowable(InputStream is) {
    return Flowable.create(new FlowableOnSubscribe<Byte>() {

         @Override
         public void subscribe(FlowableEmitter<Byte> emitter) throws Exception {
             try (DataInputStream inputStream = new DataInputStream(is)){
                 if (inputStream.readInt() != SOME_CONSTANT) {
                     throw new IllegalArgumentException("illegal file format");
                 }

                 if (inputStream.readInt() != SOME_OTHER_CONSTANT) {
                     throw new IllegalArgumentException("illegal file format");
                 }

                 final int numItems = inputStream.readInt();

                 for(int i = 0; i < numItems; i++) {
                     if(emitter.isCancelled()) {
                         return;
                     }

                     emitter.onNext(inputStream.readByte());
                 }

                 emitter.onComplete();       
             } catch (Exception e) {
                 emitter.onError(e);
             } 
         }
     }, BackpressureStrategy.BUFFER);
}

Причина, по которой я использовал Flowable.create вместо Flowable.generate, заключается в том, что мне нужно проверить файл и выдать ошибки, если некоторые магические числа в начале файла неверны или не найдены. Это не очень хорошо сочетается с лямбда-выражениями Flowable.generate (но если вы знаете лучший способ, опубликуйте его).

Хорошо, давайте предположим, что холодный Flowable поддерживает противодавление. Теперь я хотел бы обработать его в консольном приложении.

Вопрос: я хочу запросить новый байт из Flowable и вывести его на консоль каждый раз, когда пользователь нажимает пробел (аналогично тому, что more или less делают в Linux). Как лучше всего это сделать? Я намерен наблюдать за потоком непосредственно в методе public static void main, так как мне нужно читать и писать с помощью консоли.

Я читал раздел Backpressure в RxJAva Wiki и нашел этот фрагмент:

someObservable.subscribe(new Subscriber<t>() {
    @Override
    public void onStart() {
      request(1);
    }

    @Override
    public void onCompleted() {
      // gracefully handle sequence-complete
    }

    @Override
    public void onError(Throwable e) {
      // gracefully handle error
    }

    @Override
    public void onNext(t n) {
      // do something with the emitted item "n"
      // request another item:
      request(1);
    }
});

Но это смутило меня еще больше, так как метод request, похоже, не существует в RxJava 3.


person Mister Smith    schedule 20.11.2019    source источник
comment
Это generate. Существует обновленная вики для v2+.   -  person akarnokd    schedule 21.11.2019


Ответы (1)


Используйте generate, blockingSubscribe и прочитайте строку из консоли:

class State {
     DataInputStream inputStream;
     int count;
     int i;
}

BufferedReader bin = new BufferedReader(new InputStreamReader(System.in));

Flowable.generate(() -> {
    State s = new State();
    s.inputStream = new DataInputStream(is);
    try {
        if (s.inputStream.readInt() != SOME_CONSTANT) {
            throw new IllegalArgumentException("illegal file format");
        }

        if (s.inputStream.readInt() != SOME_OTHER_CONSTANT) {
            throw new IllegalArgumentException("illegal file format");
        }
        s.count = s.inputStream.readInt();
    } catch (IOException ex) {
        s.inputStream.close();
        throw ex;
    }
    return s;
}, (state, emitter) -> {
    if (state.i < s.count) {
        emitter.onNext(state.inputStream.readByte());
        s.i++;
    }
    if (state.i >= s.count) {
        emitter.onComplete();
    }
}, state -> {
    state.inputStream.close();
})
.subscribeOn(Schedulers.io())
.blockingSubscribe(b -> {
    System.out.println(b);
    bin.readLine();
}, Flowable.bufferSize());
person akarnokd    schedule 20.11.2019
comment
Но согласно документам blockingSubscribe будет блокироваться до тех пор, пока восходящий поток не завершится. Таким образом, все элементы будут сначала сгенерированы и сохранены в памяти, а затем возвращены потребителю. Как я уже сказал, файлы могут быть очень большими, я хотел бы анализировать их по требованию. - person Mister Smith; 21.11.2019
comment
Вы читаете неправильную документацию по методу. Прочтите this: противодавление: оператор потребляет исходный Flowable ограниченным образом (вплоть до буфераSize непогашенной суммы запроса для элементов). другими словами, generate будет генерировать столько элементов, сколько указано здесь bufferSize. - person akarnokd; 21.11.2019
comment
Я прочитал ссылку в вашем комментарии. В нем говорится: обратите внимание, что вызов этого метода заблокирует поток вызывающей стороны до тех пор, пока восходящий поток не завершится нормально или с ошибкой. Поэтому не рекомендуется вызывать этот метод из специальных потоков, таких как основной поток Android или поток отправки событий Swing. - person Mister Smith; 26.11.2019
comment
Вы заявили, что мне нужно будет наблюдать непосредственно в основном методе public static void. Нет другого способа оставаться в основном потоке Java, кроме блокировки. Вы разрабатываете для Android или разрабатываете приложение Swing? - person akarnokd; 26.11.2019
comment
Это настольное приложение Java. Да, я хочу блокировать, но я не хочу блокировать, пока восходящий поток не сгенерирует все элементы. Я хочу блокировать элемент за элементом. Прочитайте пользовательский ввод, дождитесь элемента потока, затем распечатайте. - person Mister Smith; 26.11.2019
comment
Он не будет блокироваться до тех пор, пока все элементы не будут заблокированы из-за параметра bufferSize. Установите его на 1, и вы получите один элемент, один считывается консолью, а затем генерируется следующий элемент. Вы хоть пробовали мое предложение? Если вы не хотите блокировать, используйте subscribe. Если вы хотите управлять шаблоном запроса, используйте DisposableSubscriber< /а>. - person akarnokd; 27.11.2019
comment
О, так что размер буфера 1 может помочь, интересно. DisposableSubscriber тоже хороший вариант. - person Mister Smith; 27.11.2019
comment
Ну, комбинация generateи blockingSubscribe с размером буфера 1 не сработала. Все элементы генерируются одновременно. Я отладил это. - person Mister Smith; 27.11.2019
comment
Работает на меня. Вы применяли какую-либо операцию между subscribeOn и blockingSubscribe? - person akarnokd; 27.11.2019
comment
Да, ты был прав. Я заархивировал Flowable другим идентичным Flowable, используя zipWith. По-видимому, обычная версия zipWith не оттачивает обратное давление, и вы должны использовать версию с 4 параметрами, указав размер буфера, равный 1. - person Mister Smith; 28.11.2019
comment
Неправильный. Прежде всего, обратное давление означает ограниченные буферы и объемы предварительной выборки, размер которых по умолчанию > 1. Во-вторых, эта перегрузка существует специально для того, чтобы позволить контролировать размер буфера и, следовательно, предварительную выборку из источников. - person akarnokd; 28.11.2019