Пакет Karaf - Kafka OSGI - Проблема производителя

Я пытаюсь создать простой пакет для производителя Kafka в версии 4.0.3 apache Karaf.

Вот мой код Java

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
//props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class","org.apache.kafka.clients.producer.internals.DefaultPartitioner");
Producer<String, String> producer = new KafkaProducer<String,String>(props,new StringSerializer(),new StringSerializer());
//for(int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("test","data", outputData));

producer.close();

Я четко объявил соответствующую зависимость в pom.xml

<dependency>
        <groupId>org.apache.servicemix.bundles</groupId>
        <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
        <version>0.9.0.0_1</version>
</dependency>

Я также развернул этот клиентский пакет kafka.

но при запуске производителя я вижу ниже исключение при первой попытке.

Exception in thread "pool-135-thread-1" java.lang.ExceptionInInitializerError
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:194)
    .
    .
    .
    at com.google.common.util.concurrent.Futures$6.run(Futures.java:1319)
    at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72)
    at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.clients.producer.internals.DefaultPartitioner for configuration partitioner.class: Class org.apache.kafka.clients.producer.internals.DefaultPartitioner could not be found.
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255)
    at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:78)
    at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:94)
    at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:206)
    ... 12 more

А потом последовательно этот...

Exception in thread "pool-136-thread-1" java.lang.NoClassDefFoundError: Could not initialize class org.apache.kafka.clients.producer.ProducerConfig
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:194)
.
.
.
at com.google.common.util.concurrent.Futures$6.run(Futures.java:1319)
at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72)
at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745

Кто-нибудь сталкивался с подобной проблемой с комплектом??


person user3619698    schedule 12.01.2016    source источник
comment
Среда выполнения OSGi не заботится о ваших зависимостях Maven. Он заботится только о МАНИФЕСТЕ внутри вашего пакета. Если вы загружаете классы по имени, как вы это делаете, вам нужно добавить конфигурацию времени сборки, чтобы соответствующие пакеты добавлялись в список импортируемых пакетов. Или вы добавляете DynamicImport-Package: * в свой МАНИФЕСТ. Как именно вы это сделаете, зависит от того, как вы строите свой пакет. С maven-bundle-plugin?   -  person Ralf    schedule 12.01.2016
comment
Да, у меня такая же проблема. Самое смешное, что если я вставляю банку kafka (либо сервисную, либо оригинальную kafka), когда я создаю экземпляр KafkaProducer, я сразу же получаю эту ошибку, хотя я могу ссылаться на KafkaProducer просто отлично. Либо что-то странное искажает сообщение об ошибке, либо происходит какая-то темная загрузка классов...   -  person Frank Lee    schedule 26.02.2016


Ответы (3)


Я видел эту же проблему в 0.9.0. Оказалось, что был установлен загрузчик контекста Thread, и в этом случае Kafka использует этот загрузчик классов для разрешения. Таким образом, загрузчик классов контекста потока должен быть либо:

  • Загрузчик классов, который может решить все, что связано с Kafka.
  • null

Не знаю, заденет ли это меня, но добавлю:

Thread.currentThread().setContextClassLoader(null);

сделал трюк.

person Frank Lee    schedule 26.02.2016
comment
Ваше решение соответствует обходному пути, предложенному в JIRA: issues.apache.org/jira/browse/ КАФКА-3218 - person Kyr; 04.03.2016
comment
Когда этот запрос на вытягивание будет объединен, должно быть доступно другое решение: github.com/apache/kafka/pull /1421 - person lightswitch05; 26.07.2016

используя версию клиента Kafka 0.8.2.2_1, проблема решена.

person user3619698    schedule 13.01.2016

Может быть полезно для других, обходной путь, предложенный в JIRA

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
person ravthiru    schedule 28.03.2017