Как ускорить потребление очереди sqs, используя интеграцию spring

Я пытаюсь настроить поток интеграции для приема сообщений из очереди amazon sqs, и пока он работает нормально. Но я хотел бы определить количество сообщений в минуту или секунду. например 20 сообщений в минуту.

Вот определение моего компонента прослушивателя sql

    @Bean
    public MessageProducer mySqsMessageDrivenChannelAdapter() {
        SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSqs, queueName);
        adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);

        adapter.setVisibilityTimeout(TIMEOUT_VISIBILITY);
        adapter.setWaitTimeOut(TIMEOUT_MESSAGE_WAIT);
        adapter.setMaxNumberOfMessages(prefetch);
        adapter.setOutputChannel(processMessageChannel());
        return adapter;
    }

Как видите, я устанавливаю максимальное количество сообщений для выборки за один опрос, но как установить задержку между опросами?

В обычной очереди jms я мог бы использовать JMS.inboundAdapter с помощью пользовательского опросчика, но кажется, что с помощью SqsMessageDrivenChannelAdapter я не могу установить какое-либо значение таймера опроса.

Может быть, я мог бы использовать MessageProducer, отличный от SqsMessageDrivenChannelAdapter, но какой именно?

Можно ли установить JMS.inboundAdapter с помощью sqs?


person Warley Noleto    schedule 20.10.2017    source источник
comment
Видно, что решение можно найти здесь для jms messagelistener с провайдером sqs"> stackoverflow.com/questions/29667321/. В этом случае этот вопрос можно считать дублированным, но дело в том, что я использую spring-integration. Я попытаюсь адаптировать представленное там решение, если оно сработает, я закрою это.   -  person Warley Noleto    schedule 20.10.2017


Ответы (1)


Spring Integration SqsMessageDrivenChannelAdapter является активным компонентом драйвера сообщений. Он основан на SimpleMessageListenerContainer из проекта Springh Cloud AWS, который имеет длительный цикл while() для вызова AmazonSQS.receiveMessage(). Логика в этом цикле не слишком сложна:

try {
    ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
    CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
    for (Message message : receiveMessageResult.getMessages()) {
        if (isQueueRunning()) {
            MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
                 getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
        } else {
           messageBatchLatch.countDown();
        }
    }
    try {
         messageBatchLatch.await();
    } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
    }
} catch (Exception e) {

Как видите, мы создаем там messageBatchLatch и ждем его после завершения цикла. Каждое сообщение обрабатывается собственным SignalExecutingRunnable, который countDown() находится в конце MessageExecutor. Итак, то, что вы хотели бы сделать, может быть достигнуто с помощью искусственного Thread.sleep() в методе целевого сервиса, чтобы иметь больший интервал между опросами SQS.

Но я слышу вашу просьбу, и мы действительно должны добавить что-то вроде:

/**
 * The sleep interval in milliseconds used in the main loop between shards polling cycles.
 * Defaults to {@code 1000} minimum {@code 250}.
 * @param idleBetweenPolls the interval to sleep between shards polling cycles.
 */
public void setIdleBetweenPolls(int idleBetweenPolls) {
    this.idleBetweenPolls = Math.max(250, idleBetweenPolls);
}

Я сделал это для KinesisMessageDrivenChannelAdapter, но здесь мы должны запросить Spring Cloud AWS, чтобы сделать это для SimpleMessageListenerContainer.

person Artem Bilan    schedule 20.10.2017