java.lang.RuntimeException: java.lang.ClassCastException: [B нельзя преобразовать в java.lang.String

Я сталкиваюсь с этой ошибкой при запуске топологии шторма в локальном режиме. У меня есть простая программа, которая проверяет, является ли число простым или нет. Я использую KafkaSpout в качестве источника и storm для его обработки. Kafka версия 2.10-0.8.2.1 версия storm 0.9.4 zookeeper 3.4.6

Ниже показан мой болт, который проверяет простое число

public class PrimeNumberBolt extends BaseRichBolt 
{
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;



    public void prepare( Map conf, TopologyContext context, OutputCollector collector ) 
    {
        this.collector = collector;
    }

    public void execute( Tuple tuple ) 
    {
        //System.out.println(tuple.getFields());
        //System.out.println(tuple.getString(0));
        String num = tuple.getString(0);
        //int number = tuple.getInteger( 0 );
        int number = Integer.parseInt(num);
        //System.out.println("IN Primenumber bolt = "+number);

        if( isPrime( number) )
        {  
            System.out.println( number );

        }
        collector.ack( tuple );
    }

    public void declareOutputFields( OutputFieldsDeclarer declarer ) 
    {
        declarer.declare( new Fields( "number" ) );
    }   

    private boolean isPrime( int n ) 
    {
        if( n == 1)
        {
            return false;
        }
        if( n == 2 || n == 3 )
        {
            return true;
        }

        // Is n an even number?
        if( n % 2 == 0 )
        {
            return false;
        }

        //if not, then just check the odds
        for( int i=3; i*i<=n; i+=2 ) 
        {
            if( n % i == 0)
            {
                return false;
            }
        }
        return true;
    }
}

Ошибка:

 **18156 [Thread-11-prime] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.ClassCastException: [B cannot be cast to java.lang.String**
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
    **at backtype.storm.tuple.TupleImpl.getString(TupleImpl.java:112) ~[storm-core-0.9.4.jar:0.9.4]
    at com.geekcap.storm_test.PrimeNumberBolt.execute(PrimeNumberBolt.java:40) ~[classes/:na]**
    at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.4.jar:0.9.4]
    ... 6 common frames omitted
**18158 [Thread-11-prime] ERROR backtype.storm.daemon.executor - 
java.lang.RuntimeException: java.lang.ClassCastException: [B cannot be cast to java.lang.String**
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.daemon.executor$fn__3439$fn__3451$fn__3498.invoke(executor.clj:748) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
**Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String**
    at backtype.storm.tuple.TupleImpl.getString(TupleImpl.java:112) ~[storm-core-0.9.4.jar:0.9.4]
    at com.geekcap.storm_test.PrimeNumberBolt.execute(PrimeNumberBolt.java:40) ~[classes/:na]
    at backtype.storm.daemon.executor$fn__3439$tuple_action_fn__3441.invoke(executor.clj:633) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__3362.invoke(executor.clj:401) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.disruptor$clojure_handler$reify__1445.onEvent(disruptor.clj:58) ~[storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.4.jar:0.9.4]
    ... 6 common frames omitted
18375 [Thread-11-prime] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
    at backtype.storm.daemon.worker$fn__4693$fn__4694.invoke(worker.clj:491) [storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:240) [storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473) [storm-core-0.9.4.jar:0.9.4]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]

Пожалуйста, предложите мне изменения, которые мне нужно сделать в коде. Заранее спасибо!


person Ritesh Sinha    schedule 27.05.2015    source источник
comment
какой кортеж.getString(0); возвращается?   -  person    schedule 27.05.2015
comment
он возвращает строку. В моем случае в кластере kafka данные хранятся в виде строк (например, 232,12 и т. д.), поэтому я читаю и анализирую их в int.   -  person Ritesh Sinha    schedule 27.05.2015


Ответы (1)


Похоже, что ваш носик кафки читает данные в формате массива байтов.

Попробуйте использовать String Scheme, установив spoutconfig.scheme, как показано ниже.

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
person Adrian Seungjin Lee    schedule 27.05.2015
comment
Это мой носик кафки ________________________ private static KafkaSpout buildKafkaSentenceSpout() { String zkHostPort = localhost:2181; Тема строки = cust3; Строка zkRoot = / премьер; Строка zkSpoutId = основной носик; ZkHosts zkHosts = новый ZkHosts (zkHostPort); SpoutConfig.scheme = новая СхемаAsMultiScheme (новая StringScheme()); SpoutConfig spoutCfg = new SpoutConfig(zkHosts, тема, zkRoot, zkSpoutId); KafkaSpout kafkaSpout = новый KafkaSpout(spoutCfg); вернуть kafkaSpout; } - person Ritesh Sinha; 27.05.2015
comment
я добавил его. Но он выдает ошибку, так как не может сделать статическую ссылку на нестатическое поле SpoutConfig.scheme - person Ritesh Sinha; 27.05.2015
comment
В вашем случае, я думаю, вы можете наконец объявить, например spoutCfg.scheme=new SchemeAsMultiScheme(new StringScheme()); - person Adrian Seungjin Lee; 27.05.2015
comment
Рад, что помогло :) - person Adrian Seungjin Lee; 27.05.2015