Вопросы по теме 'kafka-consumer-api'

время ожидания сеансов потребителя kafka истекло
У нас есть приложение, в котором потребитель читает сообщение, а поток выполняет ряд действий, включая доступ к базе данных до того, как сообщение будет отправлено в другую тему. Время между получением и созданием сообщения в цепочке может занять...
12847 просмотров
schedule 03.10.2023

Высокоуровневый потребитель Kafka: могут ли разделы иметь несколько потоков, потребляющих его?
Могут ли сообщения из данного раздела когда-либо быть разделены на несколько потоков? Допустим, у меня есть один раздел и сотни процессов с сотней потоков в каждом - будут ли сообщения из моего единственного раздела передаваться только одному из этих...
3300 просмотров

Потребитель конфигурации spring-integration-kafka для получения сообщения из указанного раздела
Я начал использовать spring-integration-kafka в своем проекте, и я могу создавать и использовать сообщения из Kafka. Но теперь я хочу создать сообщение для определенного раздела, а также использовать сообщение из определенного раздела. Пример: я...
9658 просмотров

Как получить размер (метаданные) сообщения без запроса на выборку в Kafka через SimpleConsumer?
Я использую SimpleConsumer и пытаюсь получить размер сообщения (в байтах), используя искру. Я могу получить самое раннее и самое последнее смещение, используя запрос метаданных, но не знаю, как получить количество байтов в kafka (0.8.0). Я...
151 просмотров

Можно ли создать тему кафка с динамическим подсчетом разделов?
Я использую kafka для потоковой передачи событий посещений страниц пользователями веб-сайта в службу аналитики. Каждое событие будет содержать следующую информацию для потребителя: ID пользователя IP-адрес пользователя Мне нужна очень...
13611 просмотров

Получить последнее сообщение из сценария консоли потребителя kafka
Мы можем получить каждое сообщение от Kafka, выполнив: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning Есть ли способ получить только последнее сообщение ? РЕДАКТИРОВАТЬ: Если вы просто хотите...
64106 просмотров
schedule 06.07.2022

Как заставить потребителя kafka читать с последнего использованного смещения, но не с начала
Я новичок в kafka и пытаюсь понять, есть ли способ читать сообщения с последнего использованного смещения, но не с начала. Я пишу примерный случай, чтобы мое намерение не отклонялось. Eg: 1) I produced 5 messages at 7:00 PM and console consumer...
24336 просмотров
schedule 02.05.2024

Как Kafka справляется с потребителем, который работает медленнее, чем другие потребители?
Допустим, у меня 20 разделов и пять рабочих. Каждому разделу назначается рабочий. Однако один рабочий работает медленнее, чем другие машины. Он все еще обрабатывается (то есть не медленный потребитель , описанный здесь ), но с частотой 60 % по...
4343 просмотров
schedule 02.07.2023

Понимание Kafka Consumer API для Java
Я хочу понять API приема Kafka. Я включил пример кода, который работает. Почему в Kafka ConsumerStreamMap.get(topic) для одной темы есть список получателей KafkaStream‹>? Текущий процесс, кажется, перебирает KafkaStream‹> List, а затем...
1368 просмотров
schedule 03.04.2024

Kafka Consumer не получает сообщения
Я новичок в Кафке. Я прочитал в Интернете множество инструкций по созданию Kafka Producer и Kafka Consumer. Я успешно сделал первый, который может отправлять сообщения в кластер Kafka. Однако последний я не закончил. Пожалуйста, помогите мне...
11551 просмотров
schedule 24.03.2022

Как анализировать данные json, которые поступают из темы kafka в классе схемы Storm?
Я получаю данные json из темы кафки. Как я могу применить синтаксический анализ json, чтобы получить все поля для всех объектов в классе схемы шторма, который использует метод десериализации, после чего я возвращаю значения в новые возвращаемые...
3408 просмотров

Возможно ли и рекомендуется ли наличие многопоточного Kafka Consumer для каждого раздела, если да, какой-либо образец фрагмента?
Мы используем версию Kafka 0.9, и большое количество сообщений отправляется в определенный раздел в теме kafka. И таких разделов в этой теме несколько. У нас есть один потребитель, назначенный на раздел в рамках этой темы, и мы поддерживаем смещение...
3298 просмотров
schedule 26.02.2024

Есть ли в Kafka 0.9 способ перечислить смещение для всех потребителей в группе потребителей?
Я использую новый Consumer API Kafka 0.9. Я позволяю Kafka позаботиться о компенсации для потребителей. У меня есть потребители, работающие на нескольких машинах и читающие из одной и той же темы. Я пытаюсь выяснить следующее:...
1893 просмотров
schedule 24.12.2022

Как заблокировать запрос на вытягивание в потребителе kafka
Когда у брокера нет данных для извлечения, потребитель остается в замкнутом цикле до тех пор, пока данные не поступят. Поэтому я хочу найти способ сказать потребителю подождать, пока брокер не получит некоторые данные. Я использую java-клиент Kafka...
247 просмотров
schedule 17.01.2024

Flink + Kafka сбросить контрольную точку и смещение
Короче говоря, я хотел бы повторно запустить конвейер Flink для данных в Kafka с самого начала . Флинк 0.10.2, Кафка 0.8.2. У меня есть тема твитов в Kafka с удержанием 2 часа и конвейер в Flink, который считает твиты со скользящим окном 5...
3354 просмотров

Могут ли несколько потребителей Kafka читать одно и то же сообщение из раздела
Мы планируем написать потребителя Kafka (java), который читает очередь Kafka для выполнения действия, которое находится в сообщении. Поскольку потребители работают независимо, будет ли сообщение обрабатываться одновременно только одним...
55356 просмотров
schedule 30.05.2023

Как установить размер сообщений в Kafka?
Сейчас я использую Kafka 0.9.0.1. Согласно некоторым источникам, которые я нашел, способ установить размеры сообщений - это изменить следующие значения ключей в server.properties . message.max.bytes replica.fetch.max.bytes...
17500 просмотров

Kafka Avro Consumer с проблемами декодера
Когда я попытался запустить Kafka Consumer с Avro над данными с моей соответствующей схемой, он вернул ошибка «AvroRuntimeException: неверные данные. Длина отрицательная: -40». Я вижу, что у других были похожие проблемы преобразование массива...
9176 просмотров

Потребитель Kafka смещается за пределы диапазона без настроенной политики сброса для разделов
Я получаю исключение при запуске потребителя Kafka. org.apache.kafka.clients.consumer.OffsetOutOfRangeException: смещения вне диапазона без настроенной политики сброса для разделов{test-0=29898318} Я использую Kafka версии 9.0.0 с Java 7.
45801 просмотров
schedule 28.02.2023

невозможно установить max.poll.records для потребителя kafka, где cons.poll по-прежнему возвращает все записи в разделе
Я создал приложение многопоточного потребителя для работы с различными разделами. Изучая различные блоги, я узнал о свойстве max.poll.records, чтобы получить контроль над набором записей из данной темы, раздела (чтобы он мог быстро выйти из цикла...
15291 просмотров
schedule 30.01.2023