Stream API и очереди: подписка на BlockingQueue в стиле потока

Допустим, у нас есть очередь

BlockingQueue<String> queue= new LinkedBlockingQueue<>();

и какой-то другой поток помещает в него значения, тогда мы читаем это как

while (true) {
    String next = queue.take();
    System.out.println("next message:" + next);
}

Как я могу перебирать эту очередь в потоковом стиле, сохраняя семантику, аналогичную приведенному выше коду.

Этот код проходит только через текущее состояние очереди:

queue.stream().forEach(e -> System.out.println(e));

person Mikhail Boyarsky    schedule 04.05.2014    source источник
comment
Это должно работать. Это правильный синтаксис. Что заставило тебя так сказать?   -  person Syam S    schedule 05.05.2014
comment
В качестве объяснения проблемы: код, использующий цикл while, не завершается и печатает каждую строку, помещенную в очередь. В приведенном ниже примере потока печатаются только те элементы, которые находились внутри очереди на момент создания потока, т. е. конечное число.   -  person C-Otto    schedule 10.03.2017


Ответы (3)


Я немного догадываюсь о том, что вы ожидаете, но, думаю, у меня есть хорошее предчувствие.

Поток очереди, как и перебор очереди, представляет текущее содержимое очереди. Когда итератор или поток достигает хвоста очереди, он не блокируется в ожидании добавления дополнительных элементов. В этот момент итератор или поток исчерпываются, и вычисление завершается.

Если вам нужен поток, состоящий из всех текущих и будущих элементов очереди, вы можете сделать что-то вроде этого:

Stream.generate(() -> {
        try {
            return queue.take();
        } catch (InterruptedException ie) {
            return "Interrupted!";
        }
    })
    .filter(s -> s.endsWith("x"))
    .forEach(System.out::println);   

(К сожалению, необходимость обрабатывать InterruptedException делает это довольно запутанным.)

Обратите внимание, что нет способа закрыть очередь, и Stream.generate не может остановить создание элементов, так что это фактически бесконечный поток. Единственный способ завершить его — использовать операцию короткого замыкания потока, такую ​​как findFirst.

person Stuart Marks    schedule 05.05.2014
comment
это хороший подход, но моя проблема с ним, как остановить работу потока? - person Bassem Reda Zohdy; 02.04.2015
comment
@BoboZohdy Я не уверен, о чем ты спрашиваешь. Возможно, вам следует задать отдельный вопрос и привести пример того, что вы пытаетесь сделать. - person Stuart Marks; 02.04.2015
comment
Ответ Spliterator ниже отвечает на этот вопрос намного лучше. - person Nathan Brown; 13.01.2016

Вы можете посмотреть на реализацию асинхронной очереди. Если у вас Java 8, то cyclops-react, я разработчик этого проекта, предоставляет асинхронный .Queue, который позволит вам как заполнять, так и потреблять из очереди асинхронно (и чисто).

e.g.

Queue<String> queue = QueueFactories.<String>unboundedQueue().build();

Или просто (если это com.aol.simple.react.async.Queue)

Queue<String> queue = new Queue<>();

Потом в отдельной теме:

new Thread(() -> {
        while (true) {
            queue.add("New message " + System.currentTimeMillis());
        }
    }).start();

Вернувшись в основной поток, исходный код теперь должен работать так, как ожидалось (бесконечно перебирать сообщения, добавляемые в очередь, и распечатывать их).

queue.stream().forEach(e -> System.out.println(e));

Очередь и, следовательно, поток могут быть закрыты на любом этапе через -

queue.close();
person John McClean    schedule 10.03.2015
comment
Я попробовал этот пример, он не работает должным образом: основной поток не блокируется на forEach - person Laymain; 23.04.2018
comment
Привет @Laymain - спасибо, что попробовал! Какую версию ты используешь? Я протестировал приведенный выше код на последней версии (10.0.0-M7: скомпилируйте 'com.oath.cyclops:cyclops:10.0.0-M7') и блоки основного потока на forEach, как и ожидалось. Отдельный поток (возможно, тот, который отправляет данные в очередь, должен вызывать queue.close()). Надеюсь, у нас нет ошибки в одном из наших старых выпусков! - person John McClean; 23.04.2018
comment
Плохо, все работает, как и ожидалось, я закрывал очередь слишком рано... - person Laymain; 23.04.2018
comment
Какая глупая ошибка... Я пошел со своей собственной быстрой и грязной реализацией gist.github.com/ леймейн/9ce194bb2924f78582e71e350672bbef - person Laymain; 23.04.2018
comment
Круто - рад, что у вас все заработало (и что у нас нет ошибки!) - person John McClean; 23.04.2018

Другой подход заключается в создании собственного Spliterator. В моем случае у меня есть очередь блокировки, и я хочу создать поток, который продолжает извлекать элементы, пока блок не истечет. Сплитератор примерно такой:

public class QueueSpliterator<T> implements Spliterator<T> {
    private final BlockingQueue<T> queue;
    private final long timeoutMs;

    public QueueSpliterator(final BlockingQueue<T> queue, final long timeoutMs) {
        this.queue = queue;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public int characteristics() {
        return Spliterator.CONCURRENT | Spliterator.NONNULL | Spliterator.ORDERED;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> action) {
        try {
            final T next = this.queue.poll(this.timeoutMs, TimeUnit.MILLISECONDS);
            if (next == null) {
                return false;
            }
            action.accept(next);
            return true;
        } catch (final InterruptedException e) {
            throw new SupplierErrorException("interrupted", e);
        }
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

}

Исключение, созданное для обработки InterruptedException, является расширением RuntimeException. Используя этот класс, можно построить поток через: StreamSupport.stream(new QueueSpliterator(...)) и добавить обычные операции потока.

person user3787629    schedule 02.12.2015
comment
это можно упростить с помощью Spliterators.AbstractSpliterator - person cambunctious; 03.06.2019
comment
Поправьте меня, если я ошибаюсь (но это то, что я вижу). Проблема с этим заключается в том, что вы не можете завершить поток, когда закончите - он будет продолжать опрос в течение нескольких периодов ожидания перед завершением. - person Tvaroh; 19.11.2019