Вступление

Коллекции - это базовые и часто используемые структуры данных. Программисты с самого начала своей карьеры учатся использовать их для получения, обработки и возврата данных. Становясь более продвинутыми в программировании на Java, они находят stream() метод преобразования коллекции в поток и узнают, как обрабатывать данные с помощью некоторых полезных методов потока, таких как map, flatMap или reduce. Они также могли заметить, что другие API в Java тоже возвращают поток, например. String.lines(), Matcher.results(), Files.find(), Random.ints().

Если у вас есть опыт работы с потоками, но вы еще не создали их, эта статья для вас. Я собираюсь показать вам несколько сценариев, в которых потоки могут быть очень удобными, и примеры того, как их использовать. Кроме того, я кратко упомяну обработку ошибок и управление ресурсами.

Статья основана на стандартной библиотеке Java java.util.stream. Это не относится ни к реактивным потокам, ни к другой реализации потоков, например, Вавр. Кроме того, я не собираюсь описывать сложные детали потоков, такие как параллельное выполнение.

Во-первых, давайте кратко обсудим отличительные особенности потоков по сравнению с коллекциями. Хотя есть некоторые сходства, различия существенны, и вы не должны рассматривать потоки как просто еще один тип коллекции в библиотеке.

Согласно документации java.util.stream наиболее важными функциями являются:

  • Без хранилища и Возможно неограниченное - коллекции представляют собой готовые структуры данных, а поток представляет собой возможность создавать данные , который обычно даже не существует в момент создания потока. Поскольку данные в потоках не хранятся, мы можем создавать практически неограниченные потоки или перефразировать их более практично, мы можем позволить потребителю решать, сколько элементов читать из потока, сохраняя его потенциально неопределенным с точки зрения производителя (например, new Random().ints()).
  • Поиск лени - многие операции (например, фильтрация, сопоставление) приостанавливаются во время определения потока и выполняются только тогда, когда потребитель решает использовать данные из потока.
  • Функциональный по своей природе - поскольку у вас уже есть некоторый опыт работы с потоками, вы могли заметить, что при обработке данных в потоках вы создаете новый поток для каждого шага, например фильтр или карту, вместо изменения исходных данных.
  • Расходный - вы можете прочитать поток только один раз, затем он становится «потребляемым», в отличие от коллекций, которые можно читать много раз.

Давайте теперь посмотрим, какие проблемы мы можем решить с помощью потоков.

Обработка большого объема данных

Предположим, нам нужно реплицировать данные из внешней службы в нашу базу данных. Объем данных для репликации может быть сколь угодно большим. Мы не можем получить все данные, сохранить их в коллекции, а затем сохранить в базе данных из-за потенциального риска нехватки памяти кучи. Мы должны обрабатывать данные партиями и спроектировать интерфейс между клиентом внешнего сервиса и хранилищем базы данных. Поскольку поток не хранит дату, его можно использовать для безопасной обработки необходимого количества данных.

Пример:

В этом примере (и во всех последующих) мы собираемся использовать статические методы интерфейса java.util.stream.Stream для создания потока. Самый мощный и гибкий способ создания потока в Java - реализовать интерфейс Spliterator, а затем обернуть его в поток с помощью класса StreamSupport. Хотя, как мы видим, статических фабричных методов в Stream интерфейсе во многих случаях достаточно.

Предположим, что есть простой API для извлечения данных из внешней службы, которая поддерживает разбиение на страницы (например, служба отдыха, база данных). API выбирает не более limit элементов, начиная с offset. Итеративно используя API, мы можем получить столько данных, сколько потребуется.

Теперь мы можем использовать API для предоставления потока данных и изоляции потребителя API от API разбивки на страницы:

Где Cursor - это простой держатель текущего смещения.

Мы используем метод Stream.generate() для построения бесконечного потока (7), где каждый элемент создается предоставленным поставщиком. На этом этапе элементы потока - это страницы, извлеченные из REST API, представленного List<T>. Экземпляр класса Cursor создается для каждого потока, чтобы отслеживать прогресс выбранных элементов. Stream.takeWhile() метод (8) используется для обнаружения последней страницы и, наконец, чтобы вернуть поток T вместо List<T>, мы используем flatMap для сглаживания потока (9). Хотя в некоторых сценариях может быть полезно сохранить пакет, например чтобы сохранить всю страницу за одну транзакцию.

