Понимание Kafka Consumer API для Java

Я хочу понять API приема Kafka. Я включил пример кода, который работает.

  1. Почему в Kafka ConsumerStreamMap.get(topic) для одной темы есть список получателей KafkaStream‹>?
  2. Текущий процесс, кажется, перебирает KafkaStream‹> List, а затем перебирает сообщения. Но KafkaReceiver должен работать вечно, поэтому я ожидаю, что внутреннее время зациклится навсегда. Это делает List> излишним.
  3. В нескольких примерах также используется ConsumerStreamMap.get(topic).get(0). Так правильно ли тогда писать продюсера?

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        // Define single thread for topic
        topicMap.put(topicName, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap);
        List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic);
    
        for (final KafkaStream<byte[], byte[]> stream : streamList) 
        {
           ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
           while (consumerIte.hasNext()) 
           {
              counter++;
              String message = new String(consumerIte.next().message());
              String id = topic.hashCode() + "-" + date.getTime() + "-" + counter;
              System.out.println(message);
            }
          }
    

person tsar2512    schedule 07.12.2015    source источник


Ответы (1)


Вы можете найти ответы в вики kafka: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

  1. ConsumerStreamMap — это карта пар (тема, список KafkaStream). Количество потоков зависит от следующей строки в вашем коде:

    topicMap.put(topicName, numberOfStreams);
    

если вы предоставите больше тем, чем есть разделов в теме, некоторые темы никогда не увидят сообщение. если у вас больше разделов, чем потоков, некоторые потоки будут получать данные из нескольких разделов. если у вас есть несколько разделов на поток, НЕТ гарантии порядка получения сообщений, за исключением того, что внутри раздела смещения будут последовательными. Например, вы можете получить 5 сообщений из раздела 10 и 6 из раздела 11, затем еще 5 сообщений из раздела 10, а затем еще 5 сообщений из раздела 10, даже если в разделе 11 есть доступные данные. добавление большего количества процессов/потоков приведет к повторной балансировке Kafka, возможно, изменив назначение раздела потоку.

  1. вам нужно повторять каждый поток в своем собственном потоке.

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    
        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);
    
        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }
    
    public class ConsumerTest implements Runnable {
        private KafkaStream m_stream;
        private int m_threadNumber;
    
        public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }
    
        public void run() {
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext())
                System.out.println("Thread " + m_threadNumber + ": " + new      String(it.next().message()));
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }
    
  2. ConsumerStreamMap.get(topic).get(0) правильно, только если у вас есть 1 тема и 1 поток

person Anatoly Deyneka    schedule 07.12.2015
comment
Спасибо за подробный ответ :) Просто пояснение к пункту 3. В основном вы имели в виду, что если у меня есть 1 поток на тему, я все равно могу иметь несколько тем и только один поток на тему. Я пытаюсь реализовать механизм блокировки для нескольких тем, где я получаю информацию из двух тем одну за другой. Так что просто подтверждаю это. - person tsar2512; 07.12.2015