Преобразование данных Kinesis firehose с использованием Java

При использовании функции Java Lambda для преобразования пожарного шланга данных кинезиса возникает ошибка, указанная ниже. Ниже мой преобразованный JSON выглядит как

{
"records": [
    {
        "recordId": "49586022990098427206724983301551059982279766660054253570000000",
        "result": "Ok",
        "data": "ZXlKMGFXTnJaWEpmYzNsdFltOXNJam9pVkVWVFZEY2lMQ0FpYzJWamRHOXlJam9pU0VWQlRGUklRMEZTUlNJc0lDSmphR0Z1WjJVaQ0KT2kwd0xqQTFMQ0FpY0hKcFkyVWlPamcwTGpVeGZRbz0="
    }
] 
}

ошибка в консоли кинезиса похожа на

Недопустимая структура вывода: проверьте свою функцию и убедитесь, что обработанные записи содержат действительный статус результата Dropped, Ok или ProcessingFailed

У кого-нибудь есть представление об этом, я не смог найти пример кода с использованием Java для преобразования данных kinesis

https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

В этом документе говорится о структуре вывода


person Java Programmer    schedule 06.08.2018    source источник
comment
У меня никогда не работала эта штука, мне пришлось сменить язык на Python. Там нормально работало. Если у кого-то это работает на Java, дайте мне знать   -  person Java Programmer    schedule 27.08.2018


Ответы (1)


Я только что закончил бороться с этим в scala (совместимом с java). Ключ заключается в использовании возвращаемого типа: com.amazonaws.services.lambda.runtime.events.KinesisAnalyticsInputPreprocessingResponse

import java.nio.ByteBuffer

import com.amazonaws.services.lambda.runtime.events.KinesisAnalyticsInputPreprocessingResponse._
import com.amazonaws.services.lambda.runtime.events.{KinesisAnalyticsInputPreprocessingResponse, KinesisFirehoseEvent}
import com.amazonaws.services.lambda.runtime.{Context, LambdaLogger, RequestHandler}

import scala.collection.JavaConversions._
import scala.language.implicitConversions

class Handler extends RequestHandler[KinesisFirehoseEvent, KinesisAnalyticsInputPreprocessingResponse] {

  override def handleRequest(in: KinesisFirehoseEvent, context: Context): KinesisAnalyticsInputPreprocessingResponse = {

    val logger: LambdaLogger = context.getLogger
    val records = in.getRecords
    val tranformed = records.flatMap(record => {

      try {
        val changed = record.getData.array()
        //do some sort of transform
        val rec = new Record(record.getRecordId, Result.Ok, ByteBuffer.wrap(changed))
        Some(rec)
      } catch {
        case e: Exception => {
          logger.log(e.toString)
          Some(new Record(record.getRecordId, Result.Dropped, record.getData))
        }
      }
    })

    val response = new KinesisAnalyticsInputPreprocessingResponse()
    response.setRecords(tranformed.toList)
    response
  }
}
person Chris Ellsworth    schedule 19.10.2018
comment
Так почему бы также не вернуть KinesisFirehoseEvent? - person Nacho; 31.03.2020