29 мая 2021 г.

Томас

Использование Kinesis Data Firehose (которое я также буду называть потоком доставки) и Lambda — отличный способ обработки потоковых данных, а поскольку обе службы являются бессерверными, нет серверов, которыми нужно управлять или платить за них, пока они не используются. Я использовал эту комбинацию несколько раз, чтобы маскировать или очищать журналы, поскольку они передаются из нескольких служб. Одна из замечательных особенностей Kinesis заключается в том, что другие сервисы AWS напрямую интегрируются с ним, например CloudWatch. Если нет прямой интеграции, то данные могут быть отправлены напрямую с помощью запроса PUT.

Kinesis упрощает преобразование данных после их поступления в поток доставки благодаря интеграции с Lambda. Поступают записи, Lambda может их преобразовать, после чего записи достигают конечного пункта назначения. Этим конечным пунктом назначения может быть что-то вроде S3, Elastisearch или Splunk. Есть и другие встроенные интеграции. Пока я создавал свой шаблон CloudFormation для этого, я выбрал S3, так как легко создать корзину, и есть множество других замечательных вещей, которые можно сделать с данными, находящимися в корзине S3.

Мой базовый шаблон доступен на GitHub в репозитории AWS CloudFormation Reference вместе с несколькими другими шаблонами, которые я создал в качестве кратких ориентиров и строительных блоков.

Начиная с функции Lambda, не было никаких сложных моментов в построении ее со стороны инфраструктуры. Для роли IAM я просто использовал управляемую политику с ARN arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole. В нем было все, что мне нужно, чтобы начать работу в течение нескольких минут. Разрешение Lambda все еще немного сбивает меня с толку. Принципалу events.amazonaws.com требуется разрешение на выполнение действия lambda:InvokeFunction, что сначала не имело для меня смысла, поскольку Kinesis — это то, что запускает Lambda. По крайней мере, так я думал за кулисами.

Затем появился сам Firehose и его роль IAM. Свойства Kinesis имеет несколько конфигураций назначения на выбор, и каждый поток доставки получает только один. Возможные варианты: Elasticsearch, S3, Redshift, Splunk, Kinesis Stream (отличный от Kinesis Data Firehose) или общая конечная точка HTTP. Также стоит отметить, что S3 имеет два доступных свойства конфигурации назначения: S3DestinationConfiguration и ExtendedS3DestinationConfiguration. Из того, что я могу сказать, расширенный пункт назначения допускает дополнительную настройку, такую ​​как обработка Lambda, тогда как обычная конфигурация назначения предназначена для простой переадресации, чтобы упростить ее. Я решил использовать ExtendedS3DestinationConfiguration, потому что хотел использовать интеграцию Lambda. Интеграция Lambda выполняется в свойстве ProcessingConfiguration объекта ExtendedS3DestinationConfiguration, что в конечном итоге выглядит примерно так, как показано в следующем фрагменте.

...
ExtendedS3DestinationConfiguration:
  ...
  ProcessingConfiguration:
    Enabled: 'true'
    Processors:
      - Parameters:
          - ParameterName: LambdaArn
            ParameterValue: !GetAtt LambdaFunction.Arn
        Type: Lambda

Самой сложной частью, из-за которой я застрял в работе над этим шаблоном дольше всего, была роль IAM для Data Firehose. После того, как я разобрался со своей проблемой, я нашел страницу в документации AWS о разных разрешениях, необходимых для различных интеграций, которая помогла бы мне, если бы я знал об этом заранее. На чем я застрял, так это на разрешениях Lambda для Firehose. Сначала у меня было только что-то похожее на следующее.

- Effect: Allow
  Action:
    - 'lambda:InvokeFunction'
    - 'lambda:GetFunctionConfiguration'
  Resource:
    - !Sub "${LambdaFunction.Arn}"

Это приводит к тому, что Firehose записывает в мою корзину S3 по пути с ошибкой отправки. То, что я получил вместо чистых записей, было чем-то вроде следующего сообщения.

{
  "attemptsMade": 4,
  "arrivalTimestamp": 1622242573374,
  "errorCode": "Lambda.InvokeAccessDenied",
  "errorMessage": "Access was denied. Ensure that the access policy allows access to the Lambda function.",
  "attemptEndingTimestamp": 1622242649990,
  "rawData": "eyJ0aWNrZXJfc3ltYm9sIjoiQU1aTiIsInNlY3RvciI6IlRFQ0hOT0xPR1kiLCJjaGFuZ2UiOi02LjU1LCJwcmljZSI6NzMzLjQxfQ==",
  "lambdaArn": "REDACTED"
}

После того, как я слишком долго смотрел на это и задавался вопросом, что я сделал неправильно, я, наконец, наткнулся на что-то, в котором упоминалось о необходимости подстановочного знака в Resource для документа политики роли IAM. Что, наконец, помогло мне, так это следующая корректировка предыдущего утверждения.

- Effect: Allow
  Action:
    - 'lambda:InvokeFunction'
    - 'lambda:GetFunctionConfiguration'
  Resource:
    - !Sub "${LambdaFunction.Arn}*" # NOTE: there is an * after the Lambda's ARN

Убедитесь, что после ARN лямбды стоит *. После этого все мои записи начали проходить через конвейер данных!

Теперь идет открытая часть этой интеграции, код, который выполняет функция Lambda. Я уже писал о своем предыдущем опыте написания кода для обработки журналов, исходящих из CloudWatch и получающих данные в Elasticsearch. Этот код определенно был более сложной версией того, что я написал на этот раз. Для этого шаблона я хотел, чтобы код был простым. Я хотел декодировать записи Base64 из Firehose, распечатать содержимое и вернуть записи обратно в Firehose нетронутыми. Цель состояла в том, чтобы просто убедиться, что все работает так, как задумано. Вот код, который я написал в Node/Javascript:

exports.handler = (event, context, callback) => {
  /* Process the list of records and transform them */
  const output = event.records.map((record) => {
    const plaintextData = Buffer.from(record.data, 'base64').toString('ascii');
    console.log(plaintextData);
    return {
      recordId: record.recordId,
      result: 'Ok',
      data: record.data,
    };
  });
  console.log(`Processing completed.  Successful records ${output.length}.`);
  callback(null, { records: output });
};

Этот код мало что делает, но это хорошая отправная точка. Отсюда данные могут быть преобразованы, как это необходимо. Значения могут быть добавлены, значения могут быть отредактированы, сигналы тревоги могут быть активированы на основе содержимого. Как только мы подключим Lambda, мы окажемся на Диком Западе, где все идет, так что получайте удовольствие!

Первоначально опубликовано на https://thomasstep.com 29 мая 2021 г.