Синхронизация данных из нескольких источников данных

Наша команда пытается создать систему профилактического обслуживания, задачей которой является просмотр набора событий и прогнозирование того, отражают ли эти события набор известных аномалий или нет.

Мы находимся на этапе проектирования, и текущий дизайн системы выглядит следующим образом:

  • События могут происходить на нескольких источниках системы IoT (например, на облачной платформе, пограничных устройствах или любых промежуточных платформах).
  • События отправляются источниками данных в систему очередей сообщений (в настоящее время мы выбрали Apache Kafka).
  • У каждого источника данных есть своя очередь (тема Kafka).
  • Из очередей данные потребляются несколькими механизмами логического вывода (которые на самом деле являются нейронными сетями).
  • В зависимости от набора функций механизм вывода будет подписываться на несколько тем Kafka и передавать данные из этих тем для непрерывного вывода вывода.
  • Общая архитектура следует принципу единой ответственности, что означает, что каждый компонент будет отделен друг от друга и будет работать внутри отдельного контейнера Docker.

Проблема:

Чтобы классифицировать набор событий как аномалию, события должны произойти в одном и том же временном окне. например скажем, есть три источника данных, которые передают соответствующие события в темы Kafka, но по какой-то причине данные не синхронизируются. Таким образом, один из механизмов вывода извлекает последние записи из каждой из тем kafka, но соответствующие события в извлеченных данных не принадлежат одному и тому же временному окну (скажем, 1 часу). Это приведет к неверным прогнозам из-за несинхронизированных данных.

Вопрос

Нам нужно выяснить, как мы можем убедиться, что данные из всех трех источников передаются по порядку, чтобы, когда механизм логического вывода запрашивает записи (скажем, последние 100 записей) из нескольких тем какфа, соответствующие записи в каждой теме принадлежали то же временное окно?


person sgarizvi    schedule 27.05.2019    source источник
comment
Вы задали довольно интересный вопрос. Возможно, эта статья будет привести вас к некоторому решению.   -  person dmkvl    schedule 01.06.2019


Ответы (4)


Я бы предложил KSQL, который представляет собой механизм потокового SQL, который позволяет обрабатывать данные в реальном времени против Апач Кафка. Он также обеспечивает хорошую функциональность для оконного агрегирования и т. д.

Существует 3 способа определить Windows в KSQL:

скачкообразные окна, переворачивающиеся окна и окна сеанса. Прыгающие и переворачивающиеся окна — это временные окна, поскольку они определяются фиксированной длительностью, которую вы указываете. Окна сеанса имеют динамический размер на основе входящих данных и определяются периодами активности, разделенными промежутками бездействия.

В вашем контексте вы можете использовать KSQL для запроса и объединения интересующих тем с помощью Оконные соединения. Например,

SELECT t1.id, ...
  FROM topic_1 t1
  INNER JOIN topic_2 t2
    WITHIN 1 HOURS
    ON t1.id = t2.id;
person Giorgos Myrianthous    schedule 05.06.2019

Некоторые предложения -

  1. Обработка задержки на стороне производителя. Убедитесь, что все три производителя всегда синхронно отправляют данные в темы Kafka, используя batch.size и linger.ms. например. если для linger.ms установлено значение 1000, все сообщения будут отправляться в Kafka в течение 1 секунды.

  2. Обрабатывать задержку на стороне потребителя. Принимая во внимание любой механизм потоковой передачи на стороне потребителя (будь то Kafka-stream, spark-stream, Flink), предоставляет функциональные возможности Windows для объединения/объединения потоковых данных на основе ключей с учетом функции окна с задержкой.

Проверьте это - окна Flink для справки, как выбрать правильный тип окна ссылка

person Amrit Jangid    schedule 30.05.2019

Чтобы справиться с этим сценарием, источники данных должны предоставить потребителю некоторый механизм, позволяющий понять, что все соответствующие данные получены. Самое простое решение — опубликовать пакет из источника данных с идентификатором пакета (Guid) в той или иной форме. Затем потребители могут подождать, пока не появится идентификатор следующей партии, отмечающий конец предыдущей партии. Этот подход предполагает, что источники не будут пропускать пакет, иначе они будут постоянно смещены. Не существует алгоритма для обнаружения этого, но у вас могут быть некоторые поля в данных, которые показывают неоднородность и позволяют вам повторно выравнивать данные.

Более слабая версия этого подхода состоит в том, чтобы либо просто подождать x секунд и предположить, что все источники преуспевают в течение этого большого количества времени, либо посмотреть на некоторую форму меток времени (логические или настенные часы), чтобы определить, что источник перешел к следующему временному окну. неявно показывая завершение последнего окна.

person vasha    schedule 31.05.2019

Следующие рекомендации должны максимизировать успех синхронизации событий для проблемы обнаружения аномалий с использованием данных временных рядов.

  1. Используйте синхронизатор сетевого времени на всех узлах производителя/потребителя.
  2. Используйте контрольное сообщение от производителей каждые x единиц времени с фиксированным временем начала. Например: сообщения отправляются каждые две минуты в начале минуты.
  3. Создайте предикторы для задержки сообщения производителя. используйте сообщения сердцебиения, чтобы вычислить это.

С помощью этих примитивов мы должны иметь возможность выравнивать события временных рядов, учитывая отклонения во времени из-за сетевых задержек.

На стороне механизма логического вывода расширьте окна на уровне каждого производителя, чтобы синхронизировать события между производителями.

person vvg    schedule 05.06.2019
comment
Спасибо за предложения. Ваше решение вполне разумно, но так как у меня ограниченное знание всей системы, я искал какие-то практические решения (инструменты, доступные для реализации указанной задачи) в дополнение к концептуальному решению. - person sgarizvi; 06.06.2019
comment
Для синхронизации времени по сети используйте NTP. Это можно сделать при запуске узла или перезагрузке устройства. Сообщения Heartbeat можно публиковать в теме Kafka. Вам просто нужны ProducerId, TimeStamp, ArrivalTimeStamp. Наличие сообщения указывает на сердцебиение. См.: gerardnico.com/dit/kafka/timestamp для обсуждения извлечения временных меток. - person vvg; 08.06.2019
comment
Предикторы задержки сообщений можно создавать с использованием того же стека машинного обучения, который вы используете для механизма логического вывода. Поскольку могут быть потеряны сообщения, вам необходимо рассмотреть модель оставшихся сообщений, такую ​​​​как пропорциональная опасность Кокса, чтобы обеспечить точность. - person vvg; 08.06.2019