Celery Period_task выполняется несколько раз параллельно

У меня есть очень простой периодический код, использующий потоки Celery; он просто печатает «Pre» и «Post» и спит между ними. Он адаптирован из этого вопроса StackOverflow и этот связанный веб-сайт

from celery.task import task
from celery.task import periodic_task
from django.core.cache import cache
from time import sleep
import main
import cutout_score
from threading import Lock

import socket
from datetime import timedelta
from celery.decorators import task, periodic_task

def single_instance_task(timeout):
  def task_exc(func):
    def wrapper(*args, **kwargs):
        lock_id = "celery-single-instance-" + func.__name__
        acquire_lock = lambda: cache.add(lock_id, "true", timeout)
        release_lock = lambda: cache.delete(lock_id)
        if acquire_lock():
            try:
                func()
            finally:
                release_lock()
    return wrapper
  return task_exc

LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
@periodic_task(run_every = timedelta(seconds=2))
def test():
    lock_id = "lock"

    # cache.add fails if if the key already exists
    acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
    # memcache delete is very slow, but we have to use it to take
    # advantage of using add() for atomic locking
    release_lock = lambda: cache.delete(lock_id)

    if acquire_lock():
        try:
            print 'pre'
            sleep(20)
            print 'post'
        finally:
            release_lock()
        return
    print 'already in use...'

Этот код никогда не печатает 'already in use...'; то же самое происходит, когда я использую декоратор @single_instance_task.

Вы знаете, что случилось?

Редактировать: я упростил вопрос, чтобы он не записывался в память (используя глобальный кеш или кеш django); Я до сих пор никогда не вижу 'already in use...'


Изменить: когда я добавляю следующий код в свой файл Django settings.py (путем изменения кода из https://docs.djangoproject.com/en/dev/topics/cache/ все работает как положено, но только когда я использую порт 11211 (как ни странно, мой сервер подключен к порту 8000)

CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
        'LOCATION': [
            '127.0.0.1:11211'
        ]
    }
}

person user    schedule 10.10.2011    source источник


Ответы (1)


Как у вас дела с сельдереем? Я не знаком с резьбовым вариантом.

Если он работает с несколькими процессами, то нет «глобальных» переменных, которые являются общей памятью между рабочими.

Если вы хотите, чтобы счетчик был общим для всех рабочих, я бы посоветовал вам использовать cache.incr.

E.g.:

In [1]: from django.core.cache import cache

In [2]: cache.set('counter',0)

In [3]: cache.incr('counter')
Out[3]: 1

In [4]: cache.incr('counter')
Out[4]: 2

Обновить

Что произойдет, если вы заставите свои задачи перекрываться во время сна, например:

print "Task on %r started" % (self,)
sleep(20)
print "Task on %r stopped" % (self,)

Если вы не получаете сообщение «уже используется ...» при более частом запуске, чем 20 секунд, вы знаете, что кеш ведет себя не так, как ожидалось.


Еще одно обновление

Вы настроили кеш-бэкенд в настройках django? Например. memcached

Если нет, возможно, вы используете фиктивный кэш, который на самом деле не кэширует, а просто реализует интерфейс... что звучит как убедительная причина вашей проблемы.

person MattH    schedule 11.10.2011
comment
+1 Похоже, это связано с моей проблемой. Я пытался использовать кеш, но по-прежнему вижу ошибочные значения counter. Кроме того, я вижу, что несколько рабочих входят в функцию test. Я запускаю celeryd с django: python manage.py celeryd -v 2 -B -s celery -E -l INFO - person user; 11.10.2011
comment
Даже когда я упрощаю так, что функция test просто печатает приветствие, она запускается на разных воркерах и печатает слишком часто (даже если у меня определен декоратор @single_instance_task). - person user; 13.10.2011
comment
Я упростил код (выше), чтобы он печатал только (как вы предложили). Он по-прежнему никогда не печатает 'already in use...'; почему-то кеш не блокируется. - person user; 14.10.2011
comment
Какой кеш-сервер вы используете? Я успешно использую этот рецепт с memcached. - person MattH; 14.10.2011
comment
Ах -- я делаю from django.core.cache import cache; это с сайта ask.github.com/celery/cookbook/tasks.html. Кроме того, важно разрешить параллелизм сельдерея ›1. При concurrency = 1 он никогда не выдаст ошибку, но никогда не напечатает «уже используется...» - person user; 14.10.2011