Используйте команду Redshift Copy для слияния

У меня есть процесс, который перебирает ввод и выдает данные в AWS Firehose, которые я настроил для загрузки в созданную мной таблицу красного смещения. Одна из проблем заключается в том, что иногда строки могут дублироваться, потому что процессу необходимо переоценить данные. Что-то типа:

Event_date, event_id, event_cost
2015-06-25, 123, 3
2015-06-25, 123, 4

http://docs.aws.amazon.com/redshift/latest/dg/t_updating-inserting-using-staging-tables-.html

Глядя туда, я хочу заменить старую строку новым значением, например:

insert into event_table_staging  
select event_date,event_id, event_cost from <s3 location>;

delete from event_table  
using event_table_staging  
where event_table.event_id = event_table_staging.event_id;

insert into target 
select * from event_table_staging;

delete from event_table_staging  
select * from event_table_staging;

Можно ли сделать что-то вроде:

Redshift columns: event_date,event_id,cost
copy event_table from <s3> 
(update event_table 
select c_source.event_date,c_source.event_id,c_source.cost from <s3 source> as c_source join event_table on c_source.event_id = event_table.event_id) 
CSV


copy event_table from <s3> 
(insert into event_table 
select c_source.event_date,c_source.event_id,c_source.cost from event_table left outer join<s3 source> as c_source join on c_source.event_id = event_table.event_id where c_source.event_id is NULL) 
CSV

person Niru    schedule 26.02.2016    source источник


Ответы (2)


Вы не можете выполнить слияние непосредственно из COPY.

Однако ваш первоначальный подход может быть заключен в транзакцию с использованием временной таблицы для подготовки данных загрузки для лучшей производительности.

BEGIN
;
CREATE TEMP TABLE event_table_staging (
     event_date  TIMESTAMP  NULL
    ,event_id    BIGINT     NULL
    ,event_cost  INTEGER    NULL )
DISTSTYLE KEY
DISTKEY (event_id)
SORTKEY (event_id)
;
COPY event_table_staging  
FROM <s3 location>
COMPUDATE ON
;
UPDATE event_table  
SET    event_date = new.event_date
      ,event_cost = new.event_cost
FROM        event_table         AS trg
INNER JOIN  event_table_staging AS new
        ON  trg.event_id = new.event_id
WHERE COALESCE(trg.event_date,0) <> COALESCE(new.event_date,0)
  AND COALESCE(trg.event_cost,0) <> COALESCE(new.event_cost,0)
;
INSERT INTO event_table 
SELECT  event_date
       ,event_id  
       ,event_cost
FROM        event_table_staging AS new
LEFT JOIN   event_table         AS trg
       ON   trg.event_id = new.event_id
WHERE trg.event_id IS NULL
;
COMMIT
;

Этот подход на самом деле работает удивительно хорошо, пока вы используете транзакцию, а общий объем обновлений относительно невелик (однозначное число%). Единственный нюанс — вашу цель нужно будет периодически VACUUMовать — нам достаточно одного раза в месяц.

Мы делаем это ежечасно для нескольких таблиц в диапазоне сотен миллионов строк, т. е. сотни миллионов строк объединяются в сотни миллионов строк. Пользовательские запросы к объединенным таблицам по-прежнему выполняются хорошо.

