Самый эффективный способ получить последний элемент потока

Stream не имеет метода last():

Stream<T> stream;
T last = stream.last(); // No such method

Какой самый элегантный и/или эффективный способ получить последний элемент (или null для пустого потока)?


person Bohemian♦    schedule 18.12.2014    source источник
comment
Если вам нужно найти последний элемент Stream, вы можете пересмотреть свой дизайн, и если вы действительно хотите использовать Stream. Streams не обязательно упорядочены или конечны. Если ваш Stream неупорядочен, бесконечен или и то, и другое, последний элемент не имеет значения. На мой взгляд, смысл Stream состоит в том, чтобы обеспечить уровень абстракции между данными и тем, как вы их обрабатываете. Таким образом, самому Stream не нужно ничего знать об относительном порядке своих элементов. Поиск последнего элемента в Stream — это O(n). Если бы у вас была другая структура данных, она могла бы быть O(1).   -  person Jeffrey    schedule 21.12.2014
comment
@jeff потребность была реальной: ситуация примерно заключалась в добавлении товаров в корзину, каждое добавление возвращало информацию об ошибке (определенные комбинации товаров недействительны), но только информацию об ошибке последнего добавления (когда все товары были добавлены и справедливая оценку тележки можно было сделать) нужна была информация. (Да, используемый нами API неисправен и не может быть исправлен).   -  person Bohemian♦    schedule 21.12.2014
comment
@BrianGoetz: бесконечные потоки также не имеют четко определенного count(), но Stream по-прежнему имеет метод count(). На самом деле этот аргумент применим к любой терминальной операции без короткого замыкания на бесконечных потоках.   -  person Jeffrey Bosboom    schedule 24.12.2014
comment
@BrianGoetz Я думаю, что потоки должны иметь метод last(). 1 апреля может быть проведен опрос о том, как его следует определять для бесконечных потоков. Я бы предложил: он никогда не возвращается и использует хотя бы одно ядро ​​​​процессора на 100%. На параллельных потоках требуется использовать все ядра на 100%.   -  person Vojta    schedule 13.04.2017
comment
Если список содержит объекты с естественным порядком или которые можно упорядочить, вы можете использовать метод max(), как в stream()...max(Comparator...).   -  person Erk    schedule 06.08.2019
comment
@erk конечно, но это менее эффективно, чем любое решение на этой странице, и не имеет прямого отношения к вопросу (который не включает сопоставимые элементы)   -  person Bohemian♦    schedule 06.08.2019


Ответы (7)


Сделайте сокращение, которое просто возвращает текущее значение:

Stream<T> stream;
T last = stream.reduce((a, b) -> b).orElse(null);
person Bohemian♦    schedule 18.12.2014
comment
Вы бы сказали, что это было элегантно, эффективно или и то, и другое? - person Duncan Jones; 18.12.2014
comment
@Duncan Я думаю, что это и то, и другое, но я еще не орудие в Java 8, и эта потребность возникла на днях на работе - младший втолкнул поток в стек, а затем вытащил его, и я подумал, что это выглядит лучше, но там может быть что-то еще проще там. - person Bohemian♦; 18.12.2014
comment
По простоте и элегантности этот ответ выигрывает. И в общем случае он достаточно эффективен; он будет достаточно хорошо распараллелен. Для некоторых источников потока, которые знают свой размер, есть более быстрый способ, но в большинстве случаев дополнительный код для сохранения этих нескольких итераций не стоит. - person Brian Goetz; 23.12.2014
comment
@BrianGoetz, как это будет хорошо распараллеливаться? последнее значение будет непредсказуемым при использовании параллельного потока - person benez; 22.02.2017
comment
@kewlbfy Нет, если в потоке есть определенный порядок встреч, параллельное сокращение будет его соблюдать. Если источник потока разделяется без ошибок, вы получите ответ за время O(lg n). - person Brian Goetz; 13.04.2017
comment
@BrianGoetz: это все еще O(n), даже если разделить на количество ядер ЦП. Поскольку поток не знает, что делает функция редукции, он все равно должен вычислять ее для каждого элемента. - person Holger; 13.11.2017

Это сильно зависит от характера Stream. Имейте в виду, что «простой» не обязательно означает «эффективный». Если вы подозреваете, что поток очень большой, выполняет тяжелые операции или имеет источник, который заранее знает размер, следующее может быть значительно более эффективным, чем простое решение:

