Spark DataFrame / Dataset Найдите наиболее частое значение для каждого ключа Эффективный способ

Проблема: у меня проблема с отображением наиболее распространенного значения ключа в искре (с использованием scala). Я сделал это с помощью RDD, но не знаю, как это сделать эффективно с DF / DS (sparksql)

набор данных похож на

key1 = value_a
key1 = value_b
key1 = value_b
key2 = value_a
key2 = value_c
key2 = value_c
key3 = value_a

После искрового преобразования и доступа на выходе должен быть каждый ключ со своим общим значением

Вывод

key1 = valueb
key2 = valuec
key3 = valuea

Пробовали до сих пор:

RDD

Я попытался сопоставить и уменьшить группу (key,value),count в RDD, и это создает логику, но я не могу перевести это в sparksql (DataFrame / Dataset) (так как я хочу минимальное перемешивание по сети)

Вот мой код для RDD

 val data = List(

"key1,value_a",
"key1,value_b",
"key1,value_b",
"key2,value_a",
"key2,value_c",
"key2,value_c",
"key3,value_a"

)

val sparkConf = new SparkConf().setMaster("local").setAppName("example")
val sc = new SparkContext(sparkConf)

val lineRDD = sc.parallelize(data)

val pairedRDD = lineRDD.map { line =>
val fields = line.split(",")
(fields(0), fields(2))
}

val flatPairsRDD = pairedRDD.flatMap {
  (key, val) => ((key, val), 1)
}

val SumRDD = flatPairsRDD.reduceByKey((a, b) => a + b)




val resultsRDD = SumRDD.map{
  case ((key, val), count) => (key, (val,count))
 }.groupByKey.map{
  case (key, valList) => (name, valList.toList.sortBy(_._2).reverse.head)
}

resultsRDD.collect().foreach(println)

DataFrame, использование окон: я пытаюсь с Window.partitionBy("key", "value") агрегировать count over the window. и thn sorting и agg() соответственно


person A.B    schedule 14.11.2017    source источник
comment
вам нужно будет использовать оконную функцию после группы по ключу, значения со счетчиком, сортировки по счетчику и получения первой ранжированной строки. Вы можете проверить это stackoverflow.com/questions/33878370/   -  person eliasah    schedule 14.11.2017
comment
@eliasah, спасибо, ищу   -  person A.B    schedule 14.11.2017
comment
Извините неверная ссылка! Я его обновил.   -  person eliasah    schedule 14.11.2017
comment
Я разговариваю по телефону, поэтому не могу написать ответ   -  person eliasah    schedule 14.11.2017
comment
@eliasah без проблем, напишите, пожалуйста, когда это возможно   -  person A.B    schedule 14.11.2017
comment
@eliasah, пожалуйста, опубликуйте свой ответ, если он еще больше оптимизирует работу   -  person A.B    schedule 16.11.2017


Ответы (2)


Согласно тому, что я понял из вашего вопроса, вот что вы можете сделать

Сначала вам нужно прочитать данные и преобразовать их в dataframe

val df = sc.textFile("path to the data file")   //reading file line by line
  .map(line => line.split("="))                 // splitting each line by =
  .map(array => (array(0).trim, array(1).trim)) //tuple2(key, value) created
  .toDF("key", "value")                        //rdd converted to dataframe which required import sqlContext.implicits._

который был бы

+----+-------+
|key |value  |
+----+-------+
|key1|value_a|
|key1|value_b|
|key1|value_b|
|key2|value_a|
|key2|value_c|
|key2|value_c|
|key3|value_a|
+----+-------+

Следующим шагом будет подсчет повторения идентичных значений для каждого ключа и выбор значения, которое чаще всего повторяется для каждого ключа, что можно сделать с помощью функции Window и aggregations, как показано ниже.

import org.apache.spark.sql.expressions._                   //import Window library
def windowSpec = Window.partitionBy("key", "value")         //defining a window frame for the aggregation
import org.apache.spark.sql.functions._                     //importing inbuilt functions
df.withColumn("count", count("value").over(windowSpec))     // counting repeatition of value for each group of key, value and assigning that value to new column called as count
  .orderBy($"count".desc)                                   // order dataframe with count in descending order
  .groupBy("key")                                           // group by key
  .agg(first("value").as("value"))                          //taking the first row of each key with count column as the highest

таким образом, окончательный результат должен быть равен

+----+-------+
|key |value  |
+----+-------+
|key3|value_a|
|key1|value_b|
|key2|value_c|
+----+-------+ 
person Ramesh Maharjan    schedule 14.11.2017
comment
Спасибо за ответ, скоро протестирую - person A.B; 14.11.2017
comment
прокомментировал столько, сколько мог. - person Ramesh Maharjan; 14.11.2017
comment
@RameshMaharjan Спасибо за ответ. Было любопытно, какая разница в производительности при использовании groupBy в этом ответе по сравнению с функцией Window, как вы думаете? - person jack; 18.01.2021

А как насчет использования groupBy?

val maxFreq= udf((values: List[Int]) => {
  values.groupBy(identity).mapValues(_.size).maxBy(_._2)._1
})

df.groupBy("key")
  .agg(collect_list("value") as "valueList")
  .withColumn("mostFrequentValue", maxFreq(col("valueList")))
person jack    schedule 17.01.2021