person Joe Harris    schedule 27.02.2016
comment
Я не могу сделать это за один раз, потому что на вход поступает Firehose Stream. Мне не нужны почасовые пулы, просто EOD, для которого я собираюсь настроить задание Lambda или Datapipeline (все еще решаю, нужно ли мне выполнение на основе событий) - person Niru; 01.03.2016
comment
Что важно для производительности? Почему важно использовать транзакцию и временную таблицу? Если промежуточная таблица прилично велика, я предполагаю, что временная таблица не будет находиться в памяти (или распределена в памяти?) и будет находиться на диске, как и постоянная таблица. Я использую инструмент AWS DMS для синхронизации Postgres с Redshift, и подавляющее большинство использования в кластере RS — это задания обновления DMS, которые просто пытаются не отставать. Я отмечаю, что он не создает правильно типизированные таблицы этапов, все это широкая строка, а слияние/upsert выполняется как длинный оператор case. Часто отстает до смерти - person Davos; 15.02.2019
comment
AWS DMS выполняет слияние из промежуточной таблицы, используя этот шаблон для каждого столбца: "colname"= CASE WHEN "public"."awsdms_changesLONGUUIDHERE"."col1" IS NULL THEN "public"."target_table"."colname" WHEN "public"."awsdms_changesLONGUUIDHERE"."col1" = '<att_null>' THEN NULL ELSE CAST ( "public"."awsdms_changesLONGUUIDHERE"."col1" as INT4) END Обратите внимание на магическое значение <att_null> для удаленных данных и приведение к правильному типу данных из столбцов широкой строки ( varchar(65535) ) в промежуточной таблице. Это не быстро. Однако он делает отдельные вставки, используя прямую копию S3, что быстро. - person Davos; 15.02.2019

Redshift оптимизирован для обработки огромных объемов данных экономичным способом, и вам необходимо изменить некоторые представления о данных и базе данных, которые у вас есть из других БД.

Основная идея заключается в том, что вы не должны обновлять данные в Redshift. Вы должны рассматривать эти данные в Redshift как «Журнал». Вы можете использовать такие функции, как INSERT или UPDATE, но они резко ограничат объем данных, которые вы можете обработать.

Вы можете обрабатывать дубликаты несколькими способами:

  • Вы можете предотвратить создание дубликатов, управляя некоторыми таблицами поиска в памяти (например, в Redis на ElastiCache).) всех идентификаторов, которые вы обрабатываете, и игнорировать запись, если вы уже обработали ее.

  • Вы можете хранить дубликаты внутри Redshift и обрабатывать эти записи с помощью функций WINDOW. это займет только одну из записей (LAST_VALUE, например ).

  • Вы можете иметь необработанные события в Redshift и выполнять агрегацию в запросах к БД вместо предварительной обработки. Этот режим также дает гибкость в изменении способа агрегирования данных. Redshift может быть очень быстрым с этим агрегированием, и нет необходимости в предварительном агрегировании.

Если вы все еще хотите иметь «чистые» и агрегированные данные в Redshift, вы можете ВЫГРУЗИТЕ эти данные с помощью SQL-запроса с правильным агрегированием или функцией WINDOW, удалите старую таблицу и скопируйте данные обратно в Redshift.

person Guy    schedule 28.02.2016
comment
Спасибо, я добавил столбец Identity, чтобы различать event_id. Я создам отдельную таблицу, в которой будут храниться только результаты последних обновлений. - person Niru; 01.03.2016
comment
Я действительно не согласен с тем, что UPDATE/INSERT/DELETE в одностороннем порядке плохи в Redshift. Эти команды полностью поддерживаются и хорошо документированы. Мы объединяем многие миллионы строк каждый час в Redshift, используя эти команды без проблем. - person Joe Harris; 01.03.2016
comment
Также совершенно не нужно выполнять UNLOAD и перезагружать в ту же базу данных. INSERT INTO и CREATE TABLE … AS существуют только для этой цели. - person Joe Harris; 01.03.2016
comment
Я не собираюсь делать выгрузку в этом случае или функции окна. Я планирую использовать ElastiCache для потоковой передачи. Мое решение будет передаваться в промежуточную таблицу со столбцом autoid. Поскольку я отключил event_date, я собираюсь создать отдельный шаг загрузки, который выполняет фактическое слияние EOD, которое будет получать последние значения и очищать исторические данные. - person Niru; 01.03.2016
comment
Я только что понял, что мое решение ближе к вашему ... я не принимаю и принимаю ваше? - person Niru; 01.03.2016