Операторы преобразования также играют важную роль при обработке данных в Apache Flink и Apache Sprak. Для большинства источников данных они необходимы нам для получения желаемых целевых наборов данных. Flink поддерживает множество встроенных операторов преобразования, таких как map, flatMap, filter и т. Д. Для пользователей. Этот пост направлен на демонстрацию использования встроенных операторов преобразования в Apache Flink. Краткое введение в эти операторы можно найти в Руководстве по программированию DataSet API. Там также есть соответствующие документы на китайском языке, предоставленные Apache Flink Taiwan User Group. А здесь дает более подробное представление о доступных преобразованиях в DataSets.

Наборы данных MovieLens

В этом посте наборы данных из MovieLens будут использоваться для демонстрации примеров трансформации, доступных здесь. Набор данных включает следующие четыре файла CSV:

  • movies.csv: информация о фильме, состоящая из трех полей, включая идентификатор фильма, название и жанры.
  • rating.csv: 5-звездочный рейтинг доступных фильмов пользователями.
  • tags.csv: действия с тегами с произвольным текстом.
  • links.csv: идентификаторы, используемые для ссылки на другие источники данных фильма, такие как IMDB и TMDB.

Все вышеперечисленные файлы наборов данных имеют кодировку UTF-8.

Пример: получить набор жанров

Здесь файл movies.csv используется в качестве источника данных для ввода в пример программы для получения набора различных жанров.

Прежде всего, мы должны получить ExecutionEnvironment, а затем загрузить / создать исходный набор данных с помощью ExecutionEnvironment.

// obtain an ExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment
// load the initial dataset
val movieDataset = env.readTextFile("/path/to/movies.csv")

В приведенном выше фрагменте кода мы загружаем исходный набор данных через readTextFile, а не через функцию readCsvFile, хотя набор данных имеет формат CSV. Это потому, что файл набора данных записан как файл значений, разделенных запятыми, с одной строкой заголовка. А столбцы, содержащие запятые (`,`), экранируются двойными кавычками (`” `). Неожиданный DataSet, вероятно, может быть получен в результате применения функции readCsvFile. А именно, поле заголовка в файле movies.csv с большой вероятностью будет проанализировано как несколько полей, если оно содержит запятые.

Чтобы получить жанры с отдельными элементами, нам нужно применить ряд преобразований, как показано ниже.

movieDataset.map(line => {
    val fieldsArray = line.split(',')
    (fieldsArray.head, fieldsArray.last) }}
  .filter { tup => {
    var canBeLong = true
    try {
      tup._1.toLong
    }
    catch {
      case _: Throwable => canBeLong = false
    }
    canBeLong }}}
  .flatMap(tup => tup._2.split('|'))
  .distinct()
  .print()

Оператор map получает идентификатор movieId и жанры как строку типа из каждой строки, считанной из файла movies.csv. Затем применяется оператор filter, чтобы исключить строку заголовка в зависимости от того, можно ли преобразовать movieId в тип Long. Оператор flatMap пытается выделить жанры, к которым принадлежит каждый фильм. Затем мы можем использовать оператор independent для получения окончательных результатов, которые перечислены ниже.

Animation
Film-Noir
Thriller
War
(no genres listed)
Action
Adventure
Children
Comedy
Crime
Documentary
Drama
Fantasy
Horror
IMAX
Musical
Mystery
Romance
Sci-Fi
Western

Богатые функции

Вместо написания функций лямбда-преобразования можно выделить сложную бизнес-логику в отдельную функцию и передать ее во Flink для улучшения читаемости программы. богатые функции могут оказаться большим подспорьем в достижении желаемого.

Все преобразования, которые принимают в качестве аргумента лямбда-функцию, могут вместо этого принимать в качестве аргумента расширенную функцию. Возьмем преобразование карты, разработанное в вышеупомянутом примере, мы можем написать

class GenreMapFunction extends RichMapFunction[String, (String, String)] {
  override def map(in: String): (String, String) = {
    val fieldArray = in.split(',')
    (fieldArray.head, fieldArray.last)
  }
}

и передайте функцию в функцию карты, например

movieDataset.map(new GenreMapFunction())

В Apache Spark было предложено, что mapPartition имеет лучшую производительность, чем map при преобразовании RDD. Можно использовать mapPartition вместо оператора преобразования карты следующим образом.

class MovieMapPartFunction extends RichMapPartitionFunction[String, (String, String)] {
  override def mapPartition(
      values: Iterable[String],
      out: Collector[(String, String)]): Unit = {
    val scalaValues = values.asScala
    scalaValues.foreach {
      value: String =>
      val fields = value.split(',')
      out.collect((fields.head, fields.last))
    }
  }
}

Затем мы можем передать функцию функции mapPartition, например

movieDataset.mapPartition(new MovieMapPartFunction())

Точно так же мы можем написать функцию расширенного фильтра, а не лямбда-функцию, как показано ниже.

class GenreFilterFunction extends RichFilterFunction[(String, String)] {
  override def filter(in: (String, String)): Boolean = {
    var canBeLong = true
    try {
      in._1.toLong
    }
    catch {
      case _: Throwable => canBeLong = false
    }
    canBeLong
  }}}
}

И передайте функцию функции фильтра, например

movieDataset.map(new GenreMapFunction())
    .filter(new GenreFilterFunction())