Кажется, существует фундаментальное непонимание того, как должны взаимодействовать limit
и trySplit
. Предположение, что не должно быть больше trySplit
вызовов, чем указанное limit
, совершенно неверно.
Целью trySplit
является разделение исходных данных на две части, в лучшем случае на две половины, так как trySplit
предполагает попытку сбалансированного разделения. Таким образом, если у вас есть исходный набор данных из одного миллиона элементов, успешное разделение дает два исходных набора данных по полмиллиона элементов в каждом. Это совершенно не связано с limit(20)
, который вы могли применить к потоку, за исключением того, что мы заранее знаем, что можем отбросить второй набор данных, если разделитель имеет характеристики SIZED|SUBSIZED
, как запрошенный первый двадцать элементов можно найти только в пределах первых полумиллиона.
Легко подсчитать, что в лучшем случае, т. е. со сбалансированным разбиением, нам нужно уже пятнадцать операций разбиения, каждый раз отбрасывая верхнюю половину, прежде чем мы получим разбиение между первыми двадцатью элементами, которые позволяет нам обрабатывать эти первые двадцать элементов параллельно.
Что можно легко продемонстрировать:
class DebugSpliterator extends Spliterators.AbstractIntSpliterator {
int current, fence;
DebugSpliterator() {
this(0, 1_000_000);
}
DebugSpliterator(int start, int end) {
super(end-start, ORDERED|SIZED|SUBSIZED);
current = start;
fence = end;
}
@Override public boolean tryAdvance(IntConsumer action) {
if(current<fence) {
action.accept(current++);
return true;
}
return false;
}
@Override public OfInt trySplit() {
int mid = (current+fence)>>>1;
System.out.println("trySplit() ["+current+", "+mid+", "+fence+"]");
return mid>current? new DebugSpliterator(current, current=mid): null;
}
}
StreamSupport.stream(new DebugSpliterator(), true)
.limit(20)
.forEach(x -> {});
На моей машине он печатает:
trySplit() [0, 500000, 1000000]
trySplit() [0, 250000, 500000]
trySplit() [0, 125000, 250000]
trySplit() [0, 62500, 125000]
trySplit() [0, 31250, 62500]
trySplit() [0, 15625, 31250]
trySplit() [0, 7812, 15625]
trySplit() [0, 3906, 7812]
trySplit() [0, 1953, 3906]
trySplit() [0, 976, 1953]
trySplit() [0, 488, 976]
trySplit() [0, 244, 488]
trySplit() [0, 122, 244]
trySplit() [0, 61, 122]
trySplit() [0, 30, 61]
trySplit() [0, 15, 30]
trySplit() [15, 22, 30]
trySplit() [15, 18, 22]
trySplit() [15, 16, 18]
trySplit() [16, 17, 18]
trySplit() [0, 7, 15]
trySplit() [18, 20, 22]
trySplit() [18, 19, 20]
trySplit() [7, 11, 15]
trySplit() [0, 3, 7]
trySplit() [3, 5, 7]
trySplit() [3, 4, 5]
trySplit() [7, 9, 11]
trySplit() [4, 4, 5]
trySplit() [9, 10, 11]
trySplit() [11, 13, 15]
trySplit() [0, 1, 3]
trySplit() [13, 14, 15]
trySplit() [7, 8, 9]
trySplit() [1, 2, 3]
trySplit() [8, 8, 9]
trySplit() [5, 6, 7]
trySplit() [14, 14, 15]
trySplit() [17, 17, 18]
trySplit() [11, 12, 13]
trySplit() [12, 12, 13]
trySplit() [2, 2, 3]
trySplit() [10, 10, 11]
trySplit() [6, 6, 7]
что, конечно, намного больше, чем двадцать разделенных попыток, но вполне разумно, поскольку набор данных должен быть разделен до тех пор, пока у нас не будет поддиапазонов в пределах желаемого целевого диапазона, чтобы иметь возможность обрабатывать его параллельно.
Мы можем применить другое поведение, отбросив метаинформацию, которая приводит к этой стратегии выполнения:
StreamSupport.stream(new DebugSpliterator(), true)
.filter(x -> true)
.limit(20)
.forEach(x -> {});
Поскольку Stream API ничего не знает о поведении предиката, конвейер теряет свою характеристику SIZED
, что приводит к
trySplit() [0, 500000, 1000000]
trySplit() [500000, 750000, 1000000]
trySplit() [500000, 625000, 750000]
trySplit() [625000, 687500, 750000]
trySplit() [625000, 656250, 687500]
trySplit() [656250, 671875, 687500]
trySplit() [0, 250000, 500000]
trySplit() [750000, 875000, 1000000]
trySplit() [250000, 375000, 500000]
trySplit() [0, 125000, 250000]
trySplit() [250000, 312500, 375000]
trySplit() [312500, 343750, 375000]
trySplit() [125000, 187500, 250000]
trySplit() [875000, 937500, 1000000]
trySplit() [375000, 437500, 500000]
trySplit() [125000, 156250, 187500]
trySplit() [250000, 281250, 312500]
trySplit() [750000, 812500, 875000]
trySplit() [281250, 296875, 312500]
trySplit() [156250, 171875, 187500]
trySplit() [437500, 468750, 500000]
trySplit() [0, 62500, 125000]
trySplit() [875000, 906250, 937500]
trySplit() [62500, 93750, 125000]
trySplit() [812500, 843750, 875000]
trySplit() [906250, 921875, 937500]
trySplit() [0, 31250, 62500]
trySplit() [31250, 46875, 62500]
trySplit() [46875, 54687, 62500]
trySplit() [54687, 58593, 62500]
trySplit() [58593, 60546, 62500]
trySplit() [60546, 61523, 62500]
trySplit() [61523, 62011, 62500]
trySplit() [62011, 62255, 62500]
который показывает меньше trySplit
вызовов, но не улучшение; глядя на цифры, видно, что теперь диапазоны за пределами результирующего диапазона элементов (если мы используем наши знания о том, что все элементы пройдут фильтр) обрабатываются, что еще хуже, диапазон результирующих элементов полностью покрывается одним разделителем, что приводит к отсутствию параллельного обработка для наших элементов результата вообще, все остальные потоки обрабатывали элементы, которые впоследствии были удалены.
Конечно, мы могли бы легко обеспечить оптимальное разбиение для нашей задачи, изменив
int mid = (current+fence)>>>1;
to
int mid = fence>20? 20: (current+fence)>>>1;
so
StreamSupport.stream(new DebugSpliterator(), true)
.limit(20)
.forEach(x -> {});
приводит к
trySplit() [0, 20, 1000000]
trySplit() [0, 10, 20]
trySplit() [10, 15, 20]
trySplit() [10, 12, 15]
trySplit() [12, 13, 15]
trySplit() [0, 5, 10]
trySplit() [15, 17, 20]
trySplit() [5, 7, 10]
trySplit() [0, 2, 5]
trySplit() [17, 18, 20]
trySplit() [2, 3, 5]
trySplit() [5, 6, 7]
trySplit() [15, 16, 17]
trySplit() [6, 6, 7]
trySplit() [16, 16, 17]
trySplit() [0, 1, 2]
trySplit() [7, 8, 10]
trySplit() [8, 9, 10]
trySplit() [1, 1, 2]
trySplit() [3, 4, 5]
trySplit() [9, 9, 10]
trySplit() [18, 19, 20]
trySplit() [10, 11, 12]
trySplit() [13, 14, 15]
trySplit() [11, 11, 12]
trySplit() [4, 4, 5]
trySplit() [14, 14, 15]
но это будет не сплитератор общего назначения, а тот, который плохо работает, если предел не двадцать.
Если мы сможем включить ограничение в разделитель или, в более общем случае, в источник потока, у нас не будет этой проблемы. Таким образом, вместо list.stream().limit(x)
вы можете вызвать list.subList(0, Math.min(x, list.size())).stream()
, вместо random.ints().limit(x)
использовать random.ints(x)
, вместо Stream.generate(generator).limit(x)
вы можете использовать LongStream.range(0, x).mapToObj( index -> generator.get())
или использовать фабричный метод этот ответ.
Для произвольного источника потока/разделителя применение limit
может быть довольно затратным для параллельных потоков, что составляет даже задокументировано. Ну, а иметь побочные эффекты в trySplit
— это вообще плохая идея.
person
Holger
schedule
04.10.2017