Код производителя Kafka FailedToSendMessageException

Создание сообщений с помощью Kafka

    import java.util.Date;
import java.util.Properties;

import kafka.common.FailedToSendMessageException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;





public class KafkaProducer {

    private static Producer<String, String> producer;
    public KafkaProducer()
    {
        Properties props = new Properties();
        props.put("metadata.broker.list","localhost:2181");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks","1");

        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String,String>(config);

    }

    public static void main(String[] args)
    {
        if(args.length<2)
        {
            System.err.println("Usage: KafkaProducer TopicName MessageCount");
            System.exit(0);
        }
        String topic = args[0];
        int messageCount = Integer.parseInt(args[1]);

        KafkaProducer kafka = new KafkaProducer();
        kafka.publishMessage(topic,messageCount);

    }

    private void publishMessage(String topic, int messageCount)
    {
        for(int mcount=0;mcount<messageCount;mcount++)
        {
            String runtime = new Date().toString();
            String msg = "Message Published Time -" + runtime;
            System.out.println(msg);

            KeyedMessage<String,String> data = new KeyedMessage<String,String>(topic,msg);
            producer.send(data);

        }
        producer.close();
    }

}

При запуске этой программы с использованием eclipse я получаю следующее исключение:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Message Published Time -Fri Jul 10 13:05:20 IST 2015
Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at KafkaProducer.publishMessage(KafkaProducer.java:52)
    at KafkaProducer.main(KafkaProducer.java:39)

Служба Zookeeper запущена, Брокер запущен и Тема создана. Потребитель тоже готов.

Может ли кто-нибудь помочь мне в этом вопросе?


person chandra sekhar lagadapati    schedule 10.07.2015    source источник


Ответы (2)


Вы можете попробовать использовать новый KafkaProducer, кратко описанный в документации kafka.

Обратите внимание на импорт org.apache.kafka.clients.producer.* вместо kafka.javaapi.producer.Producer, например:

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerTest {

    public static void main(String args[]) throws InterruptedException, ExecutionException {
        // set up Kafka producer
        KafkaProducer<String,String> kafkaProducer;
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // instantiate the producer
        kafkaProducer = new KafkaProducer<String,String>(props);

        // add data to kafka
        ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>("topic", "test key", "test value");
        kafkaProducer.send(producerRecord);

        // close producer
        kafkaProducer.close();
    }
}
person Gloopy    schedule 10.07.2015

Не могли бы вы проверить, что ваш сервер Kafka работает на том же порту, который используется API производителя?

Обычно кластеры Kafka работают на порту 9092. Если это относится к вашей настройке, используйте тот же порт в конфигурации производителя. Ваш производитель использует порт 2181. Вероятно, это ошибка.

person Zeeshan Amber    schedule 11.07.2015