Установка атрибутов задачи Celery (например, time_limit и soft_time_limit) не работает

Судя по этой ветке, проблема решена, но похоже, что это не так. Установка ограничения времени для конкретной задачи с сельдереем

Моя текущая версия Celery — 3.1.18 (Cipater).

Я пытаюсь перезаписать настройки задачи по умолчанию. Цель состоит в том, чтобы изменить мягкое и жесткое ограничение времени задачи, поскольку одна и та же задача используется для нескольких целей.

Передача soft_time_limit и time_limit в конструктор MyTask для изменения настроек по умолчанию.

///celery/app/ task.py
class MyTask(task.Task):   
    time_limit = 100
    soft_time_limit = 110
    max_retries = 0

def __init__(self, time_limit=None, soft_time_limit=None,
             max_retries=None, *args, **kwargs):
    if time_limit:
        self.time_limit = time_limit
    if soft_time_limit:
       self.soft_time_limit = soft_time_limit
    if max_retries:
       self.max_retries = max_retries
    task.Task.__init__(self, *args, **kwargs)


t1 = MyTask(time_limit=30, soft_time_limit=20,
        max_retries=5)
or

t1 = MyTask()
t1.time_limit = 30
t1.soft_time_limit = 20

Затем передайте t1.si() в task.RetryableChain(...)

job = task.RetryableChain(...)
job.delay()

Когда метод запуска вызывается работником, он по-прежнему получает старое значение (time_limit = 100), тогда как я установил time_limit = 30.

Пожалуйста, дайте мне знать, если проблема все еще существует в версии 3.1.18.


person S.Kar    schedule 19.02.2017    source источник


Ответы (1)


Мне пришлось исправить код сельдерея, чтобы заставить его работать. Это, безусловно, временное решение, но оно работает. Я не уверен, что когда атрибуты устанавливаются с новыми значениями, то почему они не переносятся в worker.job. Я чувствую, что когда мы вызывали task.si или s(), он создает экземпляр Signature, который не содержит этих атрибутов time_limit, поэтому он берет исходные значения, хранящиеся в классе. Просто мысль.

t1 = MyTask()
kwargs = {}
kwargs['time_limit'] = 30
kwargs['soft_time_limit'] = 40

t.s(kwargs)

---->>> /сельдерей/рабочий/job.py

def execute_using_pool(self, pool, **kwargs):
    """Used by the worker to send this task to the pool.

    :param pool: A :class:`celery.concurrency.base.TaskPool` instance.

    :raises celery.exceptions.TaskRevokedError: if the task was revoked
        and ignored.

    """
    uuid = self.id
    task = self.task
    if self.revoked():
        raise TaskRevokedError(uuid)

    hostname = self.hostname
    kwargs = self.kwargs
    if task.accept_magic_kwargs:
        kwargs = self.extend_with_default_kwargs()
    request = self.request_dict
    request.update({'hostname': hostname, 'is_eager': False,
                    'delivery_info': self.delivery_info,
                    'group': self.request_dict.get('taskset')})
    timeout, soft_timeout = request.get('timelimit', (None, None))
    # timeout = timeout or task.time_limit
    # soft_timeout = soft_timeout or task.soft_time_limit
    **# SKAR  request.get(‘time limit’) always returns the original value stored in Task.
    timeout = kwargs.get('time_limit', task.time_limit)
    soft_timeout = kwargs.get('soft_time_limit', task.soft_time_limit)**
    result = pool.apply_async(
        trace_task_ret,
        args=(self.name, uuid, self.args, kwargs, request),
        accept_callback=self.on_accepted,
person S.Kar    schedule 19.02.2017