Я хотел бы асинхронно читать сообщения, созданные пользователем из stdin. Что-то вроде:
Flux.from(stdinPublisher())
.subscribe(msg -> System.out.println("Received: " + msg));
Так как же здесь реализовать такой stdin publisher?
Я хотел бы асинхронно читать сообщения, созданные пользователем из stdin. Что-то вроде:
Flux.from(stdinPublisher())
.subscribe(msg -> System.out.println("Received: " + msg));
Так как же здесь реализовать такой stdin publisher?
Это было легко. Извините за беспокойство :)
import java.util.Scanner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
@Component
@Slf4j
public class StdinProducerExample implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
Flux
.create(sink -> {
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
sink.next(scanner.nextLine());
}
})
.subscribeOn(Schedulers.newSingle("stdin publisher"))
.subscribe(m -> log.info("User message: {}", m));
log.info("Started listening stdin");
}
}
Другой способ получения данных с помощью Reactor - Processors
.
FluxProcessor
приемники безопасно закрывают многопоточные производители и могут использоваться приложениями, которые одновременно генерируют данные из нескольких потоков. Например, вы можете создать поточно-ориентированный сериализованный приемник дляUnicastProcessor
. Несколько потоков-производителей могут одновременно генерировать данные в следующем сериализованном приемнике:
public class FluxProcessorSample {
public static void main(String[] args) {
FluxProcessor<String, String> processor = UnicastProcessor.<String>create().serialize();
FluxSink<String> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
// Print input to STDOUT
Executors.newSingleThreadScheduledExecutor()
.execute(() -> processor
.publishOn(Schedulers.elastic())
.map(str -> "1>> " + str)
.subscribe(System.out::println));
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
sink.next(scanner.nextLine());
}
}
}
UnicastProcessor
может справиться с противодавлением с помощью внутреннего буфера. Компромисс в том, что у него может быть не более одного Subscriber
. Если вы проталкиваете через него какой-либо объем данных, пока его подписчик еще не запросил данные, он буферизует все данные.
Другие FluxProcessor
реализации:
DirectProcessor
- Может отправлять сигналы на ноль или более Subscribers
. У него есть ограничение на отсутствие противодавления.
EmitterProcessor
- Может передавать несколько подписчиков, соблюдая противодавление для каждого из своих подписчиков. Когда у него нет подписчика, он все еще может принимать несколько запросов данных до настраиваемого bufferSize
.
stdinPublisher
метода? - person Stav Alfi   schedule 05.03.2018stdinPublisher
метода или какой-то другой подход, неважно. - person Bullet-tooth   schedule 05.03.2018