Теперь мы можем использовать Service.stream(size, batchSize) для извлечения произвольного длинного потока, не зная API разбиения на страницы (мы решили предоставить параметр batchSize, но это дизайнерское решение). Потребление памяти в любой момент времени ограничено размером пакета. Потребитель может обрабатывать данные одно за другим, сохраняя их в базе данных, или повторно обрабатывать их (с потенциально разными размерами пакетов).

Быстрый доступ к (неполным) данным

Предположим, у нас есть трудоемкая операция, которая должна выполняться с каждым элементом данных, а вычисление занимает время t. Для n элементов потребитель должен дождаться t * n, прежде чем получить результат вычисления. Это может быть проблема, например, если пользователь ждет таблицы с результатами вычислений. Желательно, чтобы первые результаты отображались мгновенно, поскольку они вычисляются, вместо того, чтобы ждать вычисления всех результатов и сразу заполнять таблицу.

Пример:

Потребитель:

Выход:

Processing of: a
aa
Processing of: b
…

Как мы видим, результат обработки первого элемента - «aa» доступен пользователю до начала обработки следующего элемента, но все же за вычисление отвечает производитель потока. Другими словами, потребитель решает, когда и если вычисление должно быть выполнено, но производитель по-прежнему несет ответственность за как выполнить вычисление.

Вы можете подумать, что это легко, и вам не нужен поток. Конечно, вы правы, давайте посмотрим:

И потребитель:

Мы достигли того же результата, но ценой инкапсуляции - expensiveStringDoubler должно стать достоянием общественности и, что еще хуже, теперь ответственность за его вызов лежит на потребителе.

Но подождите, мы можем сделать лучше:

И потребитель:

Снова тот же эффект, но на самом деле мы заново изобрели колесо, наша реализация имитирует предка потока - Iterator, и мы потеряли преимущество API потока.

Избегайте преждевременных вычислений

Предположим снова, что у нас есть трудоемкая операция, выполняемая с каждым элементом потока. Бывают ситуации, когда потребитель API не может заранее сказать, сколько данных требуется. Например:

  • пользователь отменил загрузку данных
  • произошла ошибка при обработке данных и нет необходимости обрабатывать остальные данные
  • потребитель читает данные до тех пор, пока не будет выполнено условие, например первое положительное значение

Благодаря ленивости потоков в таких ситуациях можно избежать некоторых вычислений.

Пример

Потребитель:

В этом примере потребитель считывает данные, пока значение не станет больше 0,4. Производитель не осведомлен о такой логике потребителя, но он вычисляет столько элементов, сколько необходимо. Логика (например, условие) может быть изменена независимо на стороне потребителя.

API простой в использовании

Есть еще одна причина использовать потоки вместо пользовательского дизайна API. Потоки являются частью стандартной библиотеки и хорошо известны многим разработчикам. Использование потоков в нашем API упрощает использование API другими разработчиками.

Дополнительные соображения

Обработка ошибок

Традиционная обработка ошибок не работает с Streams. Поскольку фактическая обработка откладывается до тех пор, пока не потребуется, исключение не может быть создано при построении потока. В принципе, у нас есть два варианта:

  • выбросить RuntimeException - исключение будет сгенерировано методом завершения (например, forEach)
  • обернуть элемент в объект, представляющий текущее состояние обрабатываемого элемента, например. специализированный класс Try из библиотеки Vavr (подробности в блоге)

Управление ресурсами

Иногда нам нужно использовать ресурс для предоставления данных потока (например, сеанс во внешней службе), и мы хотим освободить его, когда обработка потока завершится. К счастью, поток реализует интерфейс Autoclosable, и мы можем использовать поток в операторах try-with-resources, что значительно упрощает управление ресурсами. Все, что нам нужно сделать, это зарегистрировать ловушку в потоке с помощью метода onClose. Хук будет автоматически вызван при закрытии потока.

Пример

Потребитель:

Выход:

0.2264004802916616
0.32777949557515484
Releasing resources…
Exception in thread “main” java.lang.RuntimeException: Data processing exception

В этом примере, когда возникает исключение обработки данных, поток автоматически закрывается оператором try-with-resources и вызывается зарегистрированный обработчик. В выходных данных примера мы можем увидеть Releasing resources… сообщение, напечатанное обработчиком.

Заворачивать

  1. Потоки - это не коллекции.
  2. Потоки могут помочь нам решить такие проблемы, как:
    * Обработка большого объема данных
    * Быстрый доступ к (неполным) данным
    * Избегайте преждевременных вычислений
  3. Построить ручей не так уж и сложно.
  4. Мы должны позаботиться об обработке ошибок.
  5. Поддерживается управление ресурсами.