Когда дело доходит до обработки данных, нам часто приходится создавать конвейеры данных для поэтапной обработки данных на всем пути от их извлечения из источника до их преобразования и агрегирования на целевой платформе. Эти конвейеры могут быть построены с использованием различных инструментов, и мы можем использовать комбинацию инструментов, чтобы получить максимальную отдачу от них, поскольку разные инструменты имеют разные наборы возможностей.

Эта статья является еще одной из серии статей о том, как мы можем переключить обработку данных с одного инструмента на другой в зависимости от требований, и мы снова собираемся поговорить о Airflow и n8n, но на этот раз о том, как мы можем запустить Airflow DAG. от н8н. Прочтите предыдущую статью Запуск рабочего процесса n8n из Airflow DAG, чтобы узнать, как запустить рабочий процесс n8n из Airflow.

Здесь мы рассмотрим MWAA (Amazon Managed Workflows for Apache Airflow). N8n — это инструмент автоматизации рабочих процессов на основе узлов, который поставляется с несколькими коннекторами, позволяющими создавать рабочие процессы в соответствии с парадигмой low-code. Однако иногда возникает необходимость написать собственную логику, и в n8n это возможно с использованием JavaScript. N8n предоставляет http-узел для простой настройки конечных точек API с соответствующим механизмом аутентификации, и мы, возможно, можем использовать его для вызова API-интерфейсов воздушного потока для запуска DAG. Поскольку API-интерфейсы воздушного потока не отображаются напрямую в MWAA, мы можем использовать интерфейс командной строки MWAA для запуска DAG. Логика для подключения к среде воздушного потока в MWAA и отправки команды для запуска DAG может быть реализована с использованием JavaScript, и ее можно разместить в соответствующем узле n8n. См. Вызов MWAA DAG из JavaScript для примера кода.

Ключевые шаги включают,

  • Получение имени хоста и токена CLI для данной среды MWAA
  • Создание URL-адреса с использованием имени хоста
  • Создание команды для запуска DAG
  • Создание заголовка с токеном CLI
  • Выполнение почтового вызова API с использованием соответствующего пакета (например, axios) с передачей URL, команды и заголовка в качестве входных параметров.

Где это полезно? Когда мы хотим активировать DAG Airflow из n8n? Это очевидные вопросы, которые приходят на ум. Хотя может быть несколько вариантов использования, одним из них может быть уведомление воздушного потока после завершения выполнения рабочего процесса n8n, чтобы дальнейшие действия могли выполняться в воздушном потоке.