Как лучше всего собрать большой набор данных из spark rdd?

Я использую pyspark для обработки своих данных, и в самом конце мне нужно собрать данные из rdd, используя rdd.collect(). Однако у меня искра вылетает из-за проблемы с памятью. Я пробовал несколько способов, но не повезло. Сейчас я работаю со следующим кодом, обрабатываю небольшой фрагмент данных для каждого раздела:

def make_part_filter(index):
    def part_filter(split_index, iterator):
        if split_index == index:
            for el in iterator:
                yield el
    return part_filter


for part_id in range(rdd.getNumPartitions()):
    part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
    myCollection = part_rdd.collect()
    for row in myCollection:
          #Do something with each row

Новый код, который я сейчас использую, не дает сбоев, а кажется, что он работает вечно.

Есть ли лучший способ собрать данные с большого rdd?


person JamesLi    schedule 21.05.2016    source источник
comment
Вместо того, чтобы запускать цикл for в формате списка RDD, почему бы вам не запустить вместо этого функцию карты?   -  person Saif Charaniya    schedule 22.05.2016
comment
На самом деле мне нужно собрать все данные в rdd и сохранить в большом массиве, а затем передать модуль машинного обучения.   -  person JamesLi    schedule 22.05.2016
comment
Возможно, модуль машинного обучения принимает итератор или ему действительно нужен массив? С помощью итератора вы можете избежать одновременной загрузки всех данных в память. Даже тогда я бы беспокоился о производительности, поскольку я предполагаю, что модуль машинного обучения будет потреблять данные за один поток.   -  person E.F.Walker    schedule 22.05.2016
comment
О каком алгоритме машинного обучения идет речь? Идея искры заключается в том, что вы запускаете ее распределенным образом.   -  person z-star    schedule 22.05.2016
comment
Я действительно удивлен, что это не встроено или, по крайней мере, не спросило больше. У меня есть огромный RDD, который я хочу добавить к файлу на диске, но мой главный узел не может уместить все это в памяти!   -  person sudo    schedule 18.04.2017


Ответы (2)


Попытка «собрать» огромный РДД проблематична. «Собрать» возвращает список, который подразумевает, что все содержимое RDD должно быть сохранено в памяти драйвера. Это проблема "выставок". Обычно требуется, чтобы приложение Spark могло обрабатывать наборы данных, размер которых значительно превышает размер памяти одного узла.

Допустим, RDD еле влезает в память, а "собрать" работает. Затем у нас есть еще один «победитель» — низкая производительность. В вашем коде собранный RDD обрабатывается в цикле: «для строки в myCollection». Этот цикл выполняется ровно одним ядром. Таким образом, вместо обработки данных с помощью RDD, чьи вычисления распределяются между всеми ядрами кластера, которых, вероятно, 100, если не 1000, --- вместо этого вся работа над всем набором данных помещается на задний план одного основной.

person E.F.Walker    schedule 21.05.2016

Я не знаю, лучший ли это способ, но это лучший способ, который я пробовал. Не знаю, лучше или хуже вашего. Та же идея, разбить его на куски, но вы можете быть более гибкими с размером куска.

def rdd_iterate(rdd, chunk_size=1000000):
    indexed_rows = rdd.zipWithIndex().cache()
    count = indexed_rows.count()
    print("Will iterate through RDD of count {}".format(count))
    start = 0
    end = start + chunk_size
    while start < count:
        print("Grabbing new chunk: start = {}, end = {}".format(start, end))
        chunk = indexed_rows.filter(lambda r: r[1] >= start and r[1] < end).collect()
        for row in chunk:
            yield row[0]
        start = end
        end = start + chunk_size

Пример использования, когда я хочу добавить огромный RDD в файл CSV на диске, даже не заполняя список Python всем RDD:

def rdd_to_csv(fname, rdd):
    import csv
    f = open(fname, "a")
    c = csv.writer(f)
    for row in rdd_iterate(rdd): # with abstraction, iterates through entire RDD
        c.writerows([row])
    f.close()

rdd_to_csv("~/test.csv", my_really_big_rdd)
person sudo    schedule 18.04.2017
comment
Я пытаюсь решить аналогичную проблему, и я нашел ваш код полезным! Благодарю вас! - person Nahuel Chaves; 29.05.2020
comment
Пожалуйста. Я помню, как использовал этот код для предыдущей работы, и он был очень надежным. - person sudo; 08.06.2020