Я новичок в Trident. Я пишу топологию трезубца, которая считывает данные из кафки. Название темы - «тест». У меня локальная настройка кафки. Я запустил zookeeper, кафку на местном уровне. И создал тему «тест» в кафке, открыл продюсер и набрал сообщение «Привет, Кафка!».
Я хочу прочитать сообщение «Hello Kafka» из темы «test» с помощью трезубца.
Ниже мой код. Я получаю пустой кортеж.
TridentTopology topology = new TridentTopology();
BrokerHosts brokerHosts = new ZkHosts("localhost:2181");
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "test");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4;
kafkaConfig.forceFromStart = false;
OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
topology.newStream("TestSpout", opaqueTridentKafkaSpout).parallelismHint(1)
.each(new Fields(), new TestFilter()).parallelismHint(1)
.each(new Fields(), new Utils.PrintFilter());
и это мой код класса TestFilter
public TestFilter()
{
//
}
@Override
public boolean isKeep(TridentTuple tuple) {
boolean isKeep=true;
System.out.println("TestFilter is called...");
if (tuple != null && tuple.getValues().size()>0) {
System.out.println("data from kafka ::: "+tuple.getValues());
}
return isKeep;
}
Всякий раз, когда я набираю сообщение в kafka Producer в тему «test», распечатывается первый sysout, но он не проходит цикл if. Я просто получаю сообщение «TestFilter is called ...» не более того.
Я хочу получить фактические данные, которые я произвел для «тестовой» темы. Как?
config.forceFromStart=true
- person user2720864   schedule 12.09.2015