Я создал Flowable (RxJava v3), который анализирует файл. Я бы хотел, чтобы он поддерживал обратное давление. Это важно, потому что файлы могут быть довольно большими, и я не хочу, чтобы они загружались в память сразу. Вот моя первая попытка:
public Flowable<Byte> asFlowable(InputStream is) {
return Flowable.create(new FlowableOnSubscribe<Byte>() {
@Override
public void subscribe(FlowableEmitter<Byte> emitter) throws Exception {
try (DataInputStream inputStream = new DataInputStream(is)){
if (inputStream.readInt() != SOME_CONSTANT) {
throw new IllegalArgumentException("illegal file format");
}
if (inputStream.readInt() != SOME_OTHER_CONSTANT) {
throw new IllegalArgumentException("illegal file format");
}
final int numItems = inputStream.readInt();
for(int i = 0; i < numItems; i++) {
if(emitter.isCancelled()) {
return;
}
emitter.onNext(inputStream.readByte());
}
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.BUFFER);
}
Причина, по которой я использовал Flowable.create
вместо Flowable.generate
, заключается в том, что мне нужно проверить файл и выдать ошибки, если некоторые магические числа в начале файла неверны или не найдены. Это не очень хорошо сочетается с лямбда-выражениями Flowable.generate
(но если вы знаете лучший способ, опубликуйте его).
Хорошо, давайте предположим, что холодный Flowable поддерживает противодавление. Теперь я хотел бы обработать его в консольном приложении.
Вопрос: я хочу запросить новый байт из Flowable и вывести его на консоль каждый раз, когда пользователь нажимает пробел (аналогично тому, что more
или less
делают в Linux). Как лучше всего это сделать? Я намерен наблюдать за потоком непосредственно в методе public static void main
, так как мне нужно читать и писать с помощью консоли.
Я читал раздел Backpressure в RxJAva Wiki и нашел этот фрагмент:
someObservable.subscribe(new Subscriber<t>() {
@Override
public void onStart() {
request(1);
}
@Override
public void onCompleted() {
// gracefully handle sequence-complete
}
@Override
public void onError(Throwable e) {
// gracefully handle error
}
@Override
public void onNext(t n) {
// do something with the emitted item "n"
// request another item:
request(1);
}
});
Но это смутило меня еще больше, так как метод request
, похоже, не существует в RxJava 3.
generate
. Существует обновленная вики для v2+. - person akarnokd   schedule 21.11.2019