Реализация непараллельного Spliterator для неизвестного размера?

Я немного смущен всеми моими исследованиями. У меня есть собственный интерфейс под названием TabularResultSet (который я упростил для примера), который проходит через любой набор данных, который по своей природе является табличным. У него есть метод next(), такой как итератор, и он может проходить через QueryResultSet, таблицу с вкладками из буфера обмена, CSV и т. д.

Однако я пытаюсь создать Spliterator, который обертывает мой TabularResultSet и легко превращает его в поток. Я не могу представить себе безопасный способ распараллеливания, потому что TabularResultSet может проходить через QueryResultSet, а одновременный вызов next() может нанести ущерб. Единственный способ, которым я представляю, что распараллеливание может быть выполнено безопасно, - это вызов next() одним рабочим потоком, и он передает данные параллельному потоку для работы с ним.

Поэтому я думаю, что распараллеливание — не самый простой вариант. Как мне просто заставить эту вещь работать без распараллеливания? Вот моя работа на данный момент...

public final class SpliteratorTest {

    public static void main(String[] args) {
       TabularResultSet rs = null; /* instantiate an implementation; */

       Stream<TabularResultSet> rsStream = StreamSupport.stream(new TabularSpliterator(rs), false);
    }

    public static interface TabularResultSet {
        public boolean next();

        public List<Object> getData();
    }

    private static final class TabularSpliterator implements Spliterator<TabularResultSet> {

        private final TabularResultSet rs;

        public TabularSpliterator(TabularResultSet rs) {
            this.rs = rs;
        }
        @Override
        public boolean tryAdvance(Consumer<? super TabularResultSet> action) {
            action.accept(rs);
            return rs.next();
        }

        @Override
        public Spliterator<TabularResultSet> trySplit() {
            return null;
        }

        @Override
        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        @Override
        public int characteristics() {
            return 0;
        }
    }
}

person tmn    schedule 09.03.2015    source источник


Ответы (2)


Вероятно, проще всего расширить Spliterators.AbstractSpliterator. . Если вы сделаете это, вам нужно будет реализовать только tryAdvance. Это можно превратить в параллельный поток; параллелизм возникает из-за того, что реализация потоков вызывает tryAdvance несколько раз, группирует полученные данные и обрабатывает их в разных потоках.

Если TabularResultSet чем-то похож на JDBC ResultSet, я не думаю, что вам нужны Spliterator<TabularResultSet> или Stream<TabularResultSet>. Вместо этого выглядит так, будто TabularResultSet представляет собой весь набор табличных данных, поэтому вы, вероятно, захотите, чтобы каждый разделитель или элемент потока представлял одну строку в этой таблице — List<Object>, возвращаемый getData()? Если это так, вам нужно что-то вроде следующего.

class TabularSpliterator extends Spliterators.AbstractSpliterator<List<Object>> {
    private final TabularResultSet rs;

    public TabularSpliterator(TabularResultSet rs) {
        super(...);
        this.rs = rs;
    }

    @Override public boolean tryAdvance(Consumer<? super List<Object>> action) {
        if (rs.next()) {
            action.accept(rs.getData());
            return true;
        } else {
            return false;
        }
    }
}

Затем вы можете превратить экземпляр этого разделителя в поток, вызвав StreamSupport.stream().

Примечание: как правило, экземпляр Spliterator не вызывается из нескольких потоков и даже не должен быть потокобезопасным. См. документацию по классу Spliterator в параграфе начало Несмотря... подробнее.

person Stuart Marks    schedule 09.03.2015
comment
@Томас Н. Правильно, самому Spliterator не нужно беспокоиться о том, чтобы его вызывали одновременно. (Я редактировал некоторые ссылки на документы.) Метод tryAdvance может вызываться несколько раз, а результаты передаваться в другой поток, поэтому результаты, передаваемые в action.accept(), должны быть независимы друг от друга. Если SingletonTabularSet делает это, все должно быть в порядке. - person Stuart Marks; 09.03.2015
comment
Извините, я удалил свои комментарии из-за моего последующего исследования, указывающего на то, что Стюарт только что подтвердил, так же, как он ответил на мои комментарии. Другим читателям я подтверждаю, что пока я безопасно извлекаю данные из TabularResultSet, я могу распараллелить поток. Вместо вызова toList(), который был упрощенным заполнителем для этого примера, я на самом деле вызову toSingletonTabularSet(), который возвращает неизменяемый синглтон для одной записи TabularResultSet. Если мне когда-нибудь понадобится распараллелить, эти SingletonTabularSets можно безопасно передать потокам. - person tmn; 09.03.2015
comment
Хорошо, чем больше я размышлял об этом, тем больше это имело смысл. В action.accept() начинается распараллеливание (если оно есть). Так что, пока состояние итерируемого объекта ограничено или неизменно, переход к action.accept() безопасен. - person tmn; 10.03.2015

Ты в основном там. Все, что вам нужно сделать сейчас, это преобразовать ваш Spliterator в Stream. Это можно сделать с помощью метод StreamSupport.stream(Spliterator, boolean). Логический параметр — это флаг того, хотите ли вы выполнять параллельную потоковую передачу или нет (вы бы хотели false, чтобы не было параллельной).

Если ваш TabularResultSet реализовал Iterator, вы можете использовать метод Spliterators.spliteratorUnknownSize() для преобразования Iterator в Spliterator, который в основном делает то же, что и код, который у вас есть выше.

Не уверен, стоит ли добавлять характеристики, но вы можете рассмотреть Spliterator.IMMUTABLE| Spliterator.ORDERED | Spliterator.NONNULL

удачи

person dkatzel    schedule 09.03.2015
comment
Было интересно, почему сборщик получал нулевые значения в конце потока. Только что понял, что мне нужно что-то сделать с параметром action в методе tryNext(). Я обновил выше. - person tmn; 09.03.2015