Я новичок в Storm и Kafka, и через некоторое время мне удалось установить оба на локальную виртуальную машину. В настоящее время у меня есть рабочая топология wordCount, принимающая предложения из текстового файла dropBox:
public void nextTuple() {
final String APP_KEY = "XXXX";
final String APP_SECRET = "XXXX";
DbxAppInfo appInfo = new DbxAppInfo(APP_KEY, APP_SECRET);
DbxRequestConfig config = new DbxRequestConfig("StormTopology/1.0", Locale.getDefault().toString());
String accessToken = "XXXXXXXX";
DbxClient client = new DbxClient(config, accessToken);
String sentence="";
try {FileOutputStream outputStream = new FileOutputStream("fromSpout.txt");
try {
//client.delete("/*.txt");
DbxEntry.File downloadedFile = client.getFile("/spout.txt", null,outputStream);
sentence= readFile("fromSpout.txt");
if (sentence==null || sentence == "" || sentence == " " || sentence == "/t") {
Utils.sleep(1000);
return;
}
}
catch (DbxException ex) { }
catch (IOException ex) { }
//return 1;
finally {
outputStream.close();
}
}
catch (FileNotFoundException ex){}
catch (IOException ex) {}
if (sentence.length()<2) { Utils.sleep(10000); return; }
try { client.delete("/spout.txt");}
catch (DbxException ex) { }
_collector.emit(new Values(sentence));
Utils.sleep(1000);
Теперь я хочу обновить свой носик, чтобы использовать текст из Кафки, чтобы отправить мой следующий болт в топологии. Я безуспешно пытался следовать многим статьям и кодам в git. Например: этот носик кафки. Может ли кто-нибудь помочь и дать мне какое-то направление для реализации нового файла spout.java? Благодарю вас!