Как справедливо назначать задачи работникам в Python? - Разделение итерируемого на куски одинакового размера

У меня есть рабочие и задачи:

workers = ['peter', 'paul', 'mary']
tasks = range(13)

Теперь я хочу разделить задачи на куски или пакеты работы, чтобы каждый рабочий мог работать над одним пакетом и выполнять примерно такой же объем работы, как и все остальные. В реальной жизни я хочу планировать пакетные задания для вычислительной фермы. Пакетные задания должны выполняться параллельно. Фактическое расписание и отправка выполняются с помощью коммерческого инструмента, такого как lsf или grid.

Некоторые примеры того, что я ожидаю:

>>> distribute_work(['peter', 'paul', 'mary'], range(3))
[('peter', [0]), ('paul', [1]), ('mary', [2])]
>>> distribute_work(['peter', 'paul', 'mary'], range(6))
[('peter', [0, 3]), ('paul', [1, 4]), ('mary', [2, 5])]
>>> distribute_work(['peter', 'paul', 'mary'], range(5))
[('peter', [0, 3]), ('paul', [1, 4]), ('mary', [2])]

Этот вопрос очень похож на вопросы здесь, здесь и здесь

Разница в том, что мне нужны эти функции в порядке или приоритете:

  1. Не использовать len, по возможности не создавать длинные структуры данных внутри
  2. Принять генератор
  3. Возвратные генераторы
  4. Максимально возможное использование компонентов stdlib

Некоторые примечания о требованиях:

  • Никаких диктовок специально: у меня есть рабочие с одним и тем же именем, которые могут выполнять несколько пакетов (имена хостов unix). Если ваше решение использует словари, это нормально, потому что мы всегда можем выполнять поиск рабочих операций с помощью пакетного перечисления.
  • Произвольная длина: как рабочие процессы, так и задачи могут быть итерируемыми объектами любой длины >= 1. И они не должны разделяться поровну, как показано в примере выше, где Мэри получает только одну задачу.
  • Порядок: Мне не важен. Я предполагаю, что другие могут предпочесть какой-то порядок, например [0,1], [2,3], [5], но мне все равно. Если ваше решение может сохранить или изменить порядок, возможно, на это стоит указать другим.

Я попытался обдумать itertools и эту конкретную проблему и придумал следующий код, чтобы проиллюстрировать вопрос:

from itertools import *

def distribute_work(workers, tasks):
    batches = range(len(workers))
    return [ ( workers[k],
               [t[1] for t in i]
               )   for (k,i) in groupby(sorted(zip(cycle(batches),
                                                   tasks),
                                               key=lambda t: t[0]),
                                        lambda t: t[0]) ]

Это удовлетворяет 4., но сортировка, скорее всего, нарушает 1.. и 2./3. даже не задумываются.

Вероятно, есть какое-то простое решение, объединяющее некоторые компоненты stdlib таким образом, о котором я не думал. А может и нет. Есть берущие?


person cfi    schedule 30.10.2012    source источник


Ответы (5)


Вы должны предварительно дозировать?

Почему бы просто не создать очередь, чтобы каждый рабочий выходил из очереди, когда он заканчивает единицу работы?

person Tyler Eaves    schedule 30.10.2012
comment
Хорошая точка зрения. Должен уточнить, что речь идет о планировании заданий для машин, которые должны работать параллельно. Весь смысл в том, чтобы распараллелить рабочую нагрузку, чтобы уменьшить задержку от начала до результата. - person cfi; 31.10.2012

После ответа Тайлера:

def doleOut(queue, workers):
    for worker,task in itertools.izip(itertools.cycle(workers),queue):
        yield worker,task

Это будет возвращать (worker, task) кортежей, пока есть очередь. Итак, если у вас есть блокировка waitForMoreWork, вы можете сделать это:

queue = []
doler = distribute_work(workers, queue)
while 1:
    queue.append(waitForMoreWork)
    currentqueuelen = len(queue)
    for i in range(0,queuelen):
        worker,item = doler.next()
        worker.passitem(item)

Таким образом, он будет блокироваться до тех пор, пока не появится больше элементов очереди, затем распределить их, а затем снова заблокировать. Вы можете настроить свое выражение waitForMoreWork так, чтобы оно выдавало столько предметов за раз, сколько кажется разумным.

person Phil H    schedule 30.10.2012
comment
Для Python 3 замените itertools.izip на zip. - person cfi; 31.10.2012
comment
К сожалению, это половина решения, которое у меня есть, хотя и намного проще и приятнее. Это производит только один генератор, мне понадобится по одному на каждого рабочего. - person cfi; 31.10.2012
comment
@cfi: Если у вас нет единого метода распределения нагрузки, как вы будете правильно распределять их? Единственный способ - перечислить их и использовать по модулю или какое-то другое значение, но где-то вам придется указать перечисление... - person Phil H; 31.10.2012
comment
Правильный. Придерживаясь генераторов, нам пришлось бы сначала создать что-то вроде вывода вашего doleOut, затем itertools.tee для каждого воркера, а затем использовать генератор, который возвращает только каждый n шаг. Мне это показалось бы излишеством. - person cfi; 31.10.2012

