извлекать данные из таблицы куста в искру и выполнять соединение на RDD

У меня есть две таблицы в улье/импале. Я хочу получить данные из таблицы в spark как rdds и выполнить, скажем, операцию соединения.

Я не хочу напрямую передавать запрос на соединение в моем контексте куста. Это всего лишь пример. У меня есть больше вариантов использования, которые невозможны для стандартного HiveQL. Как получить все строки, получить доступ к столбцам и выполнить преобразование.

Предположим, у меня есть два rdd:

val table1 =  hiveContext.hql("select * from tem1")

val table2 =  hiveContext.hql("select * from tem2")

Я хочу выполнить соединение rdds в столбце с именем «account_id».

В идеале я хочу сделать что-то подобное, используя rdds с помощью искровой оболочки.

select * from tem1 join tem2 on tem1.account_id=tem2.account_id; 

person user1189851    schedule 06.11.2014    source источник


Ответы (4)


Я не уверен, что понял вопрос, но в качестве альтернативы вы можете использовать API для присоединения к DataFrames, поэтому вы можете программно решить многие вещи (например, функция join может быть передана в качестве параметра методу, который применяет пользовательское преобразование) .

Для вашего примера это будет так:

val table1 =  hiveContext.sql("select * from tem1")
val table2 =  hiveContext.sql("select * from tem2")
val common_attributes = Seq("account_id")
val joined = table1.join(table2, common_attributes)

В DataFrame API доступно множество общих преобразований: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame

Ваше здоровье

person Daniel de Paula    schedule 03.05.2016

Таким образом, мы могли бы зарегистрировать table1 и table2 как временные таблицы, а затем выполнить объединение этих временных таблиц.

table1.registerTempTable("t1")
table2.registerTempTable("t2")
table3 = hiveContext.hql("select * from t1 join t2 on t1.account_id=t2.account_id")
person Holden    schedule 06.11.2014
comment
Привет, Холден, спасибо, но я уже упоминал, что не хочу делать это таким образом. Это всего лишь простой пример. У меня есть вариант использования, когда у меня есть более сложные запросы. Я хочу иметь возможность создавать rdd из набора результатов и выполнять операции соединения и другие операции. - person user1189851; 07.11.2014
comment
Ах, извините, пользователь 1189851, я думал, вы хотите избежать объединения исходных таблиц улья. В приведенном выше фрагменте кода я указал, что table1 и table2 могут быть любыми SchemaRDD (и обратите внимание, что любой из запросов, которые мы пишем, возвращает нам SchemaRDD), какими вы хотите, чтобы table1 и table2 были? Вы хотите, чтобы они были из источников SQL, отличных от Spark? - person Holden; 07.11.2014
comment
поэтому у меня есть случай, когда мой первый rdd является набором результатов запроса. а второй - набор результатов другого запроса. val rdd1 = hiveContext.hql(выберите * из таблицы1) и val rdd2. = hiveContext.hql (выберите * из таблицы2). Я хочу выполнить, скажем, объединение этих двух rdd по общему атрибуту с именем account_id. Идея в том, что мне не нужно соединение внутри контекста улья, я должен сделать это с помощью преобразований. - person user1189851; 07.11.2014

table1 и table2 имеют тип DataFrame. Их можно преобразовать в rdd, используя:

lazy val table1_rdd = table1.rdd
lazy val table2_rdd = table2.rdd

Это должно быть трюком. На этих rdd вы можете использовать любую операцию rdd.

См. также: https://issues.apache.org/jira/browse/SPARK-6608 и https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame

person Blaubaer    schedule 19.06.2015

Вы можете напрямую выбрать нужный столбец из следующего кода:

val table1 =  hiveContext.hql("select account_id from tem1")
val table2 =  hiveContext.hql("select account_id from tem2")
val joinedTable = table1.join(table2) 
person BadBoy777    schedule 01.02.2017