Пустые данные при чтении данных из kafka с использованием топологии Trident

Я новичок в Trident. Я пишу топологию трезубца, которая считывает данные из кафки. Название темы - «тест». У меня локальная настройка кафки. Я запустил zookeeper, кафку на местном уровне. И создал тему «тест» в кафке, открыл продюсер и набрал сообщение «Привет, Кафка!».

Я хочу прочитать сообщение «Hello Kafka» из темы «test» с помощью трезубца.

Ниже мой код. Я получаю пустой кортеж.

    TridentTopology topology = new TridentTopology();
    BrokerHosts brokerHosts = new ZkHosts("localhost:2181");

    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "test");
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4;
    kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4;
    kafkaConfig.forceFromStart = false;
    OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);

    topology.newStream("TestSpout", opaqueTridentKafkaSpout).parallelismHint(1)
      .each(new Fields(), new TestFilter()).parallelismHint(1)
      .each(new Fields(), new Utils.PrintFilter());

и это мой код класса TestFilter

public TestFilter()
{
    //
}

@Override
public boolean isKeep(TridentTuple tuple) {
    boolean isKeep=true;
    System.out.println("TestFilter is called...");
    if (tuple != null && tuple.getValues().size()>0) {
        System.out.println("data from kafka ::: "+tuple.getValues());
    } 
    return isKeep;
}

Всякий раз, когда я набираю сообщение в kafka Producer в тему «test», распечатывается первый sysout, но он не проходит цикл if. Я просто получаю сообщение «TestFilter is called ...» не более того.

Я хочу получить фактические данные, которые я произвел для «тестовой» темы. Как?


person Kutty    schedule 11.09.2015    source источник
comment
вы можете увидеть сообщение, используя сценарий console-consumer?   -  person user2720864    schedule 12.09.2015
comment
Да, я могу увидеть сообщение с помощью сценария консоли-потребителя.   -  person Kutty    schedule 12.09.2015
comment
Вы можете изменить config.forceFromStart=true   -  person user2720864    schedule 12.09.2015
comment
Я добавил новые поля (str). Началось работать. Спасибо.   -  person Kutty    schedule 13.09.2015
comment
@Kutty Вы должны были написать ответ на свой вопрос для потомков. Я столкнулся с той же проблемой, и было не сразу понятно, в чем был ее источник.   -  person nivox    schedule 24.10.2015


Ответы (1)


Проблема заключается в параметрах Stream.each. Соответствующая часть javadoc для метода:

each(Fields inputFields, Filter filter)

В документации это не совсем ясно, но семантика такова, что вы должны указать все поля, используемые вашим фильтром, с помощью параметра inputFields.

Затем Storm применит проекцию к входному кортежу и направит его в фильтр.

Учитывая, что вы не указали никаких полей ввода, проекция привела к пустому кортежу, что привело к сбою условия tuple.getValues().size()>0 внутри фильтра.

Стоит упомянуть и другие варианты каждого из них:

each(Fields inputFields, Function function, Fields functionFields)
each(Function function, Fields functionFields)

Они будут применять предоставленную функцию к проекции входного кортежа, добавляя результирующий кортеж к исходному входному кортежу, переименовывая новые поля как functionFields (т. Е. Используется только проекция для применения функции).

В частности, вторая версия эквивалентна вызову каждого с inputFields, установленным в null (или new Fields()), и приведет к передаче пустого кортежа в function.

person nivox    schedule 24.10.2015