Java Reactor: как создать поток из стандартного ввода?

Я хотел бы асинхронно читать сообщения, созданные пользователем из stdin. Что-то вроде:

Flux.from(stdinPublisher()) 
    .subscribe(msg -> System.out.println("Received: " + msg));

Так как же здесь реализовать такой stdin publisher?


person Bullet-tooth    schedule 05.03.2018    source источник
comment
исходный код stdinPublisher метода?   -  person Stav Alfi    schedule 05.03.2018
comment
да. Я хочу подписаться и получать сообщения, созданные пользователем. Это может быть реализация stdinPublisher метода или какой-то другой подход, неважно.   -  person Bullet-tooth    schedule 05.03.2018


Ответы (2)


Это было легко. Извините за беспокойство :)

import java.util.Scanner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

@Component
@Slf4j
public class StdinProducerExample implements ApplicationRunner {

  @Override
  public void run(ApplicationArguments args) throws Exception {
    Flux
        .create(sink -> {
          Scanner scanner = new Scanner(System.in);
          while (scanner.hasNext()) {
            sink.next(scanner.nextLine());
          }
        })
        .subscribeOn(Schedulers.newSingle("stdin publisher"))
        .subscribe(m -> log.info("User message: {}", m));
    log.info("Started listening stdin");
  }

}
person Bullet-tooth    schedule 05.03.2018

Другой способ получения данных с помощью Reactor - Processors.

FluxProcessor приемники безопасно закрывают многопоточные производители и могут использоваться приложениями, которые одновременно генерируют данные из нескольких потоков. Например, вы можете создать поточно-ориентированный сериализованный приемник для UnicastProcessor. Несколько потоков-производителей могут одновременно генерировать данные в следующем сериализованном приемнике:

public class FluxProcessorSample {

  public static void main(String[] args) {    
    FluxProcessor<String, String> processor = UnicastProcessor.<String>create().serialize();
    FluxSink<String> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);

    // Print input to STDOUT
    Executors.newSingleThreadScheduledExecutor()
        .execute(() -> processor
            .publishOn(Schedulers.elastic())
            .map(str -> "1>> " + str)
            .subscribe(System.out::println));

    Scanner scanner = new Scanner(System.in);
    while (scanner.hasNext()) {
      sink.next(scanner.nextLine());
    }
  }
}

UnicastProcessor может справиться с противодавлением с помощью внутреннего буфера. Компромисс в том, что у него может быть не более одного Subscriber. Если вы проталкиваете через него какой-либо объем данных, пока его подписчик еще не запросил данные, он буферизует все данные.

Другие FluxProcessor реализации:

  • DirectProcessor - Может отправлять сигналы на ноль или более Subscribers. У него есть ограничение на отсутствие противодавления.

  • EmitterProcessor - Может передавать несколько подписчиков, соблюдая противодавление для каждого из своих подписчиков. Когда у него нет подписчика, он все еще может принимать несколько запросов данных до настраиваемого bufferSize.

person Evgeniy Khyst    schedule 15.11.2019