TimeoutException продюсера Kafka: истекает срок действия 1 записи (ов)

Я использую Kafka с Spring-boot:

Класс Kafka Producer:

@Service
public class MyKafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static Logger LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class);

    // Send Message
    public void sendMessage(String topicName, String message) throws Exception {
        LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message);
        result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage());
            }
        });
    }
}

Конфигурация Kafka:

spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=100000
spring.kafka.producer.request.timeout.ms=30000
spring.kafka.producer.linger.ms=10
spring.kafka.producer.acks=0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.max.block.ms=5000
spring.kafka.bootstrap-servers=192.168.1.161:9092,192.168.1.162:9093

Допустим, я отправил 10 раз по 1000 сообщений в теме my-test-topic.

В 8 из 10 случаев я успешно получаю все сообщения от своего потребителя, но иногда получаю следующее сообщение: Ошибка:

2017-10-05 07:24:11, [ERROR] [my-service - LoggingProducerListener - onError:76] Exception thrown when sending a message with key='null' and payload='{"deviceType":"X","deviceKeys":[{"apiKey":"X-X-o"}],"devices...' to topic my-test-topic

и org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for my-test-topic-4 due to 30024 ms has passed since batch creation plus linger time


person Prakash Pandey    schedule 09.10.2017    source источник
comment
Это ошибка, которую вы описываете, от производителя или потребителя?   -  person adarshr    schedule 09.10.2017
comment
Я получаю эту ошибку от производителя   -  person Prakash Pandey    schedule 09.10.2017
comment
Итак, ваша партия слишком медленная для такого низкого request.timeout.ms. Попробуйте сделать batch-size чуть ниже   -  person Artem Bilan    schedule 09.10.2017
comment
Разве 30 секунд недостаточно? (Я новичок в Кафке, пожалуйста, потерпите меня)   -  person Prakash Pandey    schedule 09.10.2017
comment
Я не знаю, но, согласно вашей ошибке, вы действительно превышаете эти 30 секунд: due to 30024 ms has passed   -  person Artem Bilan    schedule 09.10.2017
comment
какие-нибудь новости здесь? У меня такая же проблема с Spring Cloud Stream и kafka binder   -  person Roman T    schedule 24.02.2018


Ответы (2)


Есть 3 возможности:

  1. Увеличить request.timeout.ms - это время, в течение которого Kafka будет ждать, пока вся партия будет готова в буфере. Итак, в вашем случае, если в буфере меньше 100 000 сообщений, произойдет тайм-аут. Подробнее здесь: https://stackoverflow.com/a/34794261/2707179
  2. Уменьшить batch-size - по сравнению с предыдущим пунктом, пакеты будут отправляться чаще, но в них будет меньше сообщений.
  3. В зависимости от размера сообщения, может быть, ваша сеть не справляется с высокой нагрузкой? Проверьте, не является ли ваша пропускная способность узким местом.
person michalbrz    schedule 30.12.2017
comment
У меня та же проблема, что и у OP, с тех пор, как я включил SSL на Kafka, и заметил, что, как и я, он установил linger.ms. Согласно документации, пакеты отправляются по истечении этого времени ожидания, даже если пакет не заполнен, поэтому даже при большом размере пакета он не должен истекать по тайм-ауту. - person Jodiug; 04.06.2018
comment
@michalbrz После прочтения и понимания этих двух статей: 1) stackoverflow.com/a/34794261/4038460 и 2) cloudera.com/documentation/kafka/latest/topics/. Я чувствовал, что нам следует увеличить размер пакета, чтобы избежать тайм-аута. Если мы увеличим размер пакета - ›Количество пакетов будет уменьшено -› Число запросов уменьшится - ›Время, затрачиваемое на отправку записей, уменьшится -› Тайм-аут будет происходить не часто - person Hemanth; 02.05.2019

  1. Первый ключ к ошибке - 30024 ms has passed - конфигурация spring.kafka.producer.request.timeout.ms=30000 связана. Эти 30 секунд ожидания предназначены для заполнения буфера на стороне производителя.

  2. Когда сообщение публикуется, оно помещается в буфер на стороне производителя и будет ждать 30 секунд (см. Конфигурацию выше), чтобы заполниться. spring.kafka.producer.batch-size=100000 означает 100 КБ, поэтому, если нагрузка приема сообщений низкая, а буфер не заполняется сообщениями до 100 КБ за 30 секунд, вы ожидаете это сообщение.

  3. spring.kafka.producer.linger.ms=10 используется там, где нагрузка приема высока и производитель хочет ограничить send() вызовы брокеров Kafka. Это время, в течение которого производитель будет ждать перед отправкой сообщений брокеру после того, как пакет будет готов (то есть после того, как буфер будет заполнен до размера пакета 100 КБ).

Решение:

  • Увеличьте linger.ms, чтобы сообщения удерживались дольше после того, как пакет будет готов. Если для заполнения партии требуется больше времени, увеличьте request.timeout.ms.
  • Другой подход: уменьшите batch-size или увеличьте request.timeout.ms, или и то, и другое.
person hongsy    schedule 04.01.2021