Задержка в сообщениях, потребляющих потребителя в Apache Kafka

Я использую Kafka 0.8.0 и пытаюсь реализовать описанный ниже сценарий.

JCA API (действует как производитель и отправляет данные) -----> Потребителю ------> HBase

Я отправляю каждое сообщение потребителю, как только получаю данные с помощью JCA Client. Например, как только производитель отправляет сообщение № 1, я хочу получить то же самое от потребителя и «поместить» в HBase. Но мой потребитель начинает получать сообщения после нескольких случайных n сообщений. Я хочу синхронизировать производителя и потребителя, чтобы они оба начали работать вместе.

Я использовал:

1 брокер

1 отдельная тема

1 единственный производитель и потребитель высокого уровня

Может ли кто-нибудь подсказать, что мне нужно сделать, чтобы добиться того же?

ИЗМЕНИТЬ:

Добавление соответствующего фрагмента кода.

Consumer.java

public class Consumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    PrintWriter pw = null;
    int t = 0;
    StringDecoder kd = new StringDecoder(null);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    Map<String, List<KafkaStream<String, Signal>>> consumerMap;
    KafkaStream<String, Signal> stream;
    ConsumerIterator<String, Signal> it;

    public Consumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

        this.topic = topic;
        topicCountMap.put(topic, new Integer(1));
        consumerMap = consumer.createMessageStreams(topicCountMap, kd, new Serializer(
                new VerifiableProperties()));
        stream = consumerMap.get(topic).get(0);
        it = stream.iterator();

    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("fetch.size", "1024");

        return new ConsumerConfig(props);

    }

    synchronized public void run() {

        while (it.hasNext()) {
            t = (it.next().message()).getChannelid();
            System.out.println("In Consumer received msg" + t);
        }
    }
}

продюсер.java

public class Producer {
    public final kafka.javaapi.producer.Producer<String, Signal> producer;
    private final String topic;
    private final Properties props = new Properties();

    public Producer(String topic)
    {
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        // Use random partitioner. Don't need the key type. Just set it to Integer.
        // The message is of type userdefined Object .
        producer = new kafka.javaapi.producer.Producer<String,Signal(newProducerConfig(props));
        this.topic = topic;
    }
}

KafkaProperties.java

public interface KafkaProperties {
    final static String zkConnect = "127.0.0.1:2181";
    final static String groupId = "group1";
    final static String topic = "test00";
    final static String kafkaServerURL = "localhost";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 100000;
    final static int reconnectInterval = 10000;
    final static String clientId = "SimpleConsumerDemoClient";
}

Вот как потребитель ведет себя для первых 10 сообщений: он не анализирует это сообщение, полученное потребителем, но начиная с 11-го сообщения оно начинает работать правильно.

     producer sending msg1

     producer sending msg2

     producer sending msg3

     producer sending msg4

     producer sending msg5

     producer sending msg6

     producer sending msg7

     producer sending msg8

     producer sending msg9

     producer sending msg10

     producer sending msg11

     producer sending msg12
     In Consumer received msg12

     producer sending msg13
     In Consumer received msg13

     producer sending msg14
     In Consumer received msg14

     producer sending msg15
     In Consumer received msg15

     producer sending msg16
     In Consumer received msg16

     producer sending msg17
     In Consumer received msg17

     producer sending msg18
     In Consumer received msg18

     producer sending msg19
     In Consumer received msg19

     producer sending msg20
     In Consumer received msg20

     producer sending msg21
     In Consumer received msg21

EDITED: добавление функции прослушивателя, в которой производитель отправляет сообщения потребителю. И я использую конфигурацию производителя по умолчанию, не перезаписывая ее

public synchronized void onValueChanged(final MonitorEvent event_) {


    // Get the value from the DBR
    try {
        final DBR dbr = event_.getDBR();

        final String[] val = (String[]) dbr.getValue();

        producer1.producer.send(new KeyedMessage<String, Signal>         
                    (KafkaProperties.topic,new Signal(messageNo)));
        System.out.println("producer sending msg"+messageNo);

        messageNo++;


    } catch (Exception ex) {
        ex.printStackTrace();
    }
}

