Kafka – Как получить сведения о неудачных сообщениях в классе Producer

Kafka позволяет асинхронно отправлять сообщения с помощью следующих методов класса Producer (KafkaProducer):

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record) 
public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)

Успехи можно обрабатывать с помощью
1) объекта Future<RecordMetaData> или
2) метода onCompletion, вызываемого обратным вызовом. Полная подпись метода и использование onCompletion приведены ниже (взято из документации kafka)

`

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);  
producer.send(record,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null)
                           e.printStackTrace();
                       System.out.println("The offset of the record we just sent is: " + metadata.offset());
                   }
               });

В то время как сбой необходимо обрабатывать с помощью Exception e, переданного в метод onCompletion

Хорошо, пока все выглядит хорошо.

Но если я правильно понимаю, любая разумная информация, которую можно получить из объекта exception или e, представляет собой трассировку стека и сообщение об исключении. Я хочу отметить, что e не содержит никакой информации о фактической отправленной записи. Или, другими словами, он не содержит ссылки на фактический record, который был отправлен брокеру kafka. Итак, какую полезную обработку или обработку может выполнить производитель, если record не был отправлен успешно. На самом деле не так много. Почему я говорю так: в идеале я хотел бы где-нибудь вести журнал ошибочных сообщений, а затем попытаться отправить их повторно. Но с небольшой информацией (e), предоставляемой фреймворком, я чувствую, что это невозможно.

Может кто-нибудь указать, прав я или нет?


person samshers    schedule 06.08.2017    source источник
comment
в качестве примечания я хочу отметить, что я не хочу сказать, что исключение e бесполезно. Это, безусловно, полная помощь для отладки и поиска причины ошибки. Но в чем я не уверен, так это в том, содержит ли он достаточно информации, чтобы определить, какой record дал сбой, чтобы я мог повторно отправить или зарегистрировать этот record   -  person samshers    schedule 06.08.2017
comment
Попробуйте следующую версию класса обратного вызова: класс CallbackFunc реализует обратный вызов { private int recordId; public CallbackFunc (int recordId) { this.recordId = recordId; } @Override public void onCompletion(метаданные RecordMetadata, Exception e) { if(e != null) { Log.error(Ошибка записи сообщения в Kafka. Идентификатор записи: + recordId); } } }   -  person Shahzad    schedule 06.08.2017


Ответы (1)


Вы можете легко создать обратный вызов, который получает producerRecord в качестве аргумента конструктора. Таким образом, после onCompletion с исключением вы можете получить полную информацию о записи производителя и даже попытаться отправить ее снова.

Я занимался той же проблемой. Создан обратный вызов, который получает как producerRecord, так и обработчик обратного вызова, который использует службу-исполнитель для повторной отправки записи. Так что, в конце концов, я могу терпеть любое количество сбоев (например, проблемы с сетью или kafka не работает) и восстанавливаться после них.

person Lior Chaga    schedule 20.02.2018