Мы создаем интеграционный тест для конвейера Apache Beam и сталкиваемся с некоторыми проблемами. См. Контекст ниже ...
Подробная информация о нашем трубопроводе:
- Мы используем
PubsubIO
в качестве источника данных (неограниченныйPCollection
) - Промежуточные преобразования включают настраиваемую
CombineFn
и очень простую стратегию работы с окнами / запуском - Наше последнее преобразование -
JdbcIO
, используяorg.neo4j.jdbc.Driver
для записи в Neo4j
Текущий подход к тестированию:
- Запустите эмулятор Google Cloud Pub / Sub на компьютере, на котором выполняются тесты.
- Создайте базу данных Neo4j в памяти и передайте ее URI в наши параметры конвейера.
- Запустите конвейер, позвонив
OurPipeline.main(TestPipeline.convertToArgs(options)
- Используйте клиентскую библиотеку Google Cloud Java Pub / Sub для публикации сообщений в тестовой теме (с помощью эмулятора Pub / Sub), которую
PubsubIO
будет читать из - Данные должны проходить по конвейеру и в конечном итоге попасть в наш экземпляр Neo4j в памяти.
- Сделайте простые утверждения относительно наличия этих данных в Neo4j
Предполагается, что это будет простой интеграционный тест, который позволит убедиться, что наш конвейер в целом ведет себя так, как ожидалось.
Проблема, с которой мы сейчас сталкиваемся, заключается в том, что когда мы запускаем наш конвейер, он блокируется. Мы используем DirectRunner
и pipeline.run()
(не pipeline.run().waitUntilFinish()
), но кажется, что тест завис после запуска конвейера. Поскольку это неограниченный PCollection
(работает в потоковом режиме), конвейер не завершается, и, следовательно, любой код после него не достигается.
Итак, у меня есть несколько вопросов:
1) Есть ли способ запустить конвейер, а потом остановить его вручную?
2) Есть ли способ запустить конвейер асинхронно? В идеале он просто запустит конвейер (который затем будет непрерывно опрашивать Pub / Sub на предмет данных), а затем перейти к коду, отвечающему за публикацию в Pub / Sub.
3) Является ли этот метод интеграционного тестирования конвейером разумным или есть лучшие методы, которые могут быть более простыми? Любая информация / руководство здесь будут оценены.
Сообщите мне, могу ли я предоставить дополнительный код / контекст - спасибо!