Параллельный поток вызывает Spliterator больше раз, чем его предел

Недавно я обнаружил ошибку, в которой

StreamSupport.intStream(/* a Spliterator.ofInt */, true)
    .limit(20)

вызывал Spliterator.ofInt.tryAdvance более 20 раз. Когда я изменил его на

StreamSupport.intStream(/* a Spliterator.ofInt */, true)
    .sequential()
    .limit(20)

проблема ушла. Почему это происходит? Есть ли способ добиться строгого ограничения на параллельный поток, когда tryAdvance имеет побочные эффекты, кроме как встроить его в Spliterator? (Это для тестирования некоторых методов, которые возвращают неограниченные потоки, но когда тесты должны достичь окончательного конца без усложнения конструкции «цикла для X миллисекунд».)


person Pr0methean    schedule 03.10.2017    source источник
comment
Опубликуйте минимально воспроизводимый пример.   -  person shmosel    schedule 03.10.2017


Ответы (3)


Кажется, существует фундаментальное непонимание того, как должны взаимодействовать limit и trySplit. Предположение, что не должно быть больше trySplit вызовов, чем указанное limit, совершенно неверно.

Целью trySplit является разделение исходных данных на две части, в лучшем случае на две половины, так как trySplit предполагает попытку сбалансированного разделения. Таким образом, если у вас есть исходный набор данных из одного миллиона элементов, успешное разделение дает два исходных набора данных по полмиллиона элементов в каждом. Это совершенно не связано с limit(20), который вы могли применить к потоку, за исключением того, что мы заранее знаем, что можем отбросить второй набор данных, если разделитель имеет характеристики SIZED|SUBSIZED, как запрошенный первый двадцать элементов можно найти только в пределах первых полумиллиона.

Легко подсчитать, что в лучшем случае, т. е. со сбалансированным разбиением, нам нужно уже пятнадцать операций разбиения, каждый раз отбрасывая верхнюю половину, прежде чем мы получим разбиение между первыми двадцатью элементами, которые позволяет нам обрабатывать эти первые двадцать элементов параллельно.

Что можно легко продемонстрировать:

class DebugSpliterator extends Spliterators.AbstractIntSpliterator {
    int current, fence;
    DebugSpliterator() {
        this(0, 1_000_000);
    }
    DebugSpliterator(int start, int end) {
        super(end-start, ORDERED|SIZED|SUBSIZED);
        current = start;
        fence = end;
    }
    @Override public boolean tryAdvance(IntConsumer action) {
        if(current<fence) {
            action.accept(current++);
            return true;
        }
        return false;
    }
    @Override public OfInt trySplit() {
        int mid = (current+fence)>>>1;
        System.out.println("trySplit() ["+current+", "+mid+", "+fence+"]");
        return mid>current? new DebugSpliterator(current, current=mid): null;
    }
}
StreamSupport.stream(new DebugSpliterator(), true)
    .limit(20)
    .forEach(x -> {});

На моей машине он печатает:

trySplit() [0, 500000, 1000000]
trySplit() [0, 250000, 500000]
trySplit() [0, 125000, 250000]
trySplit() [0, 62500, 125000]
trySplit() [0, 31250, 62500]
trySplit() [0, 15625, 31250]
trySplit() [0, 7812, 15625]
trySplit() [0, 3906, 7812]
trySplit() [0, 1953, 3906]
trySplit() [0, 976, 1953]
trySplit() [0, 488, 976]
trySplit() [0, 244, 488]
trySplit() [0, 122, 244]
trySplit() [0, 61, 122]
trySplit() [0, 30, 61]
trySplit() [0, 15, 30]
trySplit() [15, 22, 30]
trySplit() [15, 18, 22]
trySplit() [15, 16, 18]
trySplit() [16, 17, 18]
trySplit() [0, 7, 15]
trySplit() [18, 20, 22]
trySplit() [18, 19, 20]
trySplit() [7, 11, 15]
trySplit() [0, 3, 7]
trySplit() [3, 5, 7]
trySplit() [3, 4, 5]
trySplit() [7, 9, 11]
trySplit() [4, 4, 5]
trySplit() [9, 10, 11]
trySplit() [11, 13, 15]
trySplit() [0, 1, 3]
trySplit() [13, 14, 15]
trySplit() [7, 8, 9]
trySplit() [1, 2, 3]
trySplit() [8, 8, 9]
trySplit() [5, 6, 7]
trySplit() [14, 14, 15]
trySplit() [17, 17, 18]
trySplit() [11, 12, 13]
trySplit() [12, 12, 13]
trySplit() [2, 2, 3]
trySplit() [10, 10, 11]
trySplit() [6, 6, 7]