static <T> T getLast(Stream<T> stream) {
    Spliterator<T> sp=stream.spliterator();
    if(sp.hasCharacteristics(Spliterator.SIZED|Spliterator.SUBSIZED)) {
        for(;;) {
            Spliterator<T> part=sp.trySplit();
            if(part==null) break;
            if(sp.getExactSizeIfKnown()==0) {
                sp=part;
                break;
            }
        }
    }
    T value=null;
    for(Iterator<T> it=recursive(sp); it.hasNext(); )
        value=it.next();
    return value;
}

private static <T> Iterator<T> recursive(Spliterator<T> sp) {
    Spliterator<T> prev=sp.trySplit();
    if(prev==null) return Spliterators.iterator(sp);
    Iterator<T> it=recursive(sp);
    if(it!=null && it.hasNext()) return it;
    return recursive(prev);
}

Вы можете проиллюстрировать разницу на следующем примере:

String s=getLast(
    IntStream.range(0, 10_000_000).mapToObj(i-> {
        System.out.println("potential heavy operation on "+i);
        return String.valueOf(i);
    }).parallel()
);
System.out.println(s);

Он будет печатать:

potential heavy operation on 9999999
9999999

Другими словами, он выполнял операцию не над первыми 9999999 элементами, а только над последним.

person Holger    schedule 18.12.2014
comment
В чем смысл блока hasCharacteristics()? Какое значение он добавляет, что еще не охвачено методом recursive()? Последний уже переходит к последней точке разделения. Кроме того, recursive() никогда не может вернуть null, поэтому вы можете удалить проверку it != null. - person Gili; 10.04.2015
comment
Рекурсивная операция может обрабатывать каждый случай, но это всего лишь запасной вариант, поскольку в худшем случае глубина рекурсии соответствует количеству (нефильтрованных!) элементов. Идеальным случаем является поток SUBSIZED, который может гарантировать непустые разделенные половины, поэтому нам никогда не нужно возвращаться к левой стороне. Обратите внимание, что в этом случае recursive фактически не будет рекурсивным, так как trySplit уже доказал, что возвращает null. - person Holger; 10.04.2015
comment
Конечно, код можно было написать по-другому, и так оно и было; Я предполагаю, что null-проверка происходит из более ранней версии, но затем я обнаружил, что для потоков, отличных от SUBSIZED, вам приходится иметь дело с возможными пустыми разделенными частями, т.е. вам нужно выполнить итерацию, чтобы узнать, есть ли у него значения, поэтому я переместил вызов Spliterators.iterator(…) в метод recursive, чтобы иметь возможность резервного копирования на левую сторону, если правая сторона пуста. Цикл по-прежнему является предпочтительной операцией. - person Holger; 10.04.2015
comment
Интересное решение. Обратите внимание, что в соответствии с текущей реализацией Stream API ваш поток должен быть либо параллельным, либо напрямую подключенным к разделителю источника. В противном случае он по какой-то причине откажется от разбиения, даже если разветвитель базового источника разделится. С другой стороны, вы не можете использовать parallel() вслепую, так как это может фактически выполнять некоторые операции (например, сортировку) параллельно, неожиданно потребляя больше ядер ЦП. - person Tagir Valeev; 21.09.2015
comment
@Tagir Valeev: правильно, в примере кода используется .parallel(), но на самом деле это может повлиять на sorted() или distinct(). Не думаю, что должен быть эффект от каких-то других промежуточных операций… - person Holger; 21.09.2015
comment
@Holger, реализация потока иногда довольно интересна, если я заменю ваш пример на IntStream.range(0, 10_000_000).filter(x -> x > 0).mapToObj(i -> {..., то есть: добавлю этот filter, но удалю parallel, полученный Spliterator будет рассматриваться как неразделяемый, хотя не уверен, почему это так - person Eugene; 02.04.2019
comment
@Eugene это то, о чем Тагир Валеев уже упомянул в своем комментарии; по-видимому, реализация Stream имеет две разные реализации Spliterator, и та, что возвращается из последовательного потока, не поддерживает разделение, хотя я не могу себе представить, что это дает значительное преимущество для Stream, состоящих только из операций без сохранения состояния. - person Holger; 03.04.2019

В гуаве есть Streams.findLast:

Stream<T> stream;
T last = Streams.findLast(stream);
person Robert Važan    schedule 02.03.2018
comment
И он работает намного лучше, чем reduce((a, b) -> b), потому что внутри использует Spliterator.trySplit. - person ZhekaKozlov; 15.06.2019

Это просто рефакторинг ответа Хольгера, потому что код, хотя и фантастический, немного труден для чтения/понимания, особенно для людей, которые не были программистами на C до Java. Надеюсь, что мой рефакторинговый пример класса будет немного проще для тех, кто не знаком с сплитераторами, что они делают или как они работают.

public class LastElementFinderExample {
    public static void main(String[] args){
        String s = getLast(
            LongStream.range(0, 10_000_000_000L).mapToObj(i-> {
                System.out.println("potential heavy operation on "+i);
                return String.valueOf(i);
            }).parallel()
        );
        System.out.println(s);
    }

    public static <T> T getLast(Stream<T> stream){
        Spliterator<T> sp = stream.spliterator();
        if(isSized(sp)) {
            sp = getLastSplit(sp);
        }
        return getIteratorLastValue(getLastIterator(sp));
    }

    private static boolean isSized(Spliterator<?> sp){
        return sp.hasCharacteristics(Spliterator.SIZED|Spliterator.SUBSIZED);
    }

    private static <T> Spliterator<T> getLastSplit(Spliterator<T> sp){
        return splitUntil(sp, s->s.getExactSizeIfKnown() == 0);
    }

    private static <T> Iterator<T> getLastIterator(Spliterator<T> sp) {
        return Spliterators.iterator(splitUntil(sp, null));
    }

    private static <T> T getIteratorLastValue(Iterator<T> it){
        T result = null;
        while (it.hasNext()){
            result = it.next();
        }
        return result;
    }

    private static <T> Spliterator<T> splitUntil(Spliterator<T> sp, Predicate<Spliterator<T>> condition){
        Spliterator<T> result = sp;
        for (Spliterator<T> part = sp.trySplit(); part != null; part = result.trySplit()){
            if (condition == null || condition.test(result)){
                result = part;
            }
        }
        return result;      
    }   
}
person Steve K    schedule 23.12.2014

Вот еще одно решение (не такое эффективное):

List<String> list = Arrays.asList("abc","ab","cc");
long count = list.stream().count();
list.stream().skip(count-1).findFirst().ifPresent(System.out::println);
person panagdu    schedule 18.12.2014
comment
Интересно... Вы тестировали это? Потому что нет метода substream, и даже если бы он был, это не сработало бы, потому что count — терминальная операция. Итак, что за история стоит за этим? - person Lii; 18.12.2014
comment
Странно, я не знаю, какой у меня jdk, но у него есть подпоток. Я просмотрел официальные документы javadoc(. oracle.com/javase/8/docs/api/java/util/stream/Stream.html), и вы правы, его здесь нет. - person panagdu; 18.12.2014
comment
Конечно, вам придется сначала проверить, не count==0 ли, поскольку Stream.skip не нравится -1 в качестве входных данных. Кроме того, в вопросе не говорилось, что вы можете получить Stream дважды. Также не сказано, что получение Stream дважды гарантирует получение одинакового количества элементов. - person Holger; 18.12.2014

Параллельные неразмерные потоки с методами «пропустить» сложны, и реализация @Holger дает неправильный ответ. Также реализация @Holger немного медленнее, поскольку использует итераторы.

Оптимизация ответа @Holger:

public static <T> Optional<T> last(Stream<? extends T> stream) {
    Objects.requireNonNull(stream, "stream");

    Spliterator<? extends T> spliterator = stream.spliterator();
    Spliterator<? extends T> lastSpliterator = spliterator;

    // Note that this method does not work very well with:
    // unsized parallel streams when used with skip methods.
    // on that cases it will answer Optional.empty.

    // Find the last spliterator with estimate size
    // Meaningfull only on unsized parallel streams
    if(spliterator.estimateSize() == Long.MAX_VALUE) {
        for (Spliterator<? extends T> prev = spliterator.trySplit(); prev != null; prev = spliterator.trySplit()) {
            lastSpliterator = prev;
        }
    }

    // Find the last spliterator on sized streams
    // Meaningfull only on parallel streams (note that unsized was transformed in sized)
    for (Spliterator<? extends T> prev = lastSpliterator.trySplit(); prev != null; prev = lastSpliterator.trySplit()) {
        if (lastSpliterator.estimateSize() == 0) {
            lastSpliterator = prev;
            break;
        }
    }

    // Find the last element of the last spliterator
    // Parallel streams only performs operation on one element
    AtomicReference<T> last = new AtomicReference<>();
    lastSpliterator.forEachRemaining(last::set);

    return Optional.ofNullable(last.get());
}

Модульное тестирование с использованием junit 5:

@Test
@DisplayName("last sequential sized")
void last_sequential_sized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed();
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(9_950_000L);
}

@Test
@DisplayName("last sequential unsized")
void last_sequential_unsized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(9_950_000L);
}