person Ankita    schedule 12.02.2014    source источник
comment
Можете ли вы показать код / ​​конфигурацию производителя и потребителя? Похоже, некоторые из них работают с пакетами (что, собственно, и хорошо).   -  person Dmitry    schedule 12.02.2014
comment
@Dmitry добавил фрагмент кода.   -  person Ankita    schedule 12.02.2014
comment
С потребителем все в порядке (кроме свойства fetch.size = 1K - это означает, что потребитель не может получать сообщения большего размера, но, вероятно, это не та проблема, которую мы ищем). Можете ли вы поделиться кодом методов newProducerConfig () и run () для производителя?   -  person Dmitry    schedule 13.02.2014
comment
@Dmitry У моего класса производителя нет потоков. У меня есть одна функция прослушивателя, которая вызывается всякий раз, когда происходит изменение определенного значения, и внутри нее я использую функцию продюсера.send () для отправки сообщений потребителю. В любом случае я поделюсь функцией прослушивателя.   -  person Ankita    schedule 13.02.2014


Ответы (2)


  1. Попробуйте добавить props.put("request.required.acks", "1") в конфигурацию производителя. По умолчанию производитель не ждет подтверждения, и доставка сообщения не гарантируется. Итак, если вы запустите брокера непосредственно перед тестом, производитель может начать отправлять сообщения до того, как брокер будет полностью инициализирован, и первые несколько сообщений могут быть потеряны.

  2. Попробуйте добавить props.put("auto.offset.reset", "smallest") в конфигурацию потребителя. Это равно --from-beginning опции kafka-console-consumer.sh. Если ваш потребитель запускается позже, чем производитель, и в Zookeeper нет сохраненных данных смещения, то по умолчанию он начнет использовать только новые сообщения (см. Потребительские конфигурации в документации).

person Dmitry    schedule 13.02.2014
comment
Спасибо за предложение. Добавлен props.put (request.required.acks, 1) в продюсер, но программа ведет себя случайным образом. Я запускал программу 5 раз каждый раз с новой темой. Но все 5 раз он давал разные результаты. Дважды производитель и потребитель были синхронизированы, в остальное время задержка потребителя. - person Ankita; 13.02.2014
comment
Под "отложенными" вы имеете в виду, что все сообщения были получены, но не сразу после отправки? В исходном выводе первые несколько сообщений были полностью утеряны. - person Dmitry; 13.02.2014
comment
Да, на самом деле есть два сценария: 1) Иногда все сообщения были получены, но не сразу после отправки. 2) В других случаях несколько сообщений были потеряны, как показано в предоставленных выходных данных. Но когда я запускаю эту команду bin / kafka-console-consumer.sh --zookeeper localhost: 2181 --topic topicname --from-begin с консоли, я получаю такое же количество сообщений в потребителе, как и созданное производителем. Почему это происходит ?? - person Ankita; 13.02.2014
comment
К ответу добавлено еще одно предложение. - person Dmitry; 14.02.2014
comment
Да, @Dmitry, похоже, работает. Но использование свойства props.put (auto.offset.reset, smallest) означало бы, что потребитель не будет отслеживать смещения при извлечении данных из той же темы позже, после того как мы создадим для него дополнительные данные? Не получим ли мы дублирующиеся данные, которые мы уже прочитали от потребителя? - person Ankita; 17.02.2014
comment
Нет, эта опция используется только тогда, когда в Zookeeper нет начального смещения. - person Dmitry; 17.02.2014
comment
Я собрал больше данных по той же теме и попытался увидеть, как ведет себя потребитель. Мои наблюдения: 1. Когда я начал заново с новой темы, потери данных не было. 2. Получил больше данных по той же теме и увидел, что потребитель получает несколько случайных (5 или 6) сообщений, которые он уже прочитал при первом запуске. Таким образом, когда потребитель читает данные из одной и той же темы, возникает небольшое дублирование. Итак, IMO, похоже, проблема со смещением. Можно ли это исправить? - person Ankita; 19.02.2014
comment
Попытайтесь вызвать consumer.shutdown () перед выходом. Он будет синхронизировать потребляемые смещения. - person Dmitry; 19.02.2014
comment
Привет, @Dmitry, моя проблема решена после этих ответов. У меня тоже есть один вопрос. Возможно ли иметь пул потоков фиксированного размера (скажем, 5) для обработки сообщений из любого числа (скажем, 100 или более) тем? - person Samra; 13.02.2015
comment
Для последних версий kafka 12.1 0.10.1.0 необходимо изменить конфигурации на: ProducerConfig.ACKS_CONFIG, 1 Consumer: ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, latest - person Lina; 08.12.2016

Один из возможных вариантов - Kafka Lag. Возможно, потребитель перегружен слишком большим количеством разделов. Или обработка одного сообщения очень затратна.

person Gaurav Khare    schedule 15.09.2015