что, конечно, намного больше, чем двадцать разделенных попыток, но вполне разумно, поскольку набор данных должен быть разделен до тех пор, пока у нас не будет поддиапазонов в пределах желаемого целевого диапазона, чтобы иметь возможность обрабатывать его параллельно.

Мы можем применить другое поведение, отбросив метаинформацию, которая приводит к этой стратегии выполнения:

StreamSupport.stream(new DebugSpliterator(), true)
    .filter(x -> true)
    .limit(20)
    .forEach(x -> {});

Поскольку Stream API ничего не знает о поведении предиката, конвейер теряет свою характеристику SIZED, что приводит к

trySplit() [0, 500000, 1000000]
trySplit() [500000, 750000, 1000000]
trySplit() [500000, 625000, 750000]
trySplit() [625000, 687500, 750000]
trySplit() [625000, 656250, 687500]
trySplit() [656250, 671875, 687500]
trySplit() [0, 250000, 500000]
trySplit() [750000, 875000, 1000000]
trySplit() [250000, 375000, 500000]
trySplit() [0, 125000, 250000]
trySplit() [250000, 312500, 375000]
trySplit() [312500, 343750, 375000]
trySplit() [125000, 187500, 250000]
trySplit() [875000, 937500, 1000000]
trySplit() [375000, 437500, 500000]
trySplit() [125000, 156250, 187500]
trySplit() [250000, 281250, 312500]
trySplit() [750000, 812500, 875000]
trySplit() [281250, 296875, 312500]
trySplit() [156250, 171875, 187500]
trySplit() [437500, 468750, 500000]
trySplit() [0, 62500, 125000]
trySplit() [875000, 906250, 937500]
trySplit() [62500, 93750, 125000]
trySplit() [812500, 843750, 875000]
trySplit() [906250, 921875, 937500]
trySplit() [0, 31250, 62500]
trySplit() [31250, 46875, 62500]
trySplit() [46875, 54687, 62500]
trySplit() [54687, 58593, 62500]
trySplit() [58593, 60546, 62500]
trySplit() [60546, 61523, 62500]
trySplit() [61523, 62011, 62500]
trySplit() [62011, 62255, 62500]

который показывает меньше trySplit вызовов, но не улучшение; глядя на цифры, видно, что теперь диапазоны за пределами результирующего диапазона элементов (если мы используем наши знания о том, что все элементы пройдут фильтр) обрабатываются, что еще хуже, диапазон результирующих элементов полностью покрывается одним разделителем, что приводит к отсутствию параллельного обработка для наших элементов результата вообще, все остальные потоки обрабатывали элементы, которые впоследствии были удалены.

Конечно, мы могли бы легко обеспечить оптимальное разбиение для нашей задачи, изменив

int mid = (current+fence)>>>1;

to

int mid = fence>20? 20: (current+fence)>>>1;

so

StreamSupport.stream(new DebugSpliterator(), true)
    .limit(20)
    .forEach(x -> {});

приводит к

trySplit() [0, 20, 1000000]
trySplit() [0, 10, 20]
trySplit() [10, 15, 20]
trySplit() [10, 12, 15]
trySplit() [12, 13, 15]
trySplit() [0, 5, 10]
trySplit() [15, 17, 20]
trySplit() [5, 7, 10]
trySplit() [0, 2, 5]
trySplit() [17, 18, 20]
trySplit() [2, 3, 5]
trySplit() [5, 6, 7]
trySplit() [15, 16, 17]
trySplit() [6, 6, 7]
trySplit() [16, 16, 17]
trySplit() [0, 1, 2]
trySplit() [7, 8, 10]
trySplit() [8, 9, 10]
trySplit() [1, 1, 2]
trySplit() [3, 4, 5]
trySplit() [9, 9, 10]
trySplit() [18, 19, 20]
trySplit() [10, 11, 12]
trySplit() [13, 14, 15]
trySplit() [11, 11, 12]
trySplit() [4, 4, 5]
trySplit() [14, 14, 15]

