Kafka: изменение количества разделов для определенной темы с помощью java

Я новичок в Kafka и работаю с новыми KafkaProducer и KafkaConsumer, версия: 0.9.0.1

Есть ли способ в java изменить / обновить количество разделов для определенной темы после ее создания.

Я не использую zookeeper для создания темы. Мой KafkaProducer автоматически создает темы при поступлении запроса на публикацию.

Я также могу предоставить более подробную информацию, если этого недостаточно


person nikhil7610    schedule 25.05.2016    source источник


Ответы (1)


Да, это возможно. Вы должны получить доступ к AdminUtils классу scala в kafka_2.11-0.9.0.1.jar, чтобы добавить разделы.

AdminUtils поддерживает количество разделов в теме можно только увеличить. Вам могут понадобиться jar-файлы kafka_2.11-0.9.0.1.jar, zk-client-0.8.jar, scala-library-2.11.8.jar и scala-parser-combinators_2.11-1.0.4.jar в пути к классам.

Части приведенного ниже кода заимствованы / вдохновлены примерами kafka-cloudera.

package org.apache.kafka.examples;

import java.io.Closeable;

import org.I0Itec.zkclient.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import kafka.admin.AdminOperationException;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode.Enforced$;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class Test {

    static final Logger logger = LogManager.getLogger();

    public Test() {
        // TODO Auto-generated constructor stub
    }

    public static void addPartitions(String zkServers, String topic, int partitions) {

        try (AutoZkClient zkClient = new AutoZkClient(zkServers)) {
            ZkUtils zkUtils = ZkUtils.apply(zkClient, false);

            if (AdminUtils.topicExists(zkUtils, topic)) {
                logger.info("Altering topic {}", topic); 
                try {
                    AdminUtils.addPartitions(zkUtils, topic, partitions, "", true, Enforced$.MODULE$);
                    logger.info("Topic {} altered with partitions : {}", topic, partitions); 
                } catch (AdminOperationException aoe) {
                    logger.info("Error while altering partitions for topic : {}", topic, aoe); 
                } 
            } else {
                logger.info("Topic {} doesn't exists", topic); 
            } 
        } 
    }

    // Just exists for Closeable convenience 
    private static final class AutoZkClient extends ZkClient implements Closeable { 

        static int sessionTimeout = 30_000;
        static int connectionTimeout = 6_000;

        AutoZkClient(String zkServers) {
            super(zkServers, sessionTimeout, connectionTimeout, ZKStringSerializer$.MODULE$);
        }
    }

    public static void main(String[] args) {

        addPartitions("localhost:2181", "hello", 20);
    }
}
person Kamal Chandraprakash    schedule 27.05.2016
comment
Спасибо.Это то, что я искал - person nikhil7610; 27.05.2016