Как обрабатывать фрагменты файла с помощью java.util.stream

Чтобы познакомиться с потоковым API, я попытался написать довольно простой шаблон.

Проблема. Текстовый файл содержит не вложенные блоки текста. Все блоки идентифицируются начальными/конечными шаблонами (например, <start> и <stop>. Содержимое блока синтаксически неотличимо от шума между блоками. Поэтому невозможно работать с простыми (без сохранения состояния) лямбда-выражениями.

Я просто смог реализовать что-то уродливое вроде:
Files.lines(path).collect(new MySequentialParseAndProsessEachLineCollector<>());
Честно говоря, это не то, чего я хочу.

Я ищу картографа, например:
Files.lines(path).map(MyMapAllLinesOfBlockToBuckets()).parallelStream().collect(new MyProcessOneBucketCollector<>());

есть хороший способ извлечения фрагментов данных из потока Java 8, похоже, содержит скелет решения. К сожалению, я туплю, чтобы перевести это на мою проблему. ;-)

Любые подсказки?


person Uwe    schedule 20.10.2014    source источник


Ответы (1)


Вот решение, которое можно использовать для преобразования Stream<String>, где каждый элемент представляет строку, в Stream<List<String>>, где каждый элемент представляет фрагмент, найденный с использованием указанного разделителя:

public class ChunkSpliterator implements Spliterator<List<String>> {
    private final Spliterator<String> source;
    private final Predicate<String> start, end;
    private final Consumer<String> getChunk;
    private List<String> current;

    ChunkSpliterator(Spliterator<String> lineSpliterator,
        Predicate<String> chunkStart, Predicate<String> chunkEnd) {
        source=lineSpliterator;
        start=chunkStart;
        end=chunkEnd;
        getChunk=s -> {
            if(current!=null) current.add(s);
            else if(start.test(s)) current=new ArrayList<>();
        };
    }
    public boolean tryAdvance(Consumer<? super List<String>> action) {
        while(current==null || current.isEmpty()
                            || !end.test(current.get(current.size()-1)))
            if(!source.tryAdvance(getChunk)) return false;
        current.remove(current.size()-1);
        action.accept(current);
        current=null;
        return true;
    }
    public Spliterator<List<String>> trySplit() {
        return null;
    }
    public long estimateSize() {
        return Long.MAX_VALUE;
    }
    public int characteristics() {
        return ORDERED|NONNULL;
    }

    public static Stream<List<String>> toChunks(Stream<String> lines,
        Predicate<String> chunkStart, Predicate<String> chunkEnd,
        boolean parallel) {

        return StreamSupport.stream(
            new ChunkSpliterator(lines.spliterator(), chunkStart, chunkEnd),
            parallel);
    }
}

Строки, соответствующие предикатам, не включаются в блок; при желании это поведение было бы легко изменить.

Его можно использовать следующим образом:

ChunkSpliterator.toChunks( Files.lines(Paths.get(myFile)),
    Pattern.compile("^<start>$").asPredicate(),
    Pattern.compile("^<stop>$").asPredicate(),
    true )
   .collect(new MyProcessOneBucketCollector<>())

Шаблоны указываются как ^word$, чтобы вся строка состояла только из одного слова; без этих якорей строки, содержащие шаблон, могут начинать и заканчивать фрагмент. Природа исходного потока не допускает параллелизма при создании фрагментов, поэтому при цепочке с операцией немедленного сбора параллелизм для всей операции довольно ограничен. Это зависит от MyProcessOneBucketCollector, может ли быть вообще какой-либо параллелизм.

Если ваш окончательный результат не зависит от порядка появления сегментов в исходном файле, настоятельно рекомендуется, чтобы либо ваш сборщик сообщил о себе как UNORDERED или вставьте unordered() в цепочках методов потока перед collect.

person Holger    schedule 20.10.2014
comment
ВАУ, это выглядит великолепно. - person Uwe; 20.10.2014
comment
Для лучшего понимания вопрос о параллелизме. Предполагая, что сегментов UNORDERED, но порядок строк в списке имеет значение, как заставить сборщик выполняться параллельно? ‹br/› ´ChunkSpliterator.toChunks(...).unordered().parallelStream().collect(..)` ??? - person Uwe; 20.10.2014
comment
Порядок строк внутри ведра всегда будет сохраняться, так как ведро List всегда создается однопоточным (как я уже сказал, природа исходного потока не допускает параллелизма), а List не будет внезапно менять порядок своих элементов впоследствии. Таким образом, вызов .unordered() для результата toChunks может только отменить порядок сегментов. Обратите внимание, что вам не нужно вызывать .parallelStream(), передача true методу .toChunks(…) уже создаст параллельный поток. - person Holger; 20.10.2014
comment
Vielen Dank mein Meister. :-) - person Uwe; 21.10.2014