но это будет не сплитератор общего назначения, а тот, который плохо работает, если предел не двадцать.

Если мы сможем включить ограничение в разделитель или, в более общем случае, в источник потока, у нас не будет этой проблемы. Таким образом, вместо list.stream().limit(x) вы можете вызвать list.subList(0, Math.min(x, list.size())).stream(), вместо random.ints().limit(x) использовать random.ints(x), вместо Stream.generate(generator).limit(x) вы можете использовать LongStream.range(0, x).mapToObj( index -> generator.get()) или использовать фабричный метод этот ответ.

Для произвольного источника потока/разделителя применение limit может быть довольно затратным для параллельных потоков, что составляет даже задокументировано. Ну, а иметь побочные эффекты в trySplit — это вообще плохая идея.

person Holger    schedule 04.10.2017
comment
очевидно, это не сплитератор общего назначения, но, черт возьми, я нашел его умным: int mid = fence>20? 20: (current+fence)>>>1; - person Eugene; 04.10.2017

Я никоим образом не думаю, что это ошибка, но все же очень интересная идея, что tryAdvance может иметь побочные эффекты.

Это было бы вполне возможно, насколько я понимаю, для случая, когда ваш trySplit разбивается не на отдельные партии элементов.

Например, у вас есть массив и вы хотите разбить его (через trySplit) на части подмассивов не менее 4 элементов в каждой. В таком случае, когда вы больше не можете разделить (например, вы достигли минимум 4 элементов в текущем Spliterator), когда начнется обработка - будет вызван forEachRemaning; в свою очередь, по умолчанию он будет вызывать tryAdvance для каждого элемента в текущем Spliterator, как показано в реализации по умолчанию:

default void forEachRemaining(Consumer<? super T> action) {
    do { } while (tryAdvance(action));
}

Очевидно, что поскольку вы выполняете работу параллельно - как только поток начал свою работу (читай executing it's forEachRemaning), его больше нельзя остановить - так много других элементов попадут в tryAdvance.

Таким образом, я действительно не думаю, что есть способ сделать это, кроме как интегрировать это в сам Spliterator; Я думаю, что это должно работать:

  static class LimitingSpliterator<T> implements Spliterator<T> {

    private int limit;

    private final Supplier<T> generator;

    private LimitingSpliterator(Supplier<T> generator, int limit) {
        this.limit = limit;
        this.generator = generator;
    }

    static <T> LimitingSpliterator<T> of(Supplier<T> supplier, int limit) {
        return new LimitingSpliterator<>(supplier, limit);
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> consumer) {
        Objects.requireNonNull(consumer);
        if (limit > 0) {
            --limit;
            generator.get();
            consumer.accept(generator.get());
            return true;
        }
        return false;
    }

    @Override
    public void forEachRemaining(final Consumer<? super T> consumer) {
        while (limit > 0) {
            consumer.accept(generator.get());
            --limit;
        }
    }

    @Override
    public LimitingSpliterator<T> trySplit() {
        int half = limit >> 2;
        limit = limit - half;
        return new LimitingSpliterator<>(generator, half);
    }

    @Override
    public long estimateSize() {
        return limit << 2;
    }

    @Override
    public int characteristics() {
        return SIZED;
    }
}
person Eugene    schedule 03.10.2017
comment
Или просто LongStream.range(0, limit).unordered().mapToObj(x -> generator.get())… [.parallel()…] - person Holger; 04.10.2017

Для моего варианта использования решение состояло в том, чтобы использовать: LongStream.range(0, streamSize).unordered().parallel().mapToInt(ignored -> nextInt()) NB: это было для потока случайных чисел из PRNG, который мог постоянно повторно заполняться.

person Pr0methean    schedule 13.08.2018