Как отфильтровать многострочные данные JSON от попадания в таблицу AWS Hive

У меня есть правило AWS IoT, которое отправляет входящий JSON в Kinesis Firehose.

Все данные JSON из моей публикации IoT находятся в одной строке, например:

{"count":4950, "dateTime8601": "2017-03-09T17:15:28.314Z"}

Раздел «Тест» IoT в пользовательском интерфейсе администратора позволяет публиковать сообщение, по умолчанию используется следующее (обратите внимание на формат многострочного JSON):

{
  "message": "Hello from AWS IoT console"
}

Я транслирую Firehose на S3, который затем преобразуется EMR в столбчатый формат, который в конечном итоге будет использоваться Athena.

Проблема в том, что во время преобразования в формат столбцов Hive (в частности, JSON SerDe) не может обрабатывать объект JSON, который занимает более одной строки. Это приведет к увеличению конверсии, а не к преобразованию хороших однострочных записей JSON.

Мой вопрос:

  • Как настроить FireHose для игнорирования многострочного JSON?
  • Если это невозможно, как сообщить Hive об удалении новых строк перед загрузкой в ​​таблицу или, по крайней мере, перехватить исключения и попытаться продолжить?

Я уже пытаюсь игнорировать искаженный JSON при определении таблицы Hive:

DROP TABLE site_sensor_data_raw;
CREATE EXTERNAL TABLE site_sensor_data_raw (
 count int,     
 dateTime8601 timestamp
)
PARTITIONED BY(year int, month int, day int, hour int)
ROW FORMAT  serde 'org.apache.hive.hcatalog.data.JsonSerDe'
with serdeproperties (
 'ignore.malformed.json' = 'true',
 "timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSS'Z',millis"
)
LOCATION 's3://...';

Вот мой полный HQL, который выполняет преобразование:

--Example of converting to OEX/columnar formats
DROP TABLE site_sensor_data_raw;
CREATE EXTERNAL TABLE site_sensor_data_raw (
    count int,
    dateTime8601 timestamp
)
PARTITIONED BY(year int, month int, day int, hour int)
ROW FORMAT  serde 'org.apache.hive.hcatalog.data.JsonSerDe'
with serdeproperties (
 'ignore.malformed.json' = 'true',
 "timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSS'Z',millis"
)
LOCATION 's3://bucket.me.com/raw/all-sites/';

ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='15') location 's3://bucket.me.com/raw/all-sites/2017/03/09/15';
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='16') location 's3://bucket.me.com/raw/all-sites/2017/03/09/16';
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='17') location 's3://bucket.me.com/raw/all-sites/2017/03/09/17';

DROP TABLE to_orc;
CREATE EXTERNAL TABLE to_orc (
      count int,
      dateTime8601 timestamp
)
STORED AS ORC
LOCATION 's3://bucket.me.com/orc'
TBLPROPERTIES ("orc.compress"="ZLIB");

INSERT OVERWRITE TABLE to_orc SELECT count,dateTime8601 FROM site_sensor_data_raw where year=2017 AND month=03 AND day=09 AND hour=15;

person rynop    schedule 09.03.2017    source источник


Ответы (1)


Что ж, стандартный JSON SerDe, используемый в EMR и Athena, не может работать с многострочными записями json. Каждая запись JSON должна быть в одной строке.

В многострочном JSON я вижу две проблемы с точки зрения Hive / Hadoop и даже с точки зрения Presto (используется в Athean).

  • Учитывая файл, очевидно, что Hive / Hadoop и serde JSON не смогут распознать конец и начало записи json, чтобы вернуть ее объектное представление.
  • При наличии нескольких файлов многострочные файлы JSON не разделяются, как обычные файлы JSON с разделителями / n.

Чтобы обойти эту проблему с конца EMR / Athena, вам нужно будет написать свой собственный SerDe на основе вашей структуры данных и перехватить исключения и т. Д.

Как настроить FireHose на игнорирование многострочного JSON?

Firehose не может игнорировать определенный формат. Он примет все, что помещается в его API (PutRecord или PutRecordBatch) в качестве большого двоичного объекта данных, и отправит его по назначению.

http://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html

Тем не менее, AWS Firehose предлагает преобразование данных с помощью AWS Lambda, где вы можете использовать функции Lambda для преобразования ваших данных, входящих в Firehose, и передачи преобразованных данных по назначению. Таким образом, вы можете использовать эту функцию, чтобы заранее распознать и сгладить многострочный JSON. Вы также можете отбросить записи, если они не отформатированы должным образом и т. Д. Вам нужно будет изучить, как IOT отправляет многострочные данные json в пожарный шланг (например, построчно и т. Д.), Чтобы написать свою собственную функцию.

https://aws.amazon.com/blogs/compute/amazon-kinesis-firehose-data-transformation-with-aws-lambda/.

Если это невозможно, как сообщить Hive об удалении новых строк перед загрузкой в ​​таблицу или, по крайней мере, перехватить исключения и попытаться продолжить?

Если у вас все еще есть многострочный JSON в месте назначения пожарного шланга, поскольку у вас есть EMR в вашем ETL, вы можете использовать его вычисление вместо Lambda для выравнивания JSON. Эта функция на Spark также может помочь вам в этом. https://issues.apache.org/jira/browse/SPARK-18352

Затем вы можете загрузить эти данные в столбчатый формат, чтобы Афина могла с ними работать.

person jc mannem    schedule 10.05.2017