Процесс курирования с библиотеками Delta Lake (без Databricks)

Я использую

  • AWS DMS для извлечения данных из Oracle
  • Он приземляется в S3 Raw Bucket.
  • Используя AWS Glue, я хочу написать код pyspark БЕЗ использования продукта databricks для объединения данных CDC с начальной загрузкой.

Какие библиотеки мне нужно было бы импортировать специально в контексте искры для создания таблиц Delta?

Я добавил delta-core_2.12-0.7.0.jar в Glue Dependent Path в разделе Конфигурация безопасности, библиотеки скриптов и параметры задания (необязательно). Я получаю сообщение об ошибке ниже ошибки --------

Файл script_2020-11-08-19-29-39.py, строка 54, в fullload_str_metrics_df = spark.read.parquet ('s3: //rawbucket/.../fullload/.../STR_METRICS/LOAD00000001.parquet' ) Файл /mnt/yarn/usercache/root/appcache/application_1604863378634_0002/container_1604863378634_0002_01_000001/pyspark.zip/pyspark/sql/readwriter.py, строка 291, в parquet Файл app_48686_86_0001/2186000_03_03_03/240/8000/_server_64000/count_486_86000/container_1 py4j-0.10.4-src.zip/py4j/java_gateway.py, строка 1133, в файле вызова /mnt/yarn/usercache/root/appcache/application_1604863378634_0002/container_1604863378634_0002_01_000001/pyspark.zip sql / utils.py, строка 63, в деко Файл /mnt/yarn/usercache/root/appcache/application_1604863378634_0002/container_1604863378634_0002_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py, строка 319. protocol.Py4JJavaError: произошла ошибка при вызове o74.parquet.


person PKad    schedule 08.11.2020    source источник


Ответы (1)


¿Пробовали ли вы приклеить закладки и Glue Crawlers?

Закладки могут отслеживать каталог S3 и обрабатывать только новые файлы. У него есть функция фиксации для фиксации новых смещений после успешного завершения процесса и функция отката для возврата к предыдущим состояниям смещений (например, для возврата перед выполнением с ID ‹ID выполнения задания›.

Вы можете включить закладку для задания AWS Glue в его конфигурации, и как только она будет включена в коде, вы можете включить закладку для определенного источника, передавая transform_ctx:

import sys
    
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

data_frame = glue_context.create_dynamic_frame.from_catalog(database = '<database>', table_name = '<table_name>', transformation_ctx = '<source_unique_id_for_this_job>').toDF()

some transformations...

sink = DynamicFrame.fromDF(data_frame, glue_context, "<df_name>")
    
glue_context.write_dynamic_frame.from_options(frame = sink , connection_type = "s3", connection_options = {"path": "<s3_path>"}, format = "parquet")

job.commit()

В этом примере мы используем AWS Glue Crawler для интерпретации входного каталога S3 как таблицы.

Я знаю, что это далеко от обработки в реальном времени, но каждое выполнение пакета обрабатывает только новые данные.

person Jordi Sabater Picañol    schedule 09.11.2020
comment
Спасибо за ваш ответ. Я пытаюсь добавить файлы CDC с флагами U, I и D (обновление, вставка и удаление) с использованием библиотек Dela Lake. Я получаю сообщение об ошибке. - person PKad; 09.11.2020