время ожидания сеансов потребителя kafka истекло

У нас есть приложение, в котором потребитель читает сообщение, а поток выполняет ряд действий, включая доступ к базе данных до того, как сообщение будет отправлено в другую тему. Время между получением и созданием сообщения в цепочке может занять несколько минут. Как только сообщение создается для новой темы, выполняется фиксация, чтобы указать, что мы закончили работу с сообщением очереди потребителя. По этой причине автоматическая фиксация отключена.

Я использую потребителя высокого уровня, и я замечаю, что тайм-аут сеансов zookeeper и kafka, потому что он занимает слишком много времени, прежде чем мы что-либо делаем в очереди потребителя, поэтому kafka в конечном итоге перебалансирует каждый раз, когда поток возвращается, чтобы прочитать больше от потребителя очереди, и через некоторое время потребителю требуется много времени, прежде чем он прочитает новое сообщение.

Я могу установить очень большое время ожидания сеанса zookeeper, чтобы не создавать проблем, но затем мне нужно соответствующим образом настроить параметры перебалансировки, и kafka некоторое время не будет подбирать нового потребителя среди других побочных эффектов.

Какие у меня есть варианты решения этой проблемы? Есть ли способ поддержать кафку и смотрителя зоопарка, чтобы они были счастливы? Сохранятся ли у меня те же проблемы, если я буду использовать простой потребитель?


person Terry Cumaranatunge    schedule 20.12.2014    source источник
comment
У вас больше шансов получить помощь, если вы просто объясните свою проблему. Если вы скажете, что хотите получить обратную связь, ваш вопрос будет закрыт. Stack Overflow - это не форум; мы ценим краткость и конкретность.   -  person Jeffrey Bosboom    schedule 20.12.2014


Ответы (2)


Похоже, ваши проблемы сводятся к тому, чтобы полагаться на высокоуровневого потребителя для управления смещением последнего чтения. Использование простого потребителя решило бы эту проблему, поскольку вы контролируете постоянство этого смещения. Обратите внимание, что все, что делает высокоуровневый потребительский коммит, это сохраняет последнее смещение чтения в zookeeper. Никаких других действий не предпринимается, и сообщение, которое вы только что прочитали, все еще находится в разделе и доступно для чтения другим потребителям.

С простым потребителем kafka у вас есть гораздо больше контроля над тем, когда и как происходит это смещенное хранилище. Вы даже можете сохранить это смещение где-нибудь, кроме Zookeeper (например, в базе данных).

Плохая новость заключается в том, что, хотя сам по себе простой потребитель проще, чем потребитель высокого уровня, вам придется проделать гораздо больше работы с кодом, чтобы заставить его работать. Вам также придется написать код для доступа к нескольким разделам - то, что потребитель высокого уровня делает для вас очень хорошо.

person Chris Gerken    schedule 21.12.2014
comment
Привет, у меня такая же проблема, но в новой версии kafka простой потребитель обесценивается. Как это сделать сейчас? - person pg20; 07.11.2017

Я думаю, проблема в том, что метод опроса потребителя запускает запрос пульса потребителя. И когда вы увеличиваете session.timeout. Сердцебиение потребителя не дойдет до координатора. Из-за этого пропускания сердцебиения координатор помечает потребителя как мертвый. К тому же повторное присоединение потребителей происходит очень медленно, особенно в случае единственного потребителя.

Я столкнулся с аналогичной проблемой, и для ее решения мне нужно изменить следующий параметр в свойствах конфигурации потребителя.

session.timeout.ms = request.timeout.ms = больше, чем тайм-аут сеанса

Также вам необходимо добавить следующее свойство в server.properties на узле брокера kafka. group.max.session.timeout.ms =

Вы можете увидеть следующую ссылку для получения более подробной информации. http://grokbase.com/t/kafka/users/16324waa50/session-timeout-ms-limit

person Bharat Bhagat    schedule 11.08.2016