@Test
@DisplayName("last parallel sized")
void last_parallel_sized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(1);
}

@Test
@DisplayName("getLast parallel unsized")
void last_parallel_unsized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(1);
}

@Test
@DisplayName("last parallel unsized with skip")
void last_parallel_unsized_with_skip() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    // Unfortunately unsized parallel streams does not work very well with skip
    //assertThat(Streams.last(stream)).hasValue(expected);
    //assertThat(count).hasValue(1);

    // @Holger implementation gives wrong answer!!
    //assertThat(Streams.getLast(stream)).hasValue(9_950_000L); //!!!
    //assertThat(count).hasValue(1);

    // This is also not a very good answer better
    assertThat(Streams.last(stream)).isEmpty();
    assertThat(count).hasValue(0);
}

Единственное решение для поддержки обоих сценариев — избежать обнаружения последнего разделителя в параллельных потоках без размера. Следствием этого является то, что решение будет выполнять операции над всеми элементами, но всегда будет давать правильный ответ.

Обратите внимание, что в последовательных потоках он в любом случае будет выполнять операции над всеми элементами.

public static <T> Optional<T> last(Stream<? extends T> stream) {
    Objects.requireNonNull(stream, "stream");

    Spliterator<? extends T> spliterator = stream.spliterator();

    // Find the last spliterator with estimate size (sized parallel streams)
    if(spliterator.hasCharacteristics(Spliterator.SIZED|Spliterator.SUBSIZED)) {
        // Find the last spliterator on sized streams (parallel streams)
        for (Spliterator<? extends T> prev = spliterator.trySplit(); prev != null; prev = spliterator.trySplit()) {
            if (spliterator.getExactSizeIfKnown() == 0) {
                spliterator = prev;
                break;
            }
        }
    }

    // Find the last element of the spliterator
    //AtomicReference<T> last = new AtomicReference<>();
    //spliterator.forEachRemaining(last::set);

    //return Optional.ofNullable(last.get());

    // A better one that supports native parallel streams
    return (Optional<T>) StreamSupport.stream(spliterator, stream.isParallel())
            .reduce((a, b) -> b);
}

