Объяснение функции Apache Storm Trident .each()

Я хочу использовать в проекте Apache Storm TridentTopology. Мне трудно понять функцию .each() из класса storm.trident.Stream. Ниже приведен пример кода, приведенный в их учебнике для справки:

TridentTopology topology = new TridentTopology();        
TridentState wordCounts =
     topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
       .parallelismHint(6);

Я не понял сигнатуру метода .each(). Ниже то, что я понял. Пожалуйста, поправьте меня, если я ошибаюсь, а также дайте дополнительную информацию для моих знаний.

.каждый()

  • Первый параметр принимает поля, которые являются коррелированными ключами с исходящими значениями из spout и возвращаются из метода getOutputFields() в spout. Я до сих пор не знаю, для чего используется этот параметр.
  • Второй параметр — это класс, расширяющий BaseFunction. Он обрабатывает кортеж.
  • Понимание третьего параметра аналогично первому параметру.

person Sagar Gopale    schedule 26.11.2015    source источник


Ответы (1)


Первый параметр — это проекция на входные кортежи. В вашем примере только поле с именем «предложение» предоставляется Split. Если ваш источник выдает кортеж со схемой Fields("first", "sentence", "third"), вы можете получить доступ только к «предложению» в Split. Кроме того, "предложение" будет иметь нулевой индекс (а не один) в Split. Обратите внимание, что это не проекция на выходе -- все поля останутся в выходных кортежах! Это просто ограниченный просмотр всего кортежа в пределах Split.

Последний параметр — это схема Value, переданная emit() в Split. Имена этих полей добавляются как новый атрибут к выходным кортежам. Таким образом, схема выходного кортежа — это схема входного кортежа (исходная, не спроецированная первым параметром) плюс поля этого последнего параметра.

См. раздел Функция в документации: https://storm.apache.org/releases/0.10.0/Trident-API-Overview.html

person Matthias J. Sax    schedule 26.11.2015