Java 8 parallelStream() с сортировкой()

JDK 8 EA уже вышел, и я просто пытаюсь привыкнуть к лямбде и новому Stream API. Я пытался отсортировать список с параллельным потоком, но результат всегда неправильный:

import java.util.ArrayList;
import java.util.List;

public class Test
{
    public static void main(String[] args)
    {
        List<String> list = new ArrayList<>();
        list.add("C");
        list.add("H");
        list.add("A");
        list.add("A");
        list.add("B");
        list.add("F");
        list.add("");

        list.parallelStream() // in parallel, not just concurrently!
            .filter(s -> !s.isEmpty()) // remove empty strings
            .distinct() // remove duplicates
            .sorted() // sort them
            .forEach(s -> System.out.println(s)); // print each item
    }
}

ВЫВОД:

C
F
B
H
A

Обратите внимание, что каждый раз вывод отличается. Мои вопросы, это ошибка? или нельзя сортировать список параллельно? если да, то почему в JavaDoc об этом не говорится? Последний вопрос: есть ли другая операция, вывод которой будет отличаться в зависимости от типа потока?


person Eng.Fouad    schedule 22.10.2013    source источник
comment
Вероятно, было бы лучше удалить дубликаты после сортировки.   -  person Ingo    schedule 22.01.2014


Ответы (2)


Вам нужно использовать forEachOrdered, а не forEach.

Согласно документу forEach:

Для конвейеров с параллельными потоками эта операция не гарантирует соблюдения порядка встреч потока, так как это приведет к потере преимущества параллелизма. Для любого заданного элемента действие может быть выполнено в любое время и в любом потоке, выбранном библиотекой. Если действие получает доступ к общему состоянию, оно отвечает за обеспечение необходимой синхронизации.

person Louis Wasserman    schedule 22.10.2013
comment
Я предполагаю, что внутри он создает отсортированный список, каждый поток добавляет в этот список, а затем переходит к следующему шагу в потоке (forEach), поэтому он выполняется не по порядку, FWIW. - person rogerdpack; 23.09.2017

Кроме того, вы можете больше узнать о параллелизме и forEachOrdered на очень хорошем примере из здесь. Таким образом, использование forEachOrdered в параллельном потоке может привести к потере преимуществ параллелизма.

Вот пример с того же ресурса:

Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8 };
List<Integer> listOfIntegers =
    new ArrayList<>(Arrays.asList(intArray));

System.out.println("listOfIntegers:");
listOfIntegers
    .stream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("listOfIntegers sorted in reverse order:");
Comparator<Integer> normal = Integer::compare;
Comparator<Integer> reversed = normal.reversed(); 
Collections.sort(listOfIntegers, reversed);  
listOfIntegers
    .stream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("Parallel stream");
listOfIntegers
    .parallelStream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("Another parallel stream:");
listOfIntegers
    .parallelStream()
    .forEach(e -> System.out.print(e + " "));
System.out.println("");

System.out.println("With forEachOrdered:");
listOfIntegers
    .parallelStream()
    .forEachOrdered(e -> System.out.print(e + " "));
System.out.println("");

И выход

listOfIntegers:
1 2 3 4 5 6 7 8
listOfIntegers sorted in reverse order:
8 7 6 5 4 3 2 1
Parallel stream:
3 4 1 6 2 5 7 8
Another parallel stream:
6 3 1 5 7 8 4 2
With forEachOrdered:
8 7 6 5 4 3 2 1

Пятый конвейер использует метод forEachOrdered, который обрабатывает элементы потока в порядке, указанном его источником, независимо от того, выполняли ли вы поток последовательно или параллельно. Обратите внимание, что вы можете потерять преимущества параллелизма, если будете использовать такие операции, как forEachOrdered, с параллельными потоками.

.

person cemal    schedule 25.02.2015
comment
Это немного тонко. Пожалуйста, разверните свой ответ, отредактировав его. - person Artjom B.; 25.02.2015