Как реализовать автоинкремент в Spark SQL (PySpark)

Мне нужно реализовать столбец автоматического увеличения в моей таблице Spark SQL, как я могу это сделать. Пожалуйста, направь меня. я использую писпарк 2.0

Спасибо, Калян.


person Kalyan    schedule 25.10.2016    source источник
comment
проверьте этот stackoverflow.com/questions/31955309/   -  person Arunakiran Nulu    schedule 25.10.2016
comment
@MRSrinivas спасибо за ваш подробный ответ, я попробую, недавно я попытался импортировать monotonically_increasing_id из pyspark.sql.functions для решения проблемы, которая сработала. Он дает идентификаторы для каждой индексации строки от 0, большое спасибо   -  person Kalyan    schedule 15.11.2016


Ответы (1)


Я бы написал/повторно использовал Hive udf с сохранением состояния и зарегистрировался в pySpark, поскольку Spark SQL имеет хорошую поддержку Hive.

проверьте эту строку @UDFType(deterministic = false, stateful = true) в приведенном ниже коде, чтобы убедиться, что это UDF с отслеживанием состояния.

package org.apache.hadoop.hive.contrib.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;

/**
 * UDFRowSequence.
 */
@Description(name = "row_sequence",
    value = "_FUNC_() - Returns a generated row sequence number starting from 1")
@UDFType(deterministic = false, stateful = true)
public class UDFRowSequence extends UDF
{
  private LongWritable result = new LongWritable();

  public UDFRowSequence() {
    result.set(0);
  }

  public LongWritable evaluate() {
    result.set(result.get() + 1);
    return result;
  }
}

// End UDFRowSequence.java

Теперь создайте банку и добавьте местоположение, когда запустится pyspark.

$ pyspark --jars your_jar_name.jar

Тогда зарегистрируйтесь в sqlContext.

sqlContext.sql("CREATE TEMPORARY FUNCTION row_seq AS 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence'")

Теперь используйте row_seq() в запросе выбора

sqlContext.sql("SELECT row_seq(), col1, col2 FROM table_name")

Проект использования пользовательских функций Hive в pySpark

person mrsrinivas    schedule 25.10.2016
comment
Я построил банку, как вы указали, а также создал временные функции. Теперь я создал таблицу sqlContext.sql("Create table abc(id int, name string)") и sqlContext.sql("INSERT into TABLE abc SELECT row_seq(), 'John'") и sqlContext.sql("INSERT into TABLE abc SELECT row_seq(), 'Tim'"). Когда я делаю оператор select *, я получаю как iD, так и 1 вместо 1 и 2. - person ; 25.05.2017
comment
Устанавливаете ли вы stateful = true внутри тега @UDFType в своем коде? - person mrsrinivas; 26.05.2017
comment
Мне нужно что-то подобное, но вопрос в том, будет ли он масштабироваться с данными в 200 миллионов. На самом деле я хочу разбить большой файл, содержащий 200 миллионов строк, в файлы меньшего размера, содержащие точные 10 тыс. строк, содержащих файл. Я думал добавить автоинкрементное число для каждой строки и читать в пакетном режиме с помощью вот такого (id > 10 001 и id ‹ 20 000). Будет ли это работать в таком масштабе, пожалуйста, предложите. - person Hrishikesh Mishra; 03.03.2018
comment
возможно ли сделать этот UDF в python? и зарегистрировать его также в sqlContext? - person dim_user; 06.07.2019