Kafka Consumer не получает сообщения

Я новичок в Кафке. Я прочитал в Интернете множество инструкций по созданию Kafka Producer и Kafka Consumer. Я успешно сделал первый, который может отправлять сообщения в кластер Kafka. Однако последний я не закончил. Пожалуйста, помогите мне решить эту проблему. Я видел, что моей проблеме нравятся некоторые сообщения на StackOverflow, но я хочу описать более четко. Я запускаю Kafka и Zookeeper на сервере Ubuntu в Virtual Box. Используйте простейшую конфигурацию (почти по умолчанию) с 1 кластером Kafka и 1 кластером Zookeeper.

1.Когда я использую командную строку Kafka для производителя и потребителя, например:

* Case 1: It works. I can see the word: Hello, World on the screen

$~/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic helloKafka --partitions 1 --replication-factor 1.
$echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null.
$~/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic TutorialTopic --from-beginning.

2. Когда я использую Продюсер и командную строку Kafka для потребителя, например:

* Case 2: It works. I can see the messages which sent from the Producer on the screen

$~/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic helloKafka --partitions 1 --replication-factor 1.
$~/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic TutorialTopic --from-beginning.
$java -cp target/Kafka_Producer_Program-0.0.1-SNAPSHOT.jar AddLab_Producer

3. Когда я использую "Производитель" и "Потребитель", например:

* Case 3: Only Producer works perfectly. The Consumer runs but does not shows any messages. 

$~/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic helloKafka --partitions 1 --replication-factor 1.
$java -cp target/Kafka_Producer_Program-0.0.1-SNAPSHOT.jar AddLab_Producer
$java -cp target/Kafka_Consumer_Program-0.0.1-SNAPSHOT.jar AddLab_Consumer

Это мой код Производителя и Потребителя. Собственно, я скопировал их с некоторых инструкций сайта Kafka.

* Продюсерская программа

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class AddLab_Producer {
    public static void main(String args[]) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        boolean sync = false;
        String topic = args[0];
        String key = "mykey";

        for (int i = 1; i <= 3; i++) {
            String value = args[1] + " " + i;
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, value);
            if (sync) {
                producer.send(producerRecord).get();
            } else {
                producer.send(producerRecord);
            }
        }
        producer.close();
    }
}

* Потребительская программа

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AddLab_Consumer {

    public static class KafkaPartitionConsumer implements Runnable {

        private int tnum ;
        private KafkaStream kfs ;

        public KafkaPartitionConsumer(int id, KafkaStream ks) {
            tnum = id ;
            kfs = ks ;
        }   
        public void run() {
            // TODO Auto-generated method stub
            System.out.println("This is thread " + tnum) ;

            ConsumerIterator<byte[], byte[]> it = kfs.iterator();
                int i = 1 ;
                while (it.hasNext()) {
                    System.out.println(tnum + " " + i + ": " + new String(it.next().message()));
                    ++i ;
                }       
        }
    }

    public static class MultiKafka {    
        public void run() {
        }   
    }

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "mygroupid2");
        props.put("zookeeper.session.timeout.ms", "413");
        props.put("zookeeper.sync.time.ms", "203");
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConfig cf = new ConsumerConfig(props) ;    
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(cf) ;      
        String topic = "mytopic" ;     
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        ExecutorService executor = Executors.newFixedThreadPool(1); ;

        int threadnum = 0 ;     
        for(KafkaStream<byte[],byte[]> stream  : streams) { 
            executor.execute(new KafkaPartitionConsumer(threadnum,stream));
            ++threadnum ;
        }
    }
}

* Мой файл POM.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>HelloJava</groupId>
    <artifactId>HelloJava</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.9.0.0</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Я очень признателен за вашу помощь. Большое Вам спасибо.

Экран потребителя. Кажется, он запускается, но не может получать сообщения от производителя


person nothingx0x    schedule 22.12.2015    source источник


Ответы (1)


Я столкнулся с той же проблемой, что и вы. После долгих попыток вот ответ.

Вы можете выбрать один из двух типов API нового потребителя kafka.

cousumer.assign (...)

consumer.subscribe (..)

И используйте как:

    // set these properites or you should run consumer first than run producer
    props.put("enable.auto.commit", "false");
    props.put("auto.offset.reset", "earliest");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    boolean assign = false;
    if(assign) {
        TopicPartition tp = new TopicPartition(topic, 0);
        List<TopicPartition> tps = Arrays.asList(tp);
        consumer.assign(tps);
        consumer.seekToBeginning(tps);
    }else {
        consumer.subscribe(Arrays.asList(topic));
    }

http://kafka.apache.org/documentation.html#newconsumerconfigs

Если вы используете старый пользовательский API, то это почти то же самое и с конфигурацией свойств. Не забудьте добавить два следующих кода, если вы хотите видеть сообщения, созданные до того, как потребитель потребляет:

props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");

Надеюсь, это поможет другим людям.

person Aylwyn Lake    schedule 18.08.2016
comment
почему установка этих двух полей гарантирует, что KafkaConsumer будет получать сообщения? их имя, похоже, не указывает на то, что они связаны со способностью KafkaConsumer получать сообщения ... - person Kevin Zhao; 09.08.2017
comment
Установка auto.offset.reset на earliest делает это за меня - person TLJ; 30.11.2017