Что касается модульного тестирования для этой реализации, первые три теста точно такие же (последовательные и параллельные). Тесты для неразмерной параллели находятся здесь:

@Test
@DisplayName("last parallel unsized")
void last_parallel_unsized() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(10_000_000L);
}

@Test
@DisplayName("last parallel unsized with skip")
void last_parallel_unsized_with_skip() throws Exception {
    long expected = 10_000_000L;
    AtomicLong count = new AtomicLong();
    Stream<Long> stream = LongStream.rangeClosed(1, expected).boxed().parallel();
    stream = StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel());
    stream = stream.skip(50_000).peek(num -> count.getAndIncrement());

    assertThat(Streams.last(stream)).hasValue(expected);
    assertThat(count).hasValue(9_950_000L);
}
person Tet    schedule 04.10.2017
comment
Обратите внимание, что модульные тесты используют библиотеку assertj для большей беглости. - person Tet; 04.10.2017
comment
Проблема в том, что вы делаете StreamSupport.stream(((Iterable<Long>) stream::iterator).spliterator(), stream.isParallel()), проходите Iterable обходным путем, который вообще не имеет характеристик, другими словами создаете неупорядоченный поток. Таким образом, результат не имеет ничего общего с parallel или использованием skip, а просто с тем фактом, что «последний» не имеет значения для неупорядоченного потока, поэтому допустимым результатом является любой элемент. - person Holger; 09.10.2017

