Как справиться с противодавлением при использовании Apache Camel и Kafka?

Я пытаюсь написать приложение, которое будет интегрироваться с Kafka с помощью Camel. (Версия - 3.4.2)

У меня есть подход, заимствованный из ответа на этот вопрос.

У меня есть маршрут, который прослушивает сообщения из темы Kafka. Обработка этого сообщения отделена от потребления с помощью простого исполнителя. Каждая обработка передается этому исполнителю как задание. Порядок сообщений не важен, и единственный важный фактор - это то, насколько быстро и эффективно сообщение может быть обработано. Я отключил автоматическую фиксацию и вручную фиксировал сообщения после того, как задачи были отправлены исполнителю. Потеря сообщений, которые в настоящее время обрабатываются (из-за сбоя / завершения работы), допустима, но те сообщения в Kafka, которые никогда не были отправлены для обработки, не должны быть потеряны (из-за фиксации смещения). Теперь к вопросам,

  1. Как я могу эффективно справиться с нагрузкой? Например, есть 1000 сообщений, но я могу параллельно обрабатывать только 100 за раз.

Сейчас у меня есть решение - заблокировать поток опроса потребителя и попытаться постоянно отправлять задание. Но приостановка голосования была бы гораздо лучшим подходом, но я не могу найти никакого способа добиться этого в Camel.

  1. Есть ли лучший способ (верблюжий способ) отделить обработку от потребления и справиться с противодавлением?

public static void main(String[] args) throws Exception {
        String consumerId = System.getProperty("consumerId", "1");
        ExecutorService executor = new ThreadPoolExecutor(100, 100, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>());
        LOGGER.info("Consumer {} starting....", consumerId);

        Main main = new Main();
        main.init();

        CamelContext context = main.getCamelContext();
        ComponentsBuilderFactory.kafka().brokers("localhost:9092").metadataMaxAgeMs(120000).groupId("consumer")
                .autoOffsetReset("earliest").autoCommitEnable(false).allowManualCommit(true).maxPollRecords(100)
                .register(context, "kafka");

        ConsumerBean bean = new ConsumerBean();
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                from("kafka:test").process(exchange -> {
                    LOGGER.info("Consumer {} - Exhange is {}", consumerId, exchange.getIn().getHeaders());
                    processTask(exchange);
                    commitOffset(exchange);
                });
            }

            private void processTask(Exchange exchange) throws InterruptedException {
                try {
                    executor.submit(() -> bean.execute(exchange.getIn().getBody(String.class)));
                } catch (Exception e) {
                    LOGGER.error("Exception occured {}", e.getMessage());
                    Thread.sleep(1000);
                    processTask(exchange);
                }
            }

            private void commitOffset(Exchange exchange) {
                boolean lastOne = exchange.getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class);
                if (lastOne) {
                    KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT,
                            KafkaManualCommit.class);
                    if (manual != null) {
                        LOGGER.info("manually committing the offset for batch");
                        manual.commitSync();
                    }
                } else {
                    LOGGER.info("NOT time to commit the offset yet");
                }
            }
        });

        main.run();
    }

person Raghu    schedule 03.09.2020    source источник
comment
Возможно, вы хотите заменить processTask на seda и параметры blockWhenFull=true&concurrentConsumers=100&size=100   -  person Bedla    schedule 04.09.2020


Ответы (1)


Для этого можно использовать throttle EIP.

from("your uri here")
.throttle(maxRequestCount)
.timePeriodMillis(inTimePeriodMs)
.to(yourProcessorUri)
.end()

Ознакомьтесь с исходной документацией здесь.

person Anastasiia Smirnova    schedule 03.09.2020