исключение spring kafka thorws InstanceAlreadyExistsException после установки параллелизма › 1

Я использую spring-kafka, все работает, если я не устанавливаю параллелизм ConcurrentKafkaListenerContainerFactory, когда я устанавливаю для него число больше 1, я получаю исключение:

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=client-3

Моя конфигурация:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
    kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new 
    ConcurrentKafkaListenerContainerFactory<String, String>();

    factory.setConcurrency(kafkaConfig.getConcurrency());

    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

характеристики:

kafka.enable-auto-commit=false

kafka.client-id=client-1

kafka.concurrency=2

person rellocs wood    schedule 25.04.2017    source источник


Ответы (1)


Я открыл для этого проблему на github. Установка разных client.id для каждого потока в настоящее время не поддерживается.

В качестве обходного пути вы можете запустить отдельный KafkaMessageListenerContainer для каждого (это то, что ConcurrentMessageListenerContainer делает внутри).

ИЗМЕНИТЬ

Хотя это и не идеально, вы можете опустить client.id, и клиент kafka сгенерирует по одному для каждого (consumer-1, consumer-2 и т. д.)

person Gary Russell    schedule 25.04.2017
comment
Я воспроизвел его; Кстати, опускание client.id работает, если это вариант для вас. - person Gary Russell; 25.04.2017
comment
Теперь client.id уникален для каждого потребителя в контейнере за счет добавления -<n> (начиная с версии 1.0.6). - person Gary Russell; 10.05.2019