Spark: Как собрать большой объем данных без нехватки памяти

У меня есть следующая проблема:

Я делаю sql-запрос к набору файлов паркета в HDFS, а затем собираю, чтобы получить результат.

Проблема в том, что когда строк много, я получаю сообщение об ошибке нехватки памяти.

Этот запрос требует перетасовки, поэтому я не могу выполнять запрос для каждого файла.

Одним из решений может быть повторение значений столбца и сохранение результата на диске:

df = sql('original query goes here')
// data = collect(df) <- out of memory
createOrReplaceTempView(df, 't')
for each c in cities
    x = collect(sql("select * from t where city = c")
    append x to file

Насколько я знаю, это приведет к тому, что программа займет слишком много времени, потому что запрос будет выполняться для каждого города.

Как лучше всего это сделать?


person drumkey    schedule 08.05.2018    source источник
comment
Ну, 1) Неясно, какие значения памяти вы даете или что доступно на вашей машине 2) В идеале вы бы вообще не собирали. Вы храните все данные в кадре данных, фильтруете и преобразуете, а затем записываете обратно в HDFS.   -  person OneCricketeer    schedule 08.05.2018
comment
пожалуйста, запустите df.cache   -  person loneStar    schedule 08.05.2018


Ответы (3)


Как сказал @cricket_007, я бы не стал collect() ваши данные из Spark добавлять в файл в R. Кроме того, нет смысла перебирать список SparkR::distinct() городов, а затем выбирать все из этих таблиц только для того, чтобы добавить их в некоторый выходной набор данных. Единственный раз, когда вы захотите это сделать, — это если вы пытаетесь выполнить другую операцию в каждой группе на основе какой-то условной логики или применить операцию к каждой группе, используя функцию, которая НЕдоступна в SparkR.

Я думаю, вы пытаетесь получить фрейм данных (либо Spark, либо R) с наблюдениями, сгруппированными таким образом, чтобы, когда вы смотрите на них, все было красиво. Для этого добавьте предложение GROUP BY city в свой первый SQL-запрос. Оттуда просто запишите данные обратно в HDFS или другой выходной каталог. Из того, что я понимаю по вашему вопросу, возможно, поможет что-то вроде этого:

sdf <- SparkR::sql('SELECT SOME GREAT QUERY FROM TABLE GROUP BY city')

SparkR::write.parquet(sdf, path="path/to/desired/output/location", mode="append")

Это даст вам все ваши данные в одном файле, и они должны быть сгруппированы по city, что, я думаю, вы пытаетесь получить со своим вторым запросом в своем вопросе.

Вы можете подтвердить, что вывод - это то, что вы хотите, через:

newsdf<- SparkR::read.parquet(x="path/to/first/output/location/")
View(head(sdf, num=200))

Удачи, надеюсь это поможет.

person nate    schedule 17.05.2018

В случае, если у него заканчивается память, а это означает, что выходные данные действительно очень велики, поэтому вы можете записать результаты в какой-то сам файл, как паркетный файл.

Если вы хотите дополнительно выполнить какую-либо операцию с этими собранными данными, вы можете прочитать данные из этого файла.

Для больших наборов данных мы не должны использовать collect(), вместо этого вы можете использовать take(100) или take(some_integer), чтобы проверить правильность некоторых значений.

person Aditya Chopra    schedule 08.05.2018

Поскольку ваши данные огромны, сбор() больше невозможен. Таким образом, вы можете использовать стратегию выборки данных и обучения на основе выборки данных.

import numpy as np
arr = np.array(sdf.select("col_name").sample(False, 0.5, seed=42).collect())

Здесь вы выбираете 50% данных и только один столбец.

person der Fotik    schedule 28.05.2021