Вопросы по теме 'spring-kafka'

Как вручную зафиксировать смещения в исходном модуле Kafka при получении подтверждения от модуля приемника Kafka в Spring XD?
В потоке XD сообщения потребляются из темы Kafka через исходный модуль, а затем отправляются в принимающий модуль Kafka. Причина разработки пользовательских модулей источника и приемника Kafka заключается в том, что я хочу обновлять смещения из...
1295 просмотров

Производитель Kafka выдает исключение TimeoutException: Batch Expired
Я тестирую приложение Spring Cloud Stream для твиттера, запустил контейнер докеров со следующими свойствами среды, связанными с Kafka, KAFKA_ADVERTISED_HOST_NAME=<ip> advertised.host.name=<ip>:9092...
3103 просмотров

Spring Cloud Stream Kafka - EmbeddedHeadersMessageConverter - java.lang.StringIndexOutOfBoundsException: индекс строки вне допустимого диапазона в
У меня есть приложение Spring Boot ( app0 ), которое использует Spring Cloud Stream Kafka для чтения из темы. Есть два других приложения ( app1 , app2 ), которые создают сообщения по этой теме. app1 публикует сообщения с помощью интерфейса...
1004 просмотров

Как мне узнать, была ли моя запись зафиксирована вручную с помощью Spring Kafka
Я хотел бы знать, как работает фиксация, когда AckMode установлен на MANUAL в spring kafka. Ниже показано свойство, которое я установил в KafkaConfig containerProperties.setAckMode (AbstractMessageListenerContainer.AckMode.MANUAL); Код...
1046 просмотров

Обработка ошибок с помощью @KafkaListener
Я использую spring-kafka со следующей конфигурацией: package com.danigu.fancypants.infrastructure; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.Data; import org.apache.kafka.clients.consumer.ConsumerConfig; import...
10095 просмотров
schedule 03.06.2022

исключение spring kafka thorws InstanceAlreadyExistsException после установки параллелизма › 1
Я использую spring-kafka, все работает, если я не устанавливаю параллелизм ConcurrentKafkaListenerContainerFactory, когда я устанавливаю для него число больше 1, я получаю исключение: javax.management.InstanceAlreadyExistsException:...
5584 просмотров
schedule 13.02.2023

Клиент Spring Kafka не может получать сообщения из темы Kafka Broker с включенным протоколом Kerberos
Недавно у нас был керберизован кластер Kafka, и у нас начались проблемы с чтением сообщений из темы на брокере. Мы используем Spring kafka 1.1.2.RELEASE и клиент kafka 0.10.0.1. Я внес в проект следующие изменения после ознакомления с...
810 просмотров
schedule 09.03.2022

Пример реализации шаблона Spring Kafka для смещения поиска, подтверждения
Я новичок в spring-kafka-template . Я попробовал некоторые основные вещи в нем, и они работают нормально. Но я пытаюсь реализовать некоторые концепции, упомянутые в Документы Spring , такие как: Поиск смещения Благодарность слушателей...
3421 просмотров

Acknowledgement.acknowledge () выбрасывает исключение в spring-kafka @KafkaListener
Когда я устанавливаю для enable.auto.commit значение false и пытаюсь вручную зафиксировать смещение с использованием аннотации spring-kafka @KafkaListener, я получаю org.springframework.kafka.listener.ListenerExecutionFailedException: метод...
11726 просмотров
schedule 02.02.2022

Сообщения Spring Cloud Stream Kafka Producer
Я хочу настроить производитель spring-cloud-stream-kafka с весенней загрузкой. Производитель работает, и я могу получать сообщения от брокера kafka, но сообщения также содержат некоторую информацию заголовка, например следующую: contentType...
2157 просмотров

o.apache.kafka.clients.NetworkClient - брокер начальной загрузки ‹hostname›: 9092 отключено
Я пытаюсь получать сообщения из темы Kafka, используя Spring Kafka consumer, но вижу ошибку ниже. Это отлично работает, когда я использую сообщения из темы kafka, настроенной на моем локальном компьютере -...
3593 просмотров
schedule 15.11.2022

Как использовать метод Spring Kafka Acknowledgement.acknowledge () для фиксации вручную
Я впервые использую Spring Kafka, и я не могу использовать метод Acknowledgement.acknowledge () для ручной фиксации в моем пользовательском коде, как указано здесь https://docs.spring.io/spring-kafka/reference/html/_reference.html#committing-offsets...
32863 просмотров

Конфигурация динамического имени темы для Spring Kafka при использовании адаптеров Kafka для интеграции Spring?
У меня есть потоки интеграции Spring, которые мне нужно снова использовать. @Bean public IntegrationFlow sendToKafkaFlowRequest(@Value("${kafka.document-consume-topic}") String topic,...
412 просмотров
schedule 19.05.2023

Spring Kafka Consumer Retry
Я использую потребитель Spring Kafka, который извлекает сообщения из темы и сохраняет их в базе данных. Если выполняется условие сбоя, например, база данных недоступна, предоставляет ли потребительская библиотека kafka механизм для повторной попытки?...
16269 просмотров

TimeoutException продюсера Kafka: истекает срок действия 1 записи (ов)
Я использую Kafka с Spring-boot: Класс Kafka Producer : @Service public class MyKafkaProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; private static Logger LOGGER =...
40498 просмотров

Конфигурация spring-cloud-stream-kafka: instanceCount и instanceIndex
Запуск программы spring-boot 1.5.7.RELEASE с использованием spring-cloud Dalston.SR4 . Сам работал с облачным потоком документацией , особенно в отношении kafka- использование связующего, но мне все еще неясны некоторые аспекты конфигурации:...
493 просмотров

Группа потребителей / слушателей Spring Kafka
в чем разница в указании группы у потребителя spring.kafka.consumer.group-id против указания на @KafkaListener? @KafkaListener(topic="test", group = "test-grp")
6260 просмотров
schedule 26.07.2022

Как создать одновременный прослушиватель сообщений для темы Kafka с 1 разделом
Использование Spring Kafka org.springframework.kafka.listener.ConcurrentMessageListenerContainer создает несколько прослушивателей на основе свойств контейнера и количества разделов в теме. И javadoc говорит: «Сообщения из одного раздела будут...
764 просмотров

Spring Cloud Stream MessageChannel send () всегда возвращает true
Я использую облачный поток Spring, и я хотел бы сохранить сообщения и повторить попытку опубликовать их в теме, когда сервер Kafka ушел, но метод send () MessageChannel всегда возвращает true, даже если сервер Kafka / Zookeeper остановлен....
1403 просмотров

Как добавить обработчик ошибок для производителя при использовании Spring Kafka
Как добавить обработчик ошибок для производителя при использовании Spring Kafka? Я знаю, как добавить обработчик ошибок для потребителя, но не уверен насчет производителя.
2504 просмотров
schedule 13.02.2024