Итак, я пытаюсь использовать Kafka для своего приложения, в котором производитель регистрирует действия в Kafka MQ, а потребитель считывает их с MQ. Поскольку мое приложение находится в Go, я использую Shopify Sarama, чтобы сделать это возможным.
Прямо сейчас я могу прочитать MQ и распечатать содержимое сообщения, используя
fmt.Printf
Тем не менее, мне бы очень хотелось, чтобы обработка ошибок была лучше, чем консольная печать, и я готов сделать все возможное.
Код прямо сейчас для потребительского подключения:
mqCfg := sarama.NewConfig()
master, err := sarama.NewConsumer([]string{brokerConnect}, mqCfg)
if err != nil {
panic(err) // Don't want to panic when error occurs, instead handle it
}
И обработка сообщений:
go func() {
defer wg.Done()
for message := range consumer.Messages() {
var msgContent Message
_ = json.Unmarshal(message.Value, &msgContent)
fmt.Printf("Reading message of type %s with id : %d\n", msgContent.Type, msgContent.ContentId) //Don't want to print it
}
}()
Мои вопросы (я новичок в тестировании Кафки и новичок в кафке в целом):
Где могут возникнуть ошибки в вышеуказанной программе, чтобы я мог их обработать? Для начала мне подойдет любой пример кода. Условия ошибки, о которых я мог подумать, - это когда msgContent действительно не содержит полей типа ContentId в JSON.
В kafka бывают ситуации, когда потребитель пытается читать по текущему смещению, но по каким-то причинам не смог (даже при правильно сформированном JSON)? Может ли мой потребитель вернуться, чтобы сказать, что x шагов выше ошибочного чтения смещения и повторной обработки смещений? Или есть лучший способ сделать это? опять же, что это могут быть за ситуации?
Я открыт для чтения и пробовать разные вещи.