Преобразование топологии Storm — wordCount для использования Kafka Spout

Я новичок в Storm и Kafka, и через некоторое время мне удалось установить оба на локальную виртуальную машину. В настоящее время у меня есть рабочая топология wordCount, принимающая предложения из текстового файла dropBox:

public void nextTuple() {


 final String APP_KEY = "XXXX";
final String APP_SECRET = "XXXX";
DbxAppInfo appInfo = new DbxAppInfo(APP_KEY, APP_SECRET);
DbxRequestConfig config = new DbxRequestConfig("StormTopology/1.0", Locale.getDefault().toString());
String accessToken = "XXXXXXXX";
DbxClient client = new DbxClient(config, accessToken);
String sentence="";
try {FileOutputStream outputStream = new FileOutputStream("fromSpout.txt"); 
try {
    //client.delete("/*.txt");
   DbxEntry.File downloadedFile = client.getFile("/spout.txt", null,outputStream);

   sentence= readFile("fromSpout.txt");          
   if (sentence==null || sentence == "" || sentence == " " || sentence == "/t") {
           Utils.sleep(1000);
           return;
           }                    
        } 
catch (DbxException ex) {  } 
catch (IOException ex) { }       
        //return 1;
finally {
      outputStream.close();
         }
    }
catch (FileNotFoundException ex){}
catch (IOException ex) {}       


if (sentence.length()<2) {  Utils.sleep(10000);  return; }
try { client.delete("/spout.txt");}
 catch (DbxException ex) {  } 
_collector.emit(new Values(sentence)); 
Utils.sleep(1000);      

Теперь я хочу обновить свой носик, чтобы использовать текст из Кафки, чтобы отправить мой следующий болт в топологии. Я безуспешно пытался следовать многим статьям и кодам в git. Например: этот носик кафки. Может ли кто-нибудь помочь и дать мне какое-то направление для реализации нового файла spout.java? Благодарю вас!


person Zack S    schedule 11.07.2014    source источник


Ответы (1)


Начиная с версии storm 0.9.2, существует внешняя storm-kafka пакет, который может это сделать. На самом деле этот пакет предоставлен сообществу storm из storm-kafka-0.8-plus. . И есть тестовый проект, показывающий его использование.

В деталях сначала добавьте зависимость к вашему maven (или lein/gradle):

groupId: org.apache.storm
artifactId: storm-kafka
version: 0.9.2-incubating

Затем создайте топологию и носик следующим образом:

import storm.kafka

TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
person halfelf    schedule 12.07.2014
comment
Большое спасибо за быстрый ответ. Когда я пытаюсь преобразовать топологию WordCount для использования OpaqueTridentKafkaSpout, это не работает. Должен ли я использовать топологию Trident для Kafka? или я могу реализовать Kafka IRichSpout? - person Zack S; 14.07.2014