Тупик SubDagOperator воздушного потока

Я столкнулся с проблемой, когда группа DAG, состоящая из нескольких SubDagOperators, зависает на неопределенный срок.

Настройка: Использование CeleryExecutor. Для целей этого примера предположим, что у нас есть один рабочий процесс, который может одновременно выполнять пять задач. DAG, с которым у меня возникают проблемы, запускает несколько SubDagOperators параллельно. Для иллюстрации рассмотрим следующий граф, где каждый узел является SubDagOperator: DAG, который перестает работать

Проблема: группа обеспечения доступности баз данных перестанет продвигаться вперед в высокопараллельной части группы обеспечения доступности баз данных. Основная причина, по-видимому, заключается в том, что SubDagOperators верхнего уровня занимают все пять слотов, доступных для выполнения задач, поэтому ни одна из подзадач внутри этих SubDagOperators не может быть запущена. Эти подзадачи застревают в очереди, и никто не продвигается вперед.

Меня немного удивило, что задачи SubDagOperator будут конкурировать со своими собственными подзадачами за слоты для запуска задач, но теперь это имеет для меня смысл. Существуют ли передовые методы написания SubDagOperators, которые я нарушаю?

Мой план состоит в том, чтобы обойти эту проблему, создав собственный оператор для инкапсуляции задач, которые в настоящее время инкапсулированы внутри SubDagOperators. Мне было интересно, есть ли у кого-нибудь совет, целесообразно ли создавать оператора, состоящего из других операторов?


person Bryan    schedule 17.11.2017    source источник
comment
Насколько я понимаю, способ справиться с взаимоблокировками параллелизма, подобным этому, заключается в использовании pools, но у меня недостаточно опыта, чтобы написать ответ.   -  person 7yl4r    schedule 20.12.2017
comment
Ваше изображение кажется сломанным.   -  person Kyle Bridenstine    schedule 08.08.2018


Ответы (1)


Похоже, SubDagOperator следует избегать, потому что он вызывает эту проблему взаимоблокировки. В конце концов я обнаружил, что для моего варианта использования мне лучше всего написать собственный пользовательский подкласс BaseOperator для выполнения задач, которые я выполнял внутри SubDagOperator. Написание класса оператора оказалось намного проще, чем я ожидал.

person Bryan    schedule 22.11.2017
comment
Предлагаем прочитать Устранение взаимоблокировки SubDagOperator в воздушном потоке - person y2k-shubham; 11.07.2018
comment
Можете ли вы привести пример вашего нового Оператора? - person Kyle Bridenstine; 08.08.2018