Операторы преобразования также играют важную роль при обработке данных в Apache Flink и Apache Sprak. Для большинства источников данных они необходимы нам для получения желаемых целевых наборов данных. Flink поддерживает множество встроенных операторов преобразования, таких как map, flatMap, filter и т. Д. Для пользователей. Этот пост направлен на демонстрацию использования встроенных операторов преобразования в Apache Flink. Краткое введение в эти операторы можно найти в Руководстве по программированию DataSet API. Там также есть соответствующие документы на китайском языке, предоставленные Apache Flink Taiwan User Group. А здесь дает более подробное представление о доступных преобразованиях в DataSets.
Наборы данных MovieLens
В этом посте наборы данных из MovieLens будут использоваться для демонстрации примеров трансформации, доступных здесь. Набор данных включает следующие четыре файла CSV:
- movies.csv: информация о фильме, состоящая из трех полей, включая идентификатор фильма, название и жанры.
- rating.csv: 5-звездочный рейтинг доступных фильмов пользователями.
- tags.csv: действия с тегами с произвольным текстом.
- links.csv: идентификаторы, используемые для ссылки на другие источники данных фильма, такие как IMDB и TMDB.
Все вышеперечисленные файлы наборов данных имеют кодировку UTF-8.
Пример: получить набор жанров
Здесь файл movies.csv используется в качестве источника данных для ввода в пример программы для получения набора различных жанров.
Прежде всего, мы должны получить ExecutionEnvironment, а затем загрузить / создать исходный набор данных с помощью ExecutionEnvironment.
// obtain an ExecutionEnvironment val env = ExecutionEnvironment.getExecutionEnvironment // load the initial dataset val movieDataset = env.readTextFile("/path/to/movies.csv")
В приведенном выше фрагменте кода мы загружаем исходный набор данных через readTextFile, а не через функцию readCsvFile, хотя набор данных имеет формат CSV. Это потому, что файл набора данных записан как файл значений, разделенных запятыми, с одной строкой заголовка. А столбцы, содержащие запятые (`,`), экранируются двойными кавычками (`” `). Неожиданный DataSet, вероятно, может быть получен в результате применения функции readCsvFile. А именно, поле заголовка в файле movies.csv с большой вероятностью будет проанализировано как несколько полей, если оно содержит запятые.
Чтобы получить жанры с отдельными элементами, нам нужно применить ряд преобразований, как показано ниже.
movieDataset.map(line => { val fieldsArray = line.split(',') (fieldsArray.head, fieldsArray.last) }} .filter { tup => { var canBeLong = true try { tup._1.toLong } catch { case _: Throwable => canBeLong = false } canBeLong }}} .flatMap(tup => tup._2.split('|')) .distinct() .print()
Оператор map получает идентификатор movieId и жанры как строку типа из каждой строки, считанной из файла movies.csv. Затем применяется оператор filter, чтобы исключить строку заголовка в зависимости от того, можно ли преобразовать movieId в тип Long. Оператор flatMap пытается выделить жанры, к которым принадлежит каждый фильм. Затем мы можем использовать оператор independent для получения окончательных результатов, которые перечислены ниже.
Animation Film-Noir Thriller War (no genres listed) Action Adventure Children Comedy Crime Documentary Drama Fantasy Horror IMAX Musical Mystery Romance Sci-Fi Western
Богатые функции
Вместо написания функций лямбда-преобразования можно выделить сложную бизнес-логику в отдельную функцию и передать ее во Flink для улучшения читаемости программы. богатые функции могут оказаться большим подспорьем в достижении желаемого.
Все преобразования, которые принимают в качестве аргумента лямбда-функцию, могут вместо этого принимать в качестве аргумента расширенную функцию. Возьмем преобразование карты, разработанное в вышеупомянутом примере, мы можем написать
class GenreMapFunction extends RichMapFunction[String, (String, String)] { override def map(in: String): (String, String) = { val fieldArray = in.split(',') (fieldArray.head, fieldArray.last) } }
и передайте функцию в функцию карты, например
movieDataset.map(new GenreMapFunction())
В Apache Spark было предложено, что mapPartition имеет лучшую производительность, чем map при преобразовании RDD. Можно использовать mapPartition вместо оператора преобразования карты следующим образом.
class MovieMapPartFunction extends RichMapPartitionFunction[String, (String, String)] { override def mapPartition( values: Iterable[String], out: Collector[(String, String)]): Unit = { val scalaValues = values.asScala scalaValues.foreach { value: String => val fields = value.split(',') out.collect((fields.head, fields.last)) } } }
Затем мы можем передать функцию функции mapPartition, например
movieDataset.mapPartition(new MovieMapPartFunction())
Точно так же мы можем написать функцию расширенного фильтра, а не лямбда-функцию, как показано ниже.
class GenreFilterFunction extends RichFilterFunction[(String, String)] { override def filter(in: (String, String)): Boolean = { var canBeLong = true try { in._1.toLong } catch { case _: Throwable => canBeLong = false } canBeLong }}} }
И передайте функцию функции фильтра, например
movieDataset.map(new GenreMapFunction()) .filter(new GenreFilterFunction())