Закладка задания AWS Glue создает дубликаты для файлов csv

Мы получаем 1 CSV-файл каждый день в корзине s3 от нашего поставщика в 11 утра. Я конвертирую этот файл в формат паркета с помощью Клея в 11:30.

Я включил закладку задания, чтобы не обрабатывать уже обработанные файлы. Тем не менее, я вижу, что некоторые файлы обрабатываются повторно, что создает дубликаты.

Я прочитал эти вопросы и ответы AWS Glue Bookmark создает дубликаты для ПАРКЕТА и Описание закладок для работы AWS Glue

Они дали хорошее представление о закладках вакансий, но до сих пор не решают проблему.

В документации AWS указано, что он поддерживает файлы CSV для создания закладок документации AWS .

Интересно, поможет ли мне кто-нибудь понять, в чем может быть проблема, и, если возможно, решение :)

Редактировать:

Вставьте сюда образец кода по просьбе Прабхакара.

staging_database_name = "my-glue-db"
s3_target_path = "s3://mybucket/mydata/"


"""
 'date_index': date location in the file name
 'date_only': only date column is inserted
 'date_format': format of date
 'path': sub folder name in master bucket
"""

#fouo classified files
tables_spec = {
'sample_table': {'path': 'sample_table/load_date=','pkey': 'mykey', 'orderkey':'myorderkey'}
}

spark_conf = SparkConf().setAll([
  ("spark.hadoop.fs.s3.enableServerSideEncryption", "true"),
  ("spark.hadoop.fs.s3.serverSideEncryption.kms.keyId", kms_key_id)
])
sc = SparkContext(conf=spark_conf)

glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

for table_name, spec in tables_spec.items():
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database=database_name,
                                                                table_name=table_name,
                                                                transformation_ctx='datasource0')

    resolvechoice2 = ResolveChoice.apply(frame=datasource0, choice="make_struct", transformation_ctx='resolvechoice2')

    # Create spark data frame with input_file_name column
    delta_df = resolvechoice2.toDF().withColumn('ingest_datetime', lit(str(ingest_datetime)))

    date_dyf = DynamicFrame.fromDF(delta_df, glueContext, "date_dyf")
    master_folder_path1 = os.path.join(s3_target_path, spec['path']).replace('\\', '/')

    master_folder_path=master_folder_path1+load_date
    datasink4 = glueContext.write_dynamic_frame.from_options(frame=date_dyf,
                                                            connection_type='s3',
                                                            connection_options={"path": master_folder_path},
                                                            format='parquet', transformation_ctx='datasink4')
job.commit()

person Ash    schedule 24.06.2019    source источник
comment
В вашем коде есть Job.init() и Job.commit()?   -  person Yuriy Bondaruk    schedule 24.06.2019
comment
Да, в моем коде есть job.init (args ['JOB_NAME'], args) и job.commit ().   -  person Ash    schedule 25.06.2019
comment
Можете ли вы подтвердить, что вы установили transform_ctx для всех преобразований Glue, как указано в docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html, а также поделитесь здесь своим скриптом   -  person Prabhakar Reddy    schedule 25.06.2019
comment
Код простой, конвертирую файлы csv в паркет. Я приложил образец кода к своему вопросу   -  person Ash    schedule 25.06.2019


Ответы (1)


Поговорила с инженером AWS Support, и она упомянула, что может воспроизвести проблему, и обратилась к технической группе Glue для решения.

Тем не менее, я не мог дождаться, когда они исправят ошибку, и выбрал другой подход.

Решение:

  1. Отключить закладку Glue
  2. После того, как задание Glue преобразует файл csv в Parquet, я перемещаю файл csv в другое место в корзине S3.
person Ash    schedule 13.10.2019