Вопросы по теме 'kafka-consumer-api'
время ожидания сеансов потребителя kafka истекло
У нас есть приложение, в котором потребитель читает сообщение, а поток выполняет ряд действий, включая доступ к базе данных до того, как сообщение будет отправлено в другую тему. Время между получением и созданием сообщения в цепочке может занять...
12847 просмотров
schedule
03.10.2023
Высокоуровневый потребитель Kafka: могут ли разделы иметь несколько потоков, потребляющих его?
Могут ли сообщения из данного раздела когда-либо быть разделены на несколько потоков? Допустим, у меня есть один раздел и сотни процессов с сотней потоков в каждом - будут ли сообщения из моего единственного раздела передаваться только одному из этих...
3300 просмотров
schedule
24.02.2022
Потребитель конфигурации spring-integration-kafka для получения сообщения из указанного раздела
Я начал использовать spring-integration-kafka в своем проекте, и я могу создавать и использовать сообщения из Kafka. Но теперь я хочу создать сообщение для определенного раздела, а также использовать сообщение из определенного раздела.
Пример: я...
9658 просмотров
schedule
10.07.2022
Как получить размер (метаданные) сообщения без запроса на выборку в Kafka через SimpleConsumer?
Я использую SimpleConsumer и пытаюсь получить размер сообщения (в байтах), используя искру.
Я могу получить самое раннее и самое последнее смещение, используя запрос метаданных, но не знаю, как получить количество байтов в kafka (0.8.0).
Я...
151 просмотров
schedule
17.08.2022
Можно ли создать тему кафка с динамическим подсчетом разделов?
Я использую kafka для потоковой передачи событий посещений страниц пользователями веб-сайта в службу аналитики. Каждое событие будет содержать следующую информацию для потребителя:
ID пользователя
IP-адрес пользователя
Мне нужна очень...
13611 просмотров
schedule
28.02.2022
Получить последнее сообщение из сценария консоли потребителя kafka
Мы можем получить каждое сообщение от Kafka, выполнив:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Есть ли способ получить только последнее сообщение ?
РЕДАКТИРОВАТЬ:
Если вы просто хотите...
64106 просмотров
schedule
06.07.2022
Как заставить потребителя kafka читать с последнего использованного смещения, но не с начала
Я новичок в kafka и пытаюсь понять, есть ли способ читать сообщения с последнего использованного смещения, но не с начала.
Я пишу примерный случай, чтобы мое намерение не отклонялось.
Eg:
1) I produced 5 messages at 7:00 PM and console consumer...
24336 просмотров
schedule
02.05.2024
Как Kafka справляется с потребителем, который работает медленнее, чем другие потребители?
Допустим, у меня 20 разделов и пять рабочих. Каждому разделу назначается рабочий. Однако один рабочий работает медленнее, чем другие машины. Он все еще обрабатывается (то есть не медленный потребитель , описанный здесь ), но с частотой 60 % по...
4343 просмотров
schedule
02.07.2023
Понимание Kafka Consumer API для Java
Я хочу понять API приема Kafka. Я включил пример кода, который работает.
Почему в Kafka ConsumerStreamMap.get(topic) для одной темы есть список получателей KafkaStream‹>?
Текущий процесс, кажется, перебирает KafkaStream‹> List, а затем...
1368 просмотров
schedule
03.04.2024
Kafka Consumer не получает сообщения
Я новичок в Кафке. Я прочитал в Интернете множество инструкций по созданию Kafka Producer и Kafka Consumer. Я успешно сделал первый, который может отправлять сообщения в кластер Kafka. Однако последний я не закончил. Пожалуйста, помогите мне...
11551 просмотров
schedule
24.03.2022
Как анализировать данные json, которые поступают из темы kafka в классе схемы Storm?
Я получаю данные json из темы кафки. Как я могу применить синтаксический анализ json, чтобы получить все поля для всех объектов в классе схемы шторма, который использует метод десериализации, после чего я возвращаю значения в новые возвращаемые...
3408 просмотров
schedule
26.07.2022
Возможно ли и рекомендуется ли наличие многопоточного Kafka Consumer для каждого раздела, если да, какой-либо образец фрагмента?
Мы используем версию Kafka 0.9, и большое количество сообщений отправляется в определенный раздел в теме kafka. И таких разделов в этой теме несколько. У нас есть один потребитель, назначенный на раздел в рамках этой темы, и мы поддерживаем смещение...
3298 просмотров
schedule
26.02.2024
Есть ли в Kafka 0.9 способ перечислить смещение для всех потребителей в группе потребителей?
Я использую новый Consumer API Kafka 0.9.
Я позволяю Kafka позаботиться о компенсации для потребителей. У меня есть потребители, работающие на нескольких машинах и читающие из одной и той же темы.
Я пытаюсь выяснить следующее:...
1893 просмотров
schedule
24.12.2022
Как заблокировать запрос на вытягивание в потребителе kafka
Когда у брокера нет данных для извлечения, потребитель остается в замкнутом цикле до тех пор, пока данные не поступят. Поэтому я хочу найти способ сказать потребителю подождать, пока брокер не получит некоторые данные. Я использую java-клиент Kafka...
247 просмотров
schedule
17.01.2024
Flink + Kafka сбросить контрольную точку и смещение
Короче говоря, я хотел бы повторно запустить конвейер Flink для данных в Kafka с самого начала .
Флинк 0.10.2, Кафка 0.8.2.
У меня есть тема твитов в Kafka с удержанием 2 часа и конвейер в Flink, который считает твиты со скользящим окном 5...
3354 просмотров
schedule
17.07.2023
Могут ли несколько потребителей Kafka читать одно и то же сообщение из раздела
Мы планируем написать потребителя Kafka (java), который читает очередь Kafka для выполнения действия, которое находится в сообщении.
Поскольку потребители работают независимо, будет ли сообщение обрабатываться одновременно только одним...
55356 просмотров
schedule
30.05.2023
Как установить размер сообщений в Kafka?
Сейчас я использую Kafka 0.9.0.1. Согласно некоторым источникам, которые я нашел, способ установить размеры сообщений - это изменить следующие значения ключей в server.properties .
message.max.bytes
replica.fetch.max.bytes...
17500 просмотров
schedule
08.02.2022
Kafka Avro Consumer с проблемами декодера
Когда я попытался запустить Kafka Consumer с Avro над данными с моей соответствующей схемой, он вернул ошибка «AvroRuntimeException: неверные данные. Длина отрицательная: -40». Я вижу, что у других были похожие проблемы преобразование массива...
9176 просмотров
schedule
28.03.2023
Потребитель Kafka смещается за пределы диапазона без настроенной политики сброса для разделов
Я получаю исключение при запуске потребителя Kafka.
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: смещения вне диапазона без настроенной политики сброса для разделов{test-0=29898318}
Я использую Kafka версии 9.0.0 с Java 7.
45801 просмотров
schedule
28.02.2023
невозможно установить max.poll.records для потребителя kafka, где cons.poll по-прежнему возвращает все записи в разделе
Я создал приложение многопоточного потребителя для работы с различными разделами. Изучая различные блоги, я узнал о свойстве max.poll.records, чтобы получить контроль над набором записей из данной темы, раздела (чтобы он мог быстро выйти из цикла...
15291 просмотров
schedule
30.01.2023