Есть ли способ понять, когда все задачи завершены?

Допустим, я добавляю 100 push-задач (как группу 1) в свой tasks-queue. Затем я добавляю еще 200 задач (как группа 2) в ту же очередь. Как понять, что все задания группы 1 выполнены?

Похоже, QueueStatistics здесь не поможет. tag работает только с очередями вытягивания.

И у меня не может быть отдельных очередей (поскольку у меня могут быть сотни групп).


person LA_    schedule 13.01.2016    source источник
comment
Конечно, это будет нелегко. Вам, вероятно, понадобится сегментированный счетчик в хранилище данных, который увеличивается всякий раз, когда завершается задача в его группе. Затем вы можете проверить количество выполненных задач на основе группы из счетчика сегментов и посмотреть, соответствует ли оно количеству поставленных задач. Это все еще может быть не идеально, поскольку я считаю, что задачи могут запускаться дважды в некоторых обстоятельствах, поэтому ваш сегментированный счетчик также должен иметь надежный ключ, чтобы одна и та же задача, выполняемая дважды, устанавливала один и тот же объект счетчика.   -  person mgilson    schedule 13.01.2016


Ответы (2)


Я бы, вероятно, решил это, используя сегментированный счетчик в хранилище данных, как сказал @mgilson, и украсил свои отложенные функции для запуска обратного вызова, когда задачи завершены.

Я думаю, что это то, что вам нужно, если вы включите код на https://cloud.google.com/appengine/articles/sharding_counters?hl=en и напишите функцию декриментации в дополнение к функции инкремента.

import random
import time
from google.appengine.ext import deferred

def done_work():
  logging.info('work done!')

def worker(callback=None):
  def fst(f):
    def snd(*args, **kwargs):
      key = kwargs['shard_key']
      del kwargs['shard_key']

      retval = f(*args, **kwargs)

      decriment(key)
      if get_count(key) == 0:
        callback()

      return retval
    return snd
  return fst

def func(n):
  # do some work
  time.sleep(random.randint(1, 10) / 10.0)
  logging.info('task #{:d}'.format(n))

def make_some_tasks():
  func = worker(callback=done_work)(func)
  key = random.randint(0, 1000)
  for n in xrange(0, 100):
    increment(key)
    deferred.defer(func, n, shard_key=key)
person daniel    schedule 13.01.2016

Задачи не гарантируются только один раз, иногда даже успешно выполненные задачи могут повторяться. Вот такой пример: отложенная задача GAE повторная попытка из-за недоступности экземпляра, несмотря на то, что она уже удалась.

Из-за этого использование счетчика, увеличивающегося при постановке задачи в очередь и уменьшающегося при завершении задачи, не будет работать — в случае такого дублирующего выполнения он будет уменьшаться дважды, что приведет к прекращению всего вычисления.

Единственный надежный способ отслеживания выполнения задачи (на мой взгляд) — это независимое отслеживание каждой отдельной поставленной в очередь задачи. Это можно сделать с помощью имен задач (указанных или автоматически назначаются после успешной постановки в очередь) — они уникальны для данной очереди. Например, имена отслеживаемых задач могут храниться в списках задач в хранилище данных.

Примечание: это всего лишь теоретический ответ, который я получил, когда задал себе тот же вопрос, я не смог его проверить.

person Dan Cornilescu    schedule 13.01.2016