Kafka позволяет асинхронно отправлять сообщения с помощью следующих методов класса Producer (KafkaProducer):
public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)
public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)
Успехи можно обрабатывать с помощью
1) объекта Future<RecordMetaData>
или
2) метода onCompletion
, вызываемого обратным вызовом. Полная подпись метода и использование onCompletion
приведены ниже (взято из документации kafka)
`
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(record,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null)
e.printStackTrace();
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
});
В то время как сбой необходимо обрабатывать с помощью Exception e
, переданного в метод onCompletion
Хорошо, пока все выглядит хорошо.
Но если я правильно понимаю, любая разумная информация, которую можно получить из объекта exception
или e
, представляет собой трассировку стека и сообщение об исключении. Я хочу отметить, что e
не содержит никакой информации о фактической отправленной записи. Или, другими словами, он не содержит ссылки на фактический record
, который был отправлен брокеру kafka. Итак, какую полезную обработку или обработку может выполнить производитель, если record
не был отправлен успешно. На самом деле не так много. Почему я говорю так: в идеале я хотел бы где-нибудь вести журнал ошибочных сообщений, а затем попытаться отправить их повторно. Но с небольшой информацией (e
), предоставляемой фреймворком, я чувствую, что это невозможно.
Может кто-нибудь указать, прав я или нет?
e
бесполезно. Это, безусловно, полная помощь для отладки и поиска причины ошибки. Но в чем я не уверен, так это в том, содержит ли он достаточно информации, чтобы определить, какойrecord
дал сбой, чтобы я мог повторно отправить или зарегистрировать этотrecord
- person samshers   schedule 06.08.2017