Как вывести столбец Spark ArrayType в .csv для Postgres

Я пытаюсь получить кадр данных искры в Aurora RDS Postgres.

Допустим, DF имеет 2 столбца и выглядит следующим образом: |ID | МояАрр >|

Мой текущий конвейер состоит из записи кадра данных в S3 в виде .csv, а затем вставки этого .csv в Aurora RDS Postgres через COPY.

Проблема, с которой я сталкиваюсь, заключается в том, что мой фрейм данных содержит столбец ArrayType (строк).

Вопросы:

  • Есть ли лучший подход, который полностью пропускает S3? или подход, который использует что-то вроде посредника в формате .parquet? (похоже, что postgres не поддерживает простой способ массовой вставки файлов паркета, как я приземлился на текущий подход)
  • Предполагая, что ответ на вышесказанное отрицательный, я знаю, что могу использовать collect_ws() для объединения массива, но формат вывода выглядит следующим образом: "A,B,C" когда мне нужно, чтобы он выглядел так "{A,B,C}" (это форматирование массива csv, понятное Postgres) . Я мог бы использовать UDF, но я ограничен работой в PySpark, поэтому я бы предпочел избегать UDF, поскольку этот файл на самом деле довольно большой.

person nciao    schedule 23.09.2019    source источник
comment
вы ограничены авророй? вы можете записать в parquet или json, чтобы сохранить схему (поскольку csv не поддерживает типы array), а затем использовать athenards для запроса файлов, хранящихся в s3   -  person thePurplePython    schedule 25.09.2019
comment
да, к сожалению, я ограничен Авророй. Кроме того, те массивы, которые я собираю, могут быть довольно большими (до 100 тыс. элементов), и их запись в S3 (в виде csv или паркета) занимает до 80% общего времени выполнения приложения spark.   -  person nciao    schedule 25.09.2019
comment
ваши данные представляют собой строковые байты, поэтому имеет смысл, почему   -  person thePurplePython    schedule 25.09.2019
comment
Я новичок в искрах, не сразу понятно, почему строковые данные особенно плохи с точки зрения производительности. Есть удобная ссылка или быстрое объяснение?   -  person nciao    schedule 25.09.2019
comment
это не spark вещь; основой является java с jvm ... разные типы данных соответствуют разным размерам байтов ... spark на самом деле пытается улучшить это с помощью проекта кодировщика tungsten   -  person thePurplePython    schedule 26.09.2019


Ответы (1)


Ах, хорошо - отвечая на часть 2 моего собственного вопроса. Отредактирую, если кто-нибудь ответит на первую часть.

Я могу просто использовать regexp_replace() для вставки '{' и '}' после вызова collect_ws()

Что-то вроде этого:

        myDF \
        .withColumn('MyArr', regexp_replace('MyArr', '\A', '{')) \
        .withColumn('MyArr', regexp_replace('MyArr', '\Z', '}'))

РЕДАКТИРОВАТЬ: при тестировании этот подход не подходит для начинающих, потому что он значительно замедляет время выполнения (массив потенциально может быть огромным, порядка десятков тысяч GUID).

person nciao    schedule 23.09.2019