Нам понадобился last Stream в производстве — я до сих пор не уверен, что мы действительно это сделали, но разные члены моей команды сказали, что мы это сделали по разным «причинам». В итоге я написал что-то вроде этого:

 private static class Holder<T> implements Consumer<T> {

    T t = null;
    // needed to null elements that could be valid
    boolean set = false;

    @Override
    public void accept(T t) {
        this.t = t;
        set = true;
    }
}

/**
 * when a Stream is SUBSIZED, it means that all children (direct or not) are also SIZED and SUBSIZED;
 * meaning we know their size "always" no matter how many splits are there from the initial one.
 * <p>
 * when a Stream is SIZED, it means that we know it's current size, but nothing about it's "children",
 * a Set for example.
 */
private static <T> Optional<Optional<T>> last(Stream<T> stream) {

    Spliterator<T> suffix = stream.spliterator();
    // nothing left to do here
    if (suffix.getExactSizeIfKnown() == 0) {
        return Optional.empty();
    }

    return Optional.of(Optional.ofNullable(compute(suffix, new Holder())));
}


private static <T> T compute(Spliterator<T> sp, Holder holder) {

    Spliterator<T> s;
    while (true) {
        Spliterator<T> prefix = sp.trySplit();
        // we can't split any further
        // BUT don't look at: prefix.getExactSizeIfKnown() == 0 because this
        // does not mean that suffix can't be split even more further down
        if (prefix == null) {
            s = sp;
            break;
        }

        // if prefix is known to have no elements, just drop it and continue with suffix
        if (prefix.getExactSizeIfKnown() == 0) {
            continue;
        }

        // if suffix has no elements, try to split prefix further
        if (sp.getExactSizeIfKnown() == 0) {
            sp = prefix;
        }

        // after a split, a stream that is not SUBSIZED can give birth to a spliterator that is
        if (sp.hasCharacteristics(Spliterator.SUBSIZED)) {
            return compute(sp, holder);
        } else {
            // if we don't know the known size of suffix or prefix, just try walk them individually
            // starting from suffix and see if we find our "last" there
            T suffixResult = compute(sp, holder);
            if (!holder.set) {
                return compute(prefix, holder);
            }
            return suffixResult;
        }


    }

    s.forEachRemaining(holder::accept);
    // we control this, so that Holder::t is only T
    return (T) holder.t;

}

И некоторые варианты его использования:

    Stream<Integer> st = Stream.concat(Stream.of(1, 2), Stream.empty());
    System.out.println(2 == last(st).get().get());

    st = Stream.concat(Stream.empty(), Stream.of(1, 2));
    System.out.println(2 == last(st).get().get());

    st = Stream.concat(Stream.iterate(0, i -> i + 1), Stream.of(1, 2, 3));
    System.out.println(3 == last(st).get().get());

    st = Stream.concat(Stream.iterate(0, i -> i + 1).limit(0), Stream.iterate(5, i -> i + 1).limit(3));
    System.out.println(7 == last(st).get().get());

    st = Stream.concat(Stream.iterate(5, i -> i + 1).limit(3), Stream.iterate(0, i -> i + 1).limit(0));
    System.out.println(7 == last(st).get().get());

    String s = last(
        IntStream.range(0, 10_000_000).mapToObj(i -> {
            System.out.println("potential heavy operation on " + i);
            return String.valueOf(i);
        }).parallel()
    ).get().get();

    System.out.println(s.equalsIgnoreCase("9999999"));

    st = Stream.empty();
    System.out.println(last(st).isEmpty());

    st = Stream.of(1, 2, 3, 4, null);
    System.out.println(last(st).get().isEmpty());

    st = Stream.of((Integer) null);
    System.out.println(last(st).isPresent());

    IntStream is = IntStream.range(0, 4).filter(i -> i != 3);
    System.out.println(last(is.boxed()));

Во-первых, это возвращаемый тип Optional<Optional<T>> — он выглядит странно, я согласен. Если первый Optional пуст, это означает, что в потоке нет элементов; если второй необязательный элемент пуст, это означает, что элемент, который был последним, на самом деле был null, то есть: Stream.of(1, 2, 3, null) (в отличие от Streams::findLast guava, который в таком случае генерирует исключение).

Признаюсь, меня вдохновил в основном ответ Хольгера на аналогичный мой вопрос и вопрос гуавы Streams::findLast.

person Eugene    schedule 02.08.2019