org.apache.kafka.common.errors.TimeoutException

У меня есть кластер kafka с двумя брокерами 1.0.0, и я запускаю приложение 1.0.0 kafka stream API для этой kafka. Я увеличил запрос производителя request.timeout.ms до 5 минут, чтобы исправить TimeoutException производителя.

В настоящее время я получаю два типа исключений после некоторого времени работы. Я пытаюсь исправить эти исключения, как это предлагается в Apache Kafka: TimeoutException, и тогда ничего не работает ‏ Но здесь было неполное решение. Рекомендуется ли это решение (уменьшение размера партии производителя). Пожалуйста помоги. Исключение 1

2017-12-08 13:11:55,129 ERROR o.a.k.s.p.i.RecordCollectorImpl [sample-app-0.0.1-156ec0d4-6d7c-40b0-a493-370f8d9a092c-StreamThread-1] task [2_0] Error sending record (key 5a12c529e532af0b84f5d937 value com.kafka.streams.SampleEvent@54a6900d timestamp 1512536799387) to topic abc due to org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.; No more records will be sent and no more offsets will be recorded for this task.
2017-12-08 13:11:55,131 ERROR o.a.k.s.p.i.AssignedTasks [sample-app-0.0.1-156ec0d4-6d7c-40b0-a493-370f8d9a092c-StreamThread-1] stream-thread [sample-app-0.0.1-156ec0d4-6d7c-40b0-a493-370f8d9a092c-StreamThread-1] Failed to process stream task 2_0 due to the following error: org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=2_0, processor=KSTREAM-SOURCE-0000000004, topic=Sample-Event, partition=0, offset=508417
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232)
        at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [2_0] Abort sending since an error caught with a previous record (key 5a12c529e532af0b84f5d937 value com.kafka.streams.SampleEvent@54a6900d timestamp 1512536799387) to topic abc due to org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms..
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:819)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:100)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
        at org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.process(KStreamTransform.java:56)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
        ... 6 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.

Исключение 2

2017-12-11 11:08:35,257 ERROR o.a.k.s.p.i.RecordCollectorImpl [kafka-producer-network-thread | sample-app-0.0.1-030b5133-df00-4abd-a3de-8bfab114f626-StreamThread-1-producer] task [2_0] Error sending record (key 5a12c529e532af0b84f5d937 value com.kafka.streams.SampleEvent@1758de61 timestamp 1512795449471) to topic abc due to org.apache.kafka.common.errors.TimeoutException: Expiring 14 record(s) for abc-0: 122597 ms has passed since last append; No more records will be sent and no more offsets will be recorded for this task.
2017-12-11 11:08:56,001 ERROR o.a.k.s.p.i.AssignedTasks [sample-app-0.0.1-030b5133-df00-4abd-a3de-8bfab114f626-StreamThread-1] stream-thread [sample-app-0.0.1-030b5133-df00-4abd-a3de-8bfab114f626-StreamThread-1] Failed to commit stream task 2_0 due to the following error: org.apache.kafka.streams.errors.StreamsException: task [2_0] Abort sending since an error caught with a previous record (key 5a12c529e532af0b84f5d937 value com.kafka.streams.SampleEvent@1758de61 timestamp 1512795449471) to topic abc due to org.apache.kafka.common.errors.TimeoutException: Expiring 14 record(s) for abc-0: 122597 ms has passed since last append.
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
        at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 14 record(s) for abc-0: 122597 ms has passed since last append

person Rednam Nagendra    schedule 11.12.2017    source источник
comment
Вы можете увеличить max.block.ms, чтобы увеличить время ожидания для первого исключения.   -  person Matthias J. Sax    schedule 11.12.2017
comment
См. также cwiki. apache.org/confluence/display/KAFKA/   -  person Frederic A.    schedule 12.12.2017


Ответы (4)


Мы столкнулись с аналогичной проблемой, которую мы решили. Первая проблема, установив: max.block.ms to something higher than currently configured value.

Второй выпуск: increasing the batch.size and decreasing the linger.ms (might increase latency) on Kafka Producer side. Increasing batch.size would send more batches with fewer messages in each batch.

person Maverick4U    schedule 15.01.2019

Это похоже на то, что часто происходит, когда ожидаемая тема не была создана. Попробуйте поискать дальше в файлах журналов.

Вы также можете явно использовать admin client, чтобы проверить, какие темы существуют.

person Stuart Mclean    schedule 15.03.2018

первая проблема связана с этой причиной: ( Производитель отправляет непрерывный пульс и будет ждать 60 000 мс (значение по умолчанию) для метаданных. Если метаданные отсутствуют в указанное время, он создает исключение тайм-аута потоков. Чтобы исправить это, добавьте Конфигурация производителя kafka (ProducerConfig.MAX_BLOCK_MS_CONFIG) на некоторую терку значений, которая составляет 60000 мс. Это решит проблему.

person Bala Sai    schedule 05.02.2020

если вы используете nifi и kafka с SASL_SSL без kerberos и предоставляете клиент kafka jaas, увеличьте время ожидания метаданных до 100 секунд и время ожидания подтверждения до 100 секунд, это сработает для вас.

person Ankur Saxena    schedule 30.04.2020