Я думаю, вы хотите использовать multiprocessing.Pool.imap для обработки ваших рабочих и распределения их рабочих мест. Я считаю, что он делает все, что вы хотите.

jobs = (some generator)                   # can consume jobs from a generator
pool = multiprocessing.Pool(3)            # set number of workers here
results = pool.imap(process_job, jobs)    # returns a generator

for r in results:                         # loop will block until results arrive
    do_something(r)

Если порядок результатов не имеет значения для вашего приложения, вы также можете использовать imap_unordered.

person Blckknght    schedule 31.10.2012
comment
Хм. Думаю, если я выполню диспетчеризацию заданий на Python, это подойдет. На самом деле это не ответ на вопрос, но вполне может решить мою общую проблему. Я еще не планировал кодировать фактическую отправку заданий в том же скрипте Python и, вероятно, не хочу этого делать. Система в целом более сложная, и фактическая диспетчеризация заданий в настоящее время происходит в сценариях оболочки, где вся конфигурация уже доступна. Я просто пытался взломать десять строк в Python, чтобы решить проблему заказа/распределения. Возможно, это все еще работает, если я позволю def process_job просто создавать/печатать команды оболочки. - person cfi; 31.10.2012
comment
@cfi: Хм, я не уверен, что понимаю. Если вы не хотите полностью использовать генератор jobs с самого начала (как вы это делаете в своем коде в вопросе), вам нужно будет позволить Python иметь некоторый контроль над синхронизацией между рабочими и поставкой заданий (что-то как минимум multiprocessing.Queue). Однако, если вы идете по этому пути, я бы позволил модулю multiprocessing обрабатывать как можно больше, а не изобретать свой класс Pool самостоятельно. Но, возможно, если вы расскажете больше об архитектуре вашей системы, мы сможем придумать что-то еще? - person Blckknght; 31.10.2012

Хорошо, после того, как сказал, что это невозможно, вот идея. Может быть, это то, что я должен перенести в codereview - мне очень интересны комментарии о том, сколько накладных расходов это влечет за собой в памяти. Другими словами, я не знаю, действительно ли это решает проблему, когда список задач очень длинный и неизвестного размера. Поскольку Blckknght упомянул, что multiprocessing может быть лучшей альтернативой.

Код:

import itertools

def distribute_work(workers, tasks):
    """Return one generator per worker with a fair share of tasks

    Task may be an arbitrary length generator.
    Workers should be an iterable.
    """
    worker_count = len(workers)
    worker_ids = range(worker_count)
    all_tasks_for_all_workers = itertools.tee(tasks, worker_count)
    assignments = [ (workers[id], itertools.islice(i, id, None, worker_count))
                    for (id,i) in enumerate(all_tasks_for_all_workers) ]    
    return(assignments)

Алгоритм заключается в том, чтобы

  1. Дублируйте исходный список задач по одному разу для каждого работника. Поскольку это только дублирование объектов генератора, он не должен зависеть от размера списка задач в памяти. Даже если это относительно дорогая операция, это единовременная стоимость запуска и незначительна в памяти для очень больших списков задач.
  2. Чтобы назначить задачи одному работнику, каждый работник должен взять часть списка задач. Если #W - это количество рабочих, первый рабочий берет задачи 0, #W, 2*#W, 3*#W и т. д. Вторые рабочие берут 0+1, #W+1, 2*#W+1, 3*#W+1 и т. д. Сращивание для каждого рабочего может быть выполнено с помощью itertools.islice

Для чистого разделения/назначения задач имена рабочих для этой функции на самом деле не требуются. Но количество рабочих есть. Изменение этого сделало бы функцию более универсальной и полезной, а также облегчило бы понимание возвращаемого значения. Чтобы ответить на мой собственный вопрос, я оставлю функцию как есть.

Использование и результат:

>>> for (worker,tasks) in distribute_work(['peter', 'paul', 'mary'], range(5)):
...   print(worker, list(tasks))
... 
peter [0, 3]
paul [1, 4]
mary [2]

И он также обрабатывает случаи, когда воркеры имеют одинаковые имена, но являются разными сущностями:

>>> for (worker,tasks) in distribute_work(['p', 'p', 'mary'], range(5)): 
...   print(worker, list(tasks))
... 
p [0, 3]
p [1, 4]
mary [2]
person cfi    schedule 31.10.2012

Вот подход, который мне нравится:

parallelism = os.cpu_count()
num_todos = len(todos)

# this zip fanciness makes each chunk stripe through the data sequentially overall so that the
# first items still get done first across all the workers
chunksize = math.ceil(num_todos / parallelism)
chunks = list(itertools.zip_longest(*[todos[i:i+chunksize] for i in range(0, num_todos, chunksize)]))
chunks = [[c for c in chunk if c is not None] for chunk in chunks]

with Pool(processes=parallelism) as pool:
    tasks = [pool.apply_async(my_function, args=(chunk)) for chunk in chunks]
    [task.get() for task in tasks]

В зависимости от того, нужно ли вам накапливать результат, вы можете настроить его, но для меня интересной частью является то, что работники сотрудничают для выполнения задач в глобальном порядке (в моем случае обработка последовательных кадров изображений, чтобы я мог видеть, как все выглядит, как все процессоры работают).

person Karl Rosaen    schedule 12.10.2017