#Storm: как настроить разные метрики для одного и того же источника данных

Я пытаюсь настроить Storm для агрегирования потока, но с различными (доступными DRPC) метриками для одного и того же потока.

Например. поток состоит из сообщений, у которых есть отправитель, получатель, канал, по которому пришло сообщение, и шлюз, через который оно было доставлено. У меня возникли проблемы с решением, как организовать одну или несколько топологий, которые могли бы дать мне, например. общее количество сообщений по шлюзу и/или по каналу. И, кроме общего количества, было бы неплохо также считать количество в минуту.

Основная идея состоит в том, чтобы иметь носик, который будет принимать события обмена сообщениями и оттуда собирать данные по мере необходимости. В настоящее время я играю с Trident и DRPC и придумал две возможные топологии, которые решают проблему на данном этапе. Не могу решить, какой подход лучше, если таковой имеется?!

Весь исходный код доступен по этой основной части. Он имеет три класса:

  • RandomMessageSpout
    • used to emit the messaging data
    • имитирует реальный источник данных
  • SeparateTopology
    • creates a separate DRPC stream for each metric needed
    • также для каждой метрики создается отдельное состояние запроса
    • все они используют один и тот же экземпляр spout
  • CombinedTopology
    • creates a single DRPC stream with all the metrics needed
    • создает отдельное состояние запроса для каждой метрики
    • каждое состояние запроса извлекает желаемую метрику и группирует результаты для нее

Теперь о проблемах и вопросах:

  • SeparateTopology
    • is it necessary to use the same spout instance or can I just say new RandomMessageSpout() each time?
    • Мне нравится идея, что мне не нужно сохранять сгруппированные данные по всем метрикам, а только те группы, которые нам нужно извлечь позже
    • - это исходящие данные, фактически обработанные всеми комбинациями состояния/запроса, например. не первый попавшийся?
    • позволит ли это также позже динамически добавлять новые комбинации состояния/запроса во время выполнения?
  • CombinedTopology
    • I don't really like the idea that I need to persist data grouped by all the metrics since I don't need all the combinations
    • it came as a surprise that the all the metrics always return the same data
      • e.g. channel and gateway inquiries return status metrics data
      • Я обнаружил, что это всегда были данные, сгруппированные по первому полю в определении состояния< /а>
      • эта тема объясняет причину такого поведения.
      • но мне интересно, хороший ли это способ сделать что-то в первую очередь (и при необходимости найдет способ обойти эту проблему)
  • SnapshotGet vs TupleCollectionGet in stateQuery
    • with SnapshotGet things tended to work, but not always, only TupleCollectionGet solved the issue
    • любые указатели относительно того, что является правильным способом сделать это?

Я предполагаю, что это длинный вопрос/тема, но любая помощь очень ценится! Кроме того, если я полностью пропустил архитектуру, предложения о том, как это сделать, будут приветствоваться. Заранее спасибо :-)


person tohokami    schedule 26.09.2013    source источник


Ответы (1)


На самом деле вы не можете разделить поток в SeparateTopology, вызвав newStream() с использованием одного и того же экземпляра воронки, так как это создаст новые экземпляры той же воронки RandomMessageSpout, что приведет к дублированию значений, передаваемых в вашу топологию несколькими отдельными экземплярами воронки. (Распараллеливание воронки возможно только в Storm с секционированными воронками, где каждый экземпляр воронки обрабатывает раздел всего набора данных — например, раздел Kafka).

Правильный подход здесь состоит в том, чтобы изменить CombinedTopology, чтобы разделить поток на несколько потоков по мере необходимости для каждой необходимой вам метрики (см. ниже), а затем выполнить groupBy() для поля этой метрики и persistentAggregate() для каждого вновь разветвленного потока.

Из часто задаваемых вопросов Trident,

«каждый» возвращает объект Stream, который вы можете сохранить в переменной. Затем вы можете запустить несколько штук в одном и том же потоке, чтобы разделить его, например:

Stream s = topology.each(...).groupBy(...).aggregate(...)
Stream branch1 = s.each(...)
Stream branch2 = s.each(...)

См. эту тему в списке рассылки Storm и это для получения дополнительной информации.

person schiavuzzi    schedule 09.10.2013