Apache Beam - тест интеграции с неограниченной коллекцией PCollection

Мы создаем интеграционный тест для конвейера Apache Beam и сталкиваемся с некоторыми проблемами. См. Контекст ниже ...

Подробная информация о нашем трубопроводе:

  • Мы используем PubsubIO в качестве источника данных (неограниченный PCollection)
  • Промежуточные преобразования включают настраиваемую CombineFn и очень простую стратегию работы с окнами / запуском
  • Наше последнее преобразование - JdbcIO, используя org.neo4j.jdbc.Driver для записи в Neo4j

Текущий подход к тестированию:

  • Запустите эмулятор Google Cloud Pub / Sub на компьютере, на котором выполняются тесты.
  • Создайте базу данных Neo4j в памяти и передайте ее URI в наши параметры конвейера.
  • Запустите конвейер, позвонив OurPipeline.main(TestPipeline.convertToArgs(options)
  • Используйте клиентскую библиотеку Google Cloud Java Pub / Sub для публикации сообщений в тестовой теме (с помощью эмулятора Pub / Sub), которую PubsubIO будет читать из
  • Данные должны проходить по конвейеру и в конечном итоге попасть в наш экземпляр Neo4j в памяти.
  • Сделайте простые утверждения относительно наличия этих данных в Neo4j

Предполагается, что это будет простой интеграционный тест, который позволит убедиться, что наш конвейер в целом ведет себя так, как ожидалось.

Проблема, с которой мы сейчас сталкиваемся, заключается в том, что когда мы запускаем наш конвейер, он блокируется. Мы используем DirectRunner и pipeline.run() (не pipeline.run().waitUntilFinish()), но кажется, что тест завис после запуска конвейера. Поскольку это неограниченный PCollection (работает в потоковом режиме), конвейер не завершается, и, следовательно, любой код после него не достигается.

Итак, у меня есть несколько вопросов:

1) Есть ли способ запустить конвейер, а потом остановить его вручную?

2) Есть ли способ запустить конвейер асинхронно? В идеале он просто запустит конвейер (который затем будет непрерывно опрашивать Pub / Sub на предмет данных), а затем перейти к коду, отвечающему за публикацию в Pub / Sub.

3) Является ли этот метод интеграционного тестирования конвейером разумным или есть лучшие методы, которые могут быть более простыми? Любая информация / руководство здесь будут оценены.

Сообщите мне, могу ли я предоставить дополнительный код / ​​контекст - спасибо!


person Chris Staikos    schedule 23.06.2017    source источник


Ответы (1)


Вы можете запустить конвейер асинхронно, используя DirectRunner, передав для параметра конвейера isBlockOnRun значение false. Пока вы сохраняете доступной ссылку на возвращенный PipelineResult, вызов cancel() для этого результата должен остановить конвейер.

Что касается вашего третьего вопроса, ваша установка кажется разумной. Однако, если вы хотите провести тест конвейера в меньшем масштабе (требующий меньшего количества компонентов), вы можете инкапсулировать всю логику обработки в пользовательский файл PTransform. Этот PTransform должен принимать входные данные, которые были полностью проанализированы из входного источника, и выдавать выходные данные, которые еще не были проанализированы для выходного приемника.

Когда это будет сделано, вы можете использовать либо Create (который обычно не выполняет триггеры), либо TestStream (который может, в зависимости от того, как вы создаете TestStream) с DirectRunner для генерации конечного количества входных данных, применить эту обработку PTransform к этому PCollection и используйте PAssert на выходе PCollection, чтобы убедиться, что конвейер сгенерировал ожидаемые выходные данные.

Для получения дополнительной информации о тестировании на веб-сайте Beam есть информация об этих стилях тестов в Руководство по программированию и запись в блоге о тестирование конвейеров с TestStream.

person Thomas Groh    schedule 23.06.2017