Привет, я использую Kafka API 0.10.0.1. Я создал тему картик и имеет 10 разделов. Из этого кода производителя я получаю вывод, поскольку все 10 сообщений проходят 10 разделов. Но проблема в том, что каждый раздел получает 10 сообщений. Я хочу, чтобы каждый раздел получал только одно сообщение, т. е. разделы должны получать 1 сообщение случайным образом. Или, если у нас есть 50 сообщений, то эти сообщения должны быть распределены по этим разделам случайным образом, а не каждый раздел получает 50 сообщений.
открытый класс Producerpar {
public static void main(String[] args) throws Exception{
String topic = "kartik";
if(topic == null){
System.out.println("there has to be some topic");
return;
}
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
String msg = "Hey there .. How are you doing ?";
for(int i = 0; i < 10; i++) {
List<PartitionInfo> infoList = producer.partitionsFor(topic);
for(PartitionInfo partitioninfo : infoList){
System.out.println(partitioninfo.partition());
producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i)));
System.out.println("Message: " + msg + " are sent to partitions: " +partitioninfo.partition() );
}
}
for(PartitionInfo partitioninfo : infoList){
- person Shankar   schedule 03.11.2016