Как обрабатывать DynamoDB Stream в потоковом приложении Spark

Я хочу использовать DynamoDB Stream из приложения Spark Streaming.

Потоковая передача Spark использует KCL для чтения из Kinesis. Существует библиотека, позволяющая KCL читать из потока DynamoDB: Dynamodb-streams-kinesis-adapter.

Но можно ли воткнуть эту библиотеку в искру? Кто-нибудь это сделал?

Я использую Spark 2.1.0.

Мой план резервного копирования заключается в том, чтобы другое приложение считывало из потока DynamoDB в поток Kinesis.

Спасибо


person Raphaël Douyère    schedule 16.04.2017    source источник
comment
Что вы пробовали до сих пор?   -  person Maximilien Belinga    schedule 17.04.2017
comment
Я смог использовать поток DynamoDB, настроив: KinesisUtils, KinesisInputDStream и KinesisReceiver. Настоящее изменение касается KinesisReceiver, где я использую com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorker.   -  person Raphaël Douyère    schedule 18.04.2017


Ответы (1)


Способ сделать это - реализовать KinesisInputDStream для использования воркера, предоставленного dynamodb-streams-kinesis-adapter официальными рекомендациями предложите что-то вроде этого:

final Worker worker = StreamsWorkerFactory .createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);

С точки зрения Spark, это реализовано в модуле kinesis-asl в KinesisInputDStream.scala.

Я пробовал это для Spark 2.4.0. Вот мое репо. Требуется небольшая доработка, но работа выполняется

https://github.com/ravi72munde/spark-dynamo-stream-asl

После изменения KinesisInputDStream мы можем использовать его, как показано ниже. val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName("sample-tablename-2") .regionName("us-east-1") .initialPosition(new Latest()) .checkpointAppName("sample-app") .checkpointInterval(Milliseconds(100)) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build()

person ravi72munde    schedule 02.12.2018