Spring Integration Адаптер Kinesis и группы потребителей

У меня есть потребительское приложение kinesis, разработанное с использованием spring-integration-aws версии 1.1.0.RELEASE.

В своих тестах я запускаю два экземпляра этого приложения в одной группе потребителей и использую поток с двумя осколками. В своих тестах я понял, что KinesisMessageDrivenChannelAdapter будет распространять сообщения тремя способами:

  1. Все сообщения доставляются одному потребителю
  2. сообщения, рассылаемые обоим потребителям (неравномерно)
  3. Оба потребителя получили одинаковые сообщения

Со стороны производителя сообщения равномерно распределяются между двумя шардами. Я хотел бы знать, как адаптер kinesis распределяет сообщения среди потребителей и, если поддерживается, как я могу добиться равномерного распределения среди потребителей.

Спасибо

ОБНОВЛЕНИЕ (конфигурация адаптера)

@Bean
  public KinesisMessageDrivenChannelAdapter kinesisInboundChannelAdapter(
      AmazonKinesis amazonKinesis) {
    String[] streamNames = this.consumerClientProperties.getKinesis().getStreamNames();
    KinesisMessageDrivenChannelAdapter adapter =
        new KinesisMessageDrivenChannelAdapter(amazonKinesis, streamNames);
    adapter.setConverter(null);
    adapter.setOutputChannel(new QueueChannel());
    adapter.setCheckpointStore(dynamoDbMetaDataStore());
    adapter.setCheckpointMode(CheckpointMode.record);
    adapter.setStartTimeout(10000);
    adapter.setConsumerGroup(consumerClientProperties.getName());
    adapter.setListenerMode(ListenerMode.record);
    adapter.setDescribeStreamRetries(1);
    return adapter;
  }

  @Bean
  public DynamoDbMetadataStore dynamoDbMetaDataStore() {
    DynamoDbMetadataStore dynamoDbMetaDataStore = new DynamoDbMetadataStore(amazonDynamoDB(),
        consumerClientProperties.getName());
    return dynamoDbMetaDataStore;
  }

person sansari    schedule 30.08.2018    source источник


Ответы (1)


Всем рекомендуется перейти на последнюю версию Spring Integration AWS 2.0: https://spring.io/blog/2018/08/21/spring-integration-for-aws-2-0-ga-and-spring-cloud-stream-kinesis-binder-1-0-ga

На потребительском уровне Kinesis было внесено множество исправлений, и теперь у нас есть выбор лидера, чтобы не подписываться на один и тот же сегмент более одного раза.

Идея состоит в том, чтобы иметь строгий порядок при обработке записей, поэтому только один поток на кластер должен иметь доступ к одному сегменту. Однако этот поток может обрабатывать несколько шардов.

В любом случае, если вы используете два экземпляра приложения, вам необходимо ввести MetadataStore на основе общих данных, например DynamoDbMetadataStore.

person Artem Bilan    schedule 31.08.2018
comment
Какая отличная новость! Я обновлю свой проект этой версией, чтобы увидеть результаты. Спасибо! - person sansari; 31.08.2018
comment
Поэтому я обновил AWS Spring Integration до новой версии, а также связал Kinesis. Я использую два экземпляра с одинаковым именем группы потребителей, установленным в KinesisAdapter. Теперь оба экземпляра получают все сообщения от обоих шардов. Есть ли какая-нибудь новая конфигурация, которую я должен сделать? Я обновляю вопрос своей конфигурацией. - person sansari; 31.08.2018
comment
Правильно, вам не хватает также LockRegistry для KinesisMessageDrivenChannelAdapter: github.com/spring-projects/ - person Artem Bilan; 31.08.2018
comment
Итак, я создал DynamoDBLockRegistry, внедрил его в LockRegistryLeaderInitiator и запустил инициатор лидера. Выбор лидера работает нормально во время отработки отказа, но все сообщения поступают только на экземпляр лидера. - person sansari; 31.08.2018
comment
Я сказал вам ввести DynamoDBLockRegistry в KinesisMessageDrivenChannelAdapter. Я не говорил о LockRegistryLeaderInitiator, но это решение тоже должно работать. И ваши результаты верны. Честно говоря, в текущем алгоритме нет гарантии, что шарды будут распределяться равномерно. - person Artem Bilan; 31.08.2018
comment
Правильно, я также добавил LockRegistry в KinesisAdapter. Я просто хочу знать, есть ли случайное распределение между экземплярами или в моем сценарии я всегда должен ожидать, что сообщения будут доставлены экземпляру лидера? - person sansari; 31.08.2018
comment
Что ж, есть выбор лидера для каждого осколка, и мы действительно можем закончить, что одно и то же приложение обрабатывает все осколки, если второе работает достаточно медленно, чтобы вовремя подобрать осколок. - person Artem Bilan; 31.08.2018
comment
Верно! В этом случае мы вообще не добьемся масштабируемости, особенно при отработке отказа. Все шарды подхватит только одно приложение. Есть ли план решить эту проблему в следующих выпусках? Если нет, где я могу посмотреть код, чтобы внести свой вклад в решение этой проблемы. Без этого мы не сможем масштабировать потребление кинезиса. Спасибо! - person sansari; 04.09.2018
comment
Вы можете назначать определенные сегменты различным потребителям. Нет, нет такого аргумента, что все шарды будут назначены одному экземпляру. Более того, когда этот экземпляр исчезнет, ​​другие смогут подобрать брошенные осколки. Код находится здесь: github.com/spring-projects/spring-integration-aws. Ваш вклад приветствуется! Ваше беспокойство действительно, и оно действительно не раз приходило мне в голову. Только проблема в том, что у меня нет времени хорошенько об этом подумать. - person Artem Bilan; 04.09.2018
comment
Это верно, но, согласно вашему предыдущему комментарию, самый быстрый экземпляр займет все осколки, и здесь мы говорим о миллисекундах. Если бы я мог вручную назначать осколки в коде в качестве обходного пути, было бы здорово, пока я не смогу реализовать обработку осколков между экземплярами. - person sansari; 04.09.2018
comment
Да, это правда. Нам нужно реализовать что-то вроде интереса шардов с помощью одного мастера и управлять распределением шардов оттуда. Что-то вроде ребалансировки Apache Kafka. - person Artem Bilan; 04.09.2018