AWS Kinesis: определить, существует ли именованный поток

Моя цель — использовать API AWS Kinesis для создания потока Kinesis с определенным именем, если он еще не существует, а затем записать в него, был ли он там изначально или нет.

Это то, что я придумал до сих пор. Попытка создать поток. Если он завершается ошибкой с кодом 400 и возвращает идентификатор запроса, возможно, поток уже существует. Затем напишите в поток, чтобы убедиться, что он есть. В Го:

k := kinesis.New(session.New())
_, err := k.CreateStream(&kinesis.CreateStreamInput{
    ShardCount: aws.Int64(2),
    StreamName: aws.String("stream"),
})
if err != nil {
    if reqerr, ok := err.(awserr.RequestFailure); ok {
        if reqerr.RequestID() == "" {
            log.Fatal("request was not delivered as it has no ID",
                reqerr.Code(),
                reqerr.Message(),
            )
        }
        if reqerr.StatusCode() != 400 {
            log.Fatal("unexpected status code", reqerr.StatusCode())
        }
    } else {
        log.Fatal(err)
    }
}
// Code 400 + requestID does not necessarily mean that the stream exists
// So write to the stream to confirm it exists
_, err = k.PutRecord(&kinesis.PutRecordInput{
    Data:         []byte("Hello Kinesis"),
    PartitionKey: aws.String("partitionkey"),
    StreamName:   aws.String("stream"),
})
if err != nil {
    log.Fatal(err)
}

Приведенный выше подход кажется запутанным, и, что более важно, я не думаю, что он эффективно соответствует точной ошибке, которую я ожидаю. Выполнение сравнения строк в сообщении об ошибке тоже кажется плохим выбором, потому что это может легко измениться.

Мне интересно, есть ли более надежный и простой способ добиться этого? Перечисление всех доступных потоков для поиска является проблемой, поскольку это линейный поиск, включающий несколько запросов с новыми значениями ExclusiveStartStreamName.


person voutasaurus    schedule 25.02.2016    source источник


Ответы (1)


Опишите поток. Если потока нет, создайте поток и вращайтесь, ожидая, пока он станет активным.

Вы не сможете отправить поток сразу после его создания. Сначала он перейдет в СОЗДАНИЕ, а через некоторое время (секунды) в АКТИВНО.

https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/Kinesis.html#DescribeStream-instance_method

https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/Kinesis.html#CreateStream-instance_method

Вы также можете использовать ListStreams для быстрого просмотра состояния всех потоков:

https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/Kinesis.html#ListStreams-instance_method

person Mircea    schedule 25.02.2016
comment
Хороший момент, если поток фактически создан с помощью create, он некоторое время находится в состоянии CREATING, прежде чем вы сможете записать в него. Вызов описательного потока намного лучше, чем вызов записи. - person voutasaurus; 25.02.2016
comment
Добавьте DescribeStream в свой ответ, и я приму его. Я все еще хотел бы лучший способ. Сейчас я буду использовать CreateStream -> Spin для DescribeStream (с тайм-аутом). - person voutasaurus; 25.02.2016
comment
добавлен DescribeStream + уточнение - person Mircea; 25.02.2016
comment
Эй, это выглядит полезным: WaitUntilStreamExists. Комментариев к функции нет, но, глядя на код, он говорит, что ожидает, пока поток перейдет в состояние READY. Избавляет от необходимости писать логику для вращения. godoc.org/github.com/aws/ aws-sdk-go/service/ - person voutasaurus; 29.02.2016
comment
К вашему сведению, из моего чтения кода функция WaitUntilStreamExists ждет 180 секунд, а затем истекает. - person voutasaurus; 29.02.2016