Как Storm справляется с nextTuple in the Bolt

Я новичок в Storm и создал программу для чтения увеличенных чисел за определенное время. Я использовал счетчик в Spout, а в методе "nextTuple()" счетчик генерируется и увеличивается.

_collector.emit(new Values(new Integer(currentNumber++))); 
/* how this method is being continuously called...*/

а в методе execute() класса Tuple есть

public void execute(Tuple input) {
int number = input.getInteger(0);
logger.info("This number is (" + number + ")");
_outputCollector.ack(input);
}
/*this part I am clear as Bolt would receive the input from Spout*/

В моем исполнении основного класса у меня есть следующий код

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("NumberSpout", new NumberSpout());
builder.setBolt("NumberBolt", new PrimeNumberBolt())
            .shuffleGrouping("NumberSpout");
Config config = new Config();
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("NumberTest", config, builder.createTopology());
Utils.sleep(10000);
localCluster.killTopology("NumberTest");
localCluster.shutdown();

В программах отлично работает. В настоящее время я ищу здесь, как структура Storm внутренне вызывает метод nextTuple() непрерывно. Я уверен, что в моем понимании чего-то здесь не хватает, и из-за этого пробела я не могу подключиться к внутренней логике этого фреймворка.

Может ли кто-нибудь из вас, ребята, помочь мне четко понять эту часть, тогда это будет для меня большим подспорьем, так как мне придется реализовать эту концепцию в моем проекте. Если я буду концептуально ясен здесь, я смогу добиться значительного прогресса. Благодарю, если кто-нибудь может быстро помочь мне здесь. Ждем ответов...


person JavaPassion    schedule 05.12.2013    source источник
comment
В журнале я получаю вывод как 2013-12-05 14:47:10,872 [Thread-20] INFO my.storm.NumberBolt - Текущий номер здесь: 3973785   -  person JavaPassion    schedule 05.12.2013


Ответы (2)


как инфраструктура Storm постоянно вызывает метод nextTuple().

Я считаю, что на самом деле это включает в себя очень подробное обсуждение всего жизненного цикла топологии шторма, а также четкие концепции различных сущностей, таких как рабочие, исполнители, задачи и т. д. Фактическая обработка топологии выполняется классом StormSubmitter с его submitTopology метод.

Самое первое, что он делает, это начинает загружать банку с помощью интерфейса Nimbus Thrift и затем вызывает submitTopology, которая в итоге отправляет топологию в Nimbus.

Затем Nimbus начинает с нормализации топологии (из документа: Основная цель нормализации — убедиться, что каждая отдельная задача будет иметь одинаковые регистрации сериализации, что очень важно для правильной работы сериализации. ), за которыми следует сериализация, рукопожатие Zookeeper , запуск процессов supervisor и worker и так далее. Это слишком широко для обсуждения, но если вы действительно хотите узнать больше, вы можете пройти через
жизненный цикл storm. топология, где хорошо объясняются пошаговые действия, выполняемые в течение всего времени.
( небольшое примечание из документации)

Сначала пара важных замечаний о топологиях:

Фактическая топология, которая работает, отличается от топологии, указанной пользователем. Фактическая топология имеет неявные потоки и неявный болт «acker», добавленный для управления структурой подтверждения (используется для гарантии обработки данных).

Неявная топология создается через системную топологию! функция. системная топология! используется в двух местах:
- - когда Nimbus создает задачи для кода топологии
- - в рабочем потоке, чтобы он знал, куда ему нужно направить сообщения в код

Теперь вот несколько подсказок, которыми я мог бы поделиться...
Носики или болты на самом деле являются компонентами, которые выполняют реальную обработку (логику). В терминологии штормов они выполняют столько же задач по всей структуре.
На странице документа: Каждая задача соответствует одному потоку выполнения

Теперь, среди многих других, одна типичная обязанность worker process (прочитайте здесь) в storm предназначен для отслеживания того, активна топология или нет, и сохраняется это конкретное состояние в переменной с именем storm-active-atom. Эта переменная используется задачами, чтобы определить, следует ли вызывать метод nextTuple. Итак, пока ваша топология жива (вы не поместили свой код носика, но предполагаете) до того времени, когда ваш таймер активен (как вы сказали определенное время) он будет продолжать вызывать метод nextTuple. Вы можете копнуть еще глубже, чтобы понять реализацию среды подтверждения шторма, чтобы понять, как он понимает и признает один раз кортеж успешно обработан и Гарантия обработки сообщений

Я уверен, что в моем понимании здесь чего-то не хватает, и из-за этого пробела я не могу подключиться к внутренней логике этого фреймворка.

Сказав это, я думаю, что более важно получить четкое представление о том, как работать со штормом, а не как понимать шторм на ранней стадии. например, вместо того, чтобы изучать внутренний механизм шторма, важно понимать, что если мы настроим носик для чтения файла построчно, он будет продолжать испускать каждую строку, используя метод _collector.emit, пока не достигнет EOF. И болт, связанный с ним, получает то же самое в своем execute(tuple input) методе

Надеюсь, это поможет вам поделиться с нами больше в будущем

person user2720864    schedule 05.12.2013
comment
Отлично, это помогает мне понять концепцию лучше, чем раньше. Благодарю вас за то, что вы нашли время, чтобы помочь мне понять это. Да, вы правы, что важно понимать работу со Штормом на ранних стадиях... - person JavaPassion; 06.12.2013
comment
пожалуйста, отнеситесь к этому как к очень базовому пониманию того, как именно работает шторм. Просто попытался поделиться несколькими моментами, надеясь, что это поможет вам копать дальше. На самом деле storm делает гораздо больше работы за кулисами, и чтобы понять, что это сначала требует очень четкого понимания базовых рабочих знаний. Внутренний механизм полностью скрыт от пользователя. Отказоустойчивая система, такая как то, как она обрабатывается, когда узел выходит из строя с помощью демона supervisor, или как Nimbus может назначать задачу другому узлу, если первый выходит из строя, — это чертовски сложная вещь, которую storm делает для вас :) - person user2720864; 06.12.2013
comment
Конечно, я сейчас в процессе изучения материала, как вы упомянули :) - person JavaPassion; 06.12.2013

Обычные носики

В демоне executor storm есть цикл, который многократно вызывает nextTuple (а также ack и fail, когда это необходимо) для соответствующего экземпляра spout.

Нет ожидания обработки кортежей. Spout просто получает fail для кортежей, которые не успели обработаться за данный таймаут. Это можно легко смоделировать с помощью топологии быстрого носика и медленного обрабатывающего болта: носик будет получать много fail вызовов.

См. также документацию ISpout:

nextTuple, ack и fail вызываются в узком цикле в одном потоке в задаче spout. Когда нет кортежей для генерации, было бы вежливо оставить nextTuple бездействующим на короткое время (например, на одну миллисекунду), чтобы не тратить слишком много ресурсов ЦП.


Носик трезубца

Ситуация совершенно иная для Trident-spouts:

По умолчанию Trident обрабатывает по одному пакету за раз, ожидая успешного или неудачного выполнения пакета, прежде чем пытаться выполнить другой пакет. Вы можете значительно увеличить пропускную способность и уменьшить задержку обработки каждого пакета, конвейерно распределяя пакеты. Вы настраиваете максимальное количество пакетов, которые будут обрабатываться одновременно, с помощью свойства topology.max.spout.pending.

Даже при одновременной обработке нескольких пакетов Trident упорядочивает любые обновления состояния, происходящие в топологии между пакетами.

person dedek    schedule 11.08.2015