Создание отдельных процессов из рабочего/альтернативного решения сельдерея?

Я разрабатываю веб-сервис, который будет использоваться в качестве поставщика «база данных как услуга». Цель состоит в том, чтобы иметь небольшую веб-службу на основе фляги, работающую на некотором хосте, и «рабочие» процессы, работающие на разных хостах, принадлежащих разным командам. Всякий раз, когда член команды приходит и запрашивает новую базу данных, я должен создать ее на их хосте. Теперь проблема... Служба, которую я запускаю, должна быть запущена. Однако рабочий может быть перезапущен. Могло случиться 5 минут, могло случиться 5 дней. Простой Popen не поможет, потому что он создаст дочерний процесс, и если рабочий процесс остановится позже, процесс Popen будет уничтожен (я пробовал это).

У меня есть реализация, использующая многопроцессорность, которая работает как чемпион, к сожалению, я не могу использовать это с сельдереем. так что не повезло там. Я попытался уйти от многопроцессорной библиотеки с двойным форком и именованными каналами. Самый минимальный образец, который я мог произвести:

def launcher2(working_directory, cmd, *args):
    command = [cmd]
    command.extend(list(args))

    process = subprocess.Popen(command, cwd=working_directory, shell=False, start_new_session=True,
                               stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    with open(f'{working_directory}/ipc.fifo', 'wb') as wpid:
        wpid.write(process.pid)


@shared_task(bind=True, name="Test")
def run(self, cmd, *args):
    working_directory = '/var/tmp/workdir'
    if not os.path.exists(working_directory):
        os.makedirs(working_directory, mode=0o700)

    ipc = f'{working_directory}/ipc.fifo'
    if os.path.exists(ipc):
        os.remove(ipc)
    os.mkfifo(ipc)
    pid1 = os.fork()
    if pid1 == 0:
        os.setsid()
        os.umask(0)

        pid2 = os.fork()
        if pid2 > 0:
            sys.exit(0)

        os.setsid()
        os.umask(0)

        launcher2(working_directory, cmd, *args)
    else:
        with os.fdopen(os.open(ipc, flags=os.O_NONBLOCK | os.O_RDONLY), 'rb') as ripc:
            readers, _, _ = select.select([ripc], [], [], 15)
            if not readers:
                raise TimeoutError(60, 'Timed out', ipc)
            reader = readers.pop()
            pid = struct.unpack('I', reader.read())[0]
        pid, status = os.waitpid(pid, 0)
        print(status)


if __name__ == '__main__':
    async_result = run.apply_async(('/usr/bin/sleep', '15'), queue='q2')
    print(async_result.get())

Мой вариант использования более сложен, но я не думаю, что кто-то захочет читать 200+ строк начальной загрузки, но это не работает точно так же. С другой стороны, я не жду pid, если это не требуется, так что это похоже на запуск процесса по запросу и пусть он выполняет свою работу. Начальная загрузка базы данных занимает примерно минуту с полной настройкой, и я не хочу, чтобы клиенты ждали минуту. Приходит запрос, я запускаю процесс и отправляю обратно идентификатор экземпляра базы данных, и клиент может запросить статус на основе полученного идентификатора экземпляра. Однако с приведенным выше решением для разветвления я получаю:

[2020-01-20 18:03:17,760: INFO/MainProcess] Received task: Test[dbebc31c-7929-4b75-ae28-62d3f9810fd9]  
[2020-01-20 18:03:20,859: ERROR/MainProcess] Process 'ForkPoolWorker-2' pid:16634 exited with 'signal 15 (SIGTERM)'
[2020-01-20 18:03:20,877: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 15 (SIGTERM).')
Traceback (most recent call last):
  File "/home/pupsz/PycharmProjects/provider/venv37/lib/python3.7/site-packages/billiard/pool.py", line 1267, in mark_as_worker_lost
    human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 15 (SIGTERM).

Что заставляет меня задуматься, что может происходить. Я попробовал еще более простую задачу:

@shared_task(bind=True, name="Test")
def run(self, cmd, *args):
    working_directory = '/var/tmp/workdir'
    if not os.path.exists(working_directory):
        os.makedirs(working_directory, mode=0o700)

    command = [cmd]
    command.extend(list(args))

    process = subprocess.Popen(command, cwd=working_directory, shell=False, start_new_session=True,
                               stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    return process.wait()


if __name__ == '__main__':
    async_result = run.apply_async(('/usr/bin/sleep', '15'), queue='q2')
    print(async_result.get())

Который снова терпит неудачу с той же самой ошибкой. Теперь мне нравится Celery, но от этого такое ощущение, что он не подходит для моих нужд. Я что-то напутал? Можно ли добиться, что мне нужно сделать от работника? Есть ли у меня какие-либо альтернативы, или я должен просто написать свою собственную очередь задач?


person Display name    schedule 20.01.2020    source источник
comment
Celery не поддерживает многопроцессорность, поэтому попробуйте использовать бильярд вместо многопроцессорной обработки (from billiard import Process и т. д.). Я надеюсь, что однажды ребята из Celery проведут серьезный рефакторинг этого кода, удалят бильярд и вместо этого начнут использовать многопроцессорность...   -  person DejanLekic    schedule 20.01.2020
comment
Да, сейчас ничего не работает. Не только многопроцессорность, но и простое Popen и fork также приводят к выходу работника с SIGTERM.   -  person Display name    schedule 20.01.2020
comment
Попен тоже часть этого...   -  person DejanLekic    schedule 20.01.2020
comment
справедливо, я попытаюсь изменить multiprocessing.Process на billiard.Process... Думаю, мне придется использовать другой именованный канал для IPC, так как biliard, похоже, не может возвращать значение из процесса.   -  person Display name    schedule 20.01.2020
comment
Дайте нам знать, если это сработало.   -  person DejanLekic    schedule 21.01.2020
comment
Я уверен. Я перешел на billiard.Process, просто реализовав какое-то грубое решение для возвращаемого значения, поскольку оно ничего для них не дает.   -  person Display name    schedule 21.01.2020
comment
Это было не просто, но это был замечательный совет! Я перешел с multiprocessing.Process на billiard.context.Process, и из этого процесса я мог создать отдельную оболочку с помощью Popen и выйти из процесса бильярда. Это действительно работает! Если вы ответите на это, я приму это, так как я никогда не думал об этом сам. Большое спасибо!!!   -  person Display name    schedule 21.01.2020


Ответы (1)


Celery не поддерживает многопроцессорность, поэтому попробуйте использовать бильярд вместо многопроцессорной обработки (из процесса импорта бильярда и т. д.). Я надеюсь, что однажды ребята из Celery проведут серьезный рефакторинг этого кода, удалят бильярд и вместо этого начнут использовать многопроцессорность...

Итак, пока они не перейдут на многопроцессорность, мы застряли на бильярде. Мой совет — удалить любое использование многопроцессорности в задачах Celery и начать использовать billiard.context.Process и подобные, в зависимости от вашего варианта использования.

person DejanLekic    schedule 21.01.2020