Нужно печатать сообщения об отправке на 10 разделов случайным образом, используя kafka

Привет, я использую Kafka API 0.10.0.1. Я создал тему картик и имеет 10 разделов. Из этого кода производителя я получаю вывод, поскольку все 10 сообщений проходят 10 разделов. Но проблема в том, что каждый раздел получает 10 сообщений. Я хочу, чтобы каждый раздел получал только одно сообщение, т. е. разделы должны получать 1 сообщение случайным образом. Или, если у нас есть 50 сообщений, то эти сообщения должны быть распределены по этим разделам случайным образом, а не каждый раздел получает 50 сообщений.

открытый класс Producerpar {

public static void main(String[] args) throws Exception{

      String topic = "kartik";
      if(topic == null){
         System.out.println("there has to be some topic");
        return;
      }

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");



 Producer<String, String> producer = new KafkaProducer<String, String>(props); 

String msg = "Hey there .. How are you doing ?";    
for(int i = 0; i < 10; i++) {

    List<PartitionInfo> infoList = producer.partitionsFor(topic);
    for(PartitionInfo partitioninfo : infoList){ 
    System.out.println(partitioninfo.partition());


    producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i)));


 System.out.println("Message:  " + msg +  " are sent to partitions: " +partitioninfo.partition()  );


 }

}


person Kartikeya Gupta    schedule 02.11.2016    source источник
comment
Вы забыли конечную фигурную скобку для внутреннего цикла for, и в этом случае производитель отправляет всего 100 сообщений?   -  person amethystic    schedule 03.11.2016
comment
@Kartikeya Gupta: потому что вы повторяете каждое сообщение (от 1 до 10) для всех разделов, удалите это для каждого for(PartitionInfo partitioninfo : infoList){   -  person Shankar    schedule 03.11.2016
comment
@Shankar: Если я удалю это, в консоли будет напечатано сообщение, а не идентификатор раздела, в который отправляются сообщения. я хочу, чтобы он также отображал в консоли, к какому идентификатору раздела он идет.   -  person Kartikeya Gupta    schedule 03.11.2016
comment
@KartikeyaGupta: Вы пробовали мой ответ с обратным вызовом?   -  person Shankar    schedule 04.11.2016


Ответы (1)


Вы можете использовать callback при отправке сообщения, которое вернет вам RecordMetaData, из которого вы можете получить идентификатор раздела сообщения.

Например:

Producer<String, String> producer = new KafkaProducer<String, String>(props); 

    for(int i = 0; i < 10; i++) {

        ProducerRecord record = new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i))
            producer.send(record,
                           new Callback() {
                               public void onCompletion(RecordMetadata metadata, Exception e) {
                                   int partitionId = metadata.partition();
                               }
                           });

     }
person Shankar    schedule 04.11.2016