Обработка ошибок Kafka с помощью Shopify Sarama

Итак, я пытаюсь использовать Kafka для своего приложения, в котором производитель регистрирует действия в Kafka MQ, а потребитель считывает их с MQ. Поскольку мое приложение находится в Go, я использую Shopify Sarama, чтобы сделать это возможным.

Прямо сейчас я могу прочитать MQ и распечатать содержимое сообщения, используя

fmt.Printf

Тем не менее, мне бы очень хотелось, чтобы обработка ошибок была лучше, чем консольная печать, и я готов сделать все возможное.

Код прямо сейчас для потребительского подключения:

mqCfg := sarama.NewConfig()

master, err := sarama.NewConsumer([]string{brokerConnect}, mqCfg)
if err != nil {
    panic(err) // Don't want to panic when error occurs, instead handle it
}

И обработка сообщений:

    go func() {
    defer wg.Done()
    for message := range consumer.Messages() {
        var msgContent Message
        _ = json.Unmarshal(message.Value, &msgContent)
        fmt.Printf("Reading message of type %s with id : %d\n", msgContent.Type, msgContent.ContentId) //Don't want to print it
    }
}()

Мои вопросы (я новичок в тестировании Кафки и новичок в кафке в целом):

  1. Где могут возникнуть ошибки в вышеуказанной программе, чтобы я мог их обработать? Для начала мне подойдет любой пример кода. Условия ошибки, о которых я мог подумать, - это когда msgContent действительно не содержит полей типа ContentId в JSON.

  2. В kafka бывают ситуации, когда потребитель пытается читать по текущему смещению, но по каким-то причинам не смог (даже при правильно сформированном JSON)? Может ли мой потребитель вернуться, чтобы сказать, что x шагов выше ошибочного чтения смещения и повторной обработки смещений? Или есть лучший способ сделать это? опять же, что это могут быть за ситуации?

Я открыт для чтения и пробовать разные вещи.


person premunk    schedule 01.04.2015    source источник
comment
json.Unmarshal может вызвать ошибку, и если вы не хотите вызывать панику... Просто не надо :)   -  person MIkCode    schedule 02.04.2015
comment
Ха. Спасибо. Любая идея о том, как я мог бы сделать # 2?   -  person premunk    schedule 02.04.2015


Ответы (1)


Относительно 1) Проверьте, где я регистрирую сообщения об ошибках ниже. Это более или менее то, что я бы сделал.

По поводу 2) не знаю насчет попытки отступить в теме. Это очень возможно, просто создавая потребителя снова и снова, с его начальным смещением минус один каждый раз. Но я бы не советовал, так как, скорее всего, вы будете повторять одно и то же сообщение снова и снова. Я советую часто сохранять смещение, чтобы вы могли восстановиться, если дела пойдут плохо.

Ниже приведен код, который, как мне кажется, отвечает на большинство ваших вопросов. Я не пробовал компилировать это. А API-интерфейс sarama в последнее время меняется, поэтому в настоящее время API может немного отличаться.

func StartKafkaReader(wg *sync.WaitGroup, lastgoodoff int64, out chan<- *Message) (error) {
    wg.Add(1)
    go func(){
        defer wg.Done()
        //to track the last known good offset we processed, which is 
        // updated after each successfully processed event. 
        saveprogress := func(off int64){
            //Save the offset somewhere...a file... 
            //Ive also used kafka to store progress 
            //using a special topic as a WAL
        }
        defer saveprogress(lastgoodoffset)

        client, err := sarama.NewClient("clientId", brokers, sarama.NewClientConfig())
        if err != nil {
            log.Error(err)
            return
        }
        defer client.Close()
        sarama.NewConsumerConfig()
        consumerConfig.OffsetMethod = sarama.OffsetMethodManual
        consumerConfig.OffsetValue = int64(lastgoodoff)
        consumer, err := sarama.NewConsumer(client, topic, partition, "consumerId", consumerConfig)
        if err != nil {
            log.Error(err)
            return
        }
        defer consumer.Close()
        for {
            select {
            case event := <-consumer.Events():
                if event.Err != nil {
                    log.Error(event.Err)
                    return
                }
                msgContent := &Message{}
                err = json.Unmarshal(message.Value, msgContent)
                if err != nil {
                    log.Error(err)
                    continue //continue to skip this message or return to stop without updating the offset.
                }
                // Send the message on to be processed.
                out <- msgContent 

                lastgoodoff = event.Offset
            }
        }
    }()
}
person eSniff    schedule 02.04.2015