Проблема при использовании Multiple KafkaProducerMessageHandler с интеграцией Spring

Это расширение сообщения MaprStream с проблемой Spring интеграции Kafka Producer

У меня возникают проблемы с несколькими KafkaProducerMessageHandler при попытке добиться свойства синхронизации при публикации сообщений в maprstream.

@Autowired
Qualifier("abcHandler.handler")
private KafkaProducerMessageHandler abcHandler;

@Autowired
Qualifier("xyzHandler.handler")
private KafkaProducerMessageHandler xyzHandler;

@PostConstruct
public void init() {
    this.abcHandler.setSync(true);
    this.xyzHandler.setSync(true);
}

Конфигурация бина:

<int:chain input-channel="inputToKafka"> 
    <int-kafka:outbound-channel-adapter 
            id="abcHandler" 
            kafka-template="template" 
            topic="${maprstream.stream.topicname}" > 
    </int-kafka:outbound-channel-adapter> 
</int:chain>

<int:chain input-channel="inputToKafka1"> 
    <int-kafka:outbound-channel-adapter 
            id="xyzHandler" 
            kafka-template="template1" 
            topic="${maprstream.stream.topicname1}" > 
    </int-kafka:outbound-channel-adapter> 
</int:chain>

Я получаю следующее исключение при попытке загрузить beans.

Возникла исключительная ситуация во время инициализации контекста - отмена попытки обновления: org.springframework.beans.factory.UnsatisfiedDependencyException: ошибка при создании bean-компонента с именем 'maprStreamProducerHandlerSync': неудовлетворенная зависимость, выраженная через поле 'abcHandler'; вложенное исключение - org.springframework.beans.factory.NoSuchBeanDefinitionException: нет подходящего bean-компонента типа org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler: ожидается как минимум 1 bean-компонент, который квалифицируется как кандидат autowire. Аннотации зависимостей: {@ org.springframework.beans.factory.annotation.Autowired (required = true), @ org.springframework.beans.factory.annotation.Qualifier (value = abcHandler.handler)}

Может кто-нибудь помочь мне в этом?


person kattoor    schedule 20.02.2018    source источник


Ответы (1)


Почему у вас есть эти отдельные компоненты в <chain/>s? Цепочки обычно используются для группировки нескольких элементов.

См. документацию о цепочках; прокрутите вниз до атрибута id '.

<int:chain id="chain1" input-channel="inputToKafka"> 
    <int-kafka:outbound-channel-adapter 
            id="abcHandler" 
            kafka-template="template" 
            topic="${maprstream.stream.topicname}" > 
    </int-kafka:outbound-channel-adapter> 
</int:chain>

Компоненты внутри цепочек получают составные имена bean-компонентов. в данном случае это будет chain1$child.abcHandler.handler. При автоматическом подключении вам нужно будет использовать это значение в @Qualifier, если у вас более одного адаптера.

person Gary Russell    schedule 20.02.2018
comment
Спасибо Гэри за обновление. Да, я внес следующие изменения ‹int-kafka: outbound-channel-adapter id = abcHandler channel = inputToKafka kafka-template = template topic = $ {maprstream.stream.topicname}› ‹/ int-kafka: outbound-channel- адаптер ›и все заработало .. !! - person kattoor; 20.02.2018