Python: существующее решение для разветвления рабочего процесса / конвейера?

В моем приложении я реализовал очень грубый рабочий процесс, состоящий из 5 различных «блоков обработки». Код на данный момент имеет такую ​​структуру:

def run(self, result_first_step=None, result_second_step=None):

    config = read_workflow_config("config.ini")

    if config.first_step:
        result_first_step = run_process_1()

    if config.second_step and result_first_step is not None:
        result_second_step = run_process_2(result_first_step)
    else:
        raise Exception("Missing required data")

    if config.third_step:
        result_third_step = run_process_3(result_first_step, result_second_step)
    else:
        result_third_step = None

    collect_results(result_first_step, result_second_step, result_third_step)

и так далее. Код работает, но его трудно поддерживать и он довольно хрупкий (обработка намного сложнее, чем в этом упрощенном примере). Итак, я думал о принятии другой стратегии, то есть о создании правильного рабочего процесса с:

  • Короткое замыкание: я не могу передать данные начальному процессу или два разных типа данных. В последнем случае рабочий процесс прерывается и пропускает некоторую обработку.
  • Общие объекты: подобная конфигурация доступна для всех юнитов.
  • Условия: в зависимости от конфигурации некоторые биты могут быть пропущены

Доступна ли библиотека Python для выполнения таких рабочих процессов, или мне следует использовать собственную? Я пробовал pyutilib.workflow, но он не поддерживает должным образом общий объект конфигурации, за исключением передачи его всем рабочим (утомительно).

Примечание: это приложение для библиотеки / командной строки, поэтому веб-решения для рабочих процессов не подходят.


person Einar    schedule 09.03.2012    source источник
comment
Вы пробовали гуглить этот вопрос? Что было не так в том, что вы обнаружили?   -  person Marcin    schedule 09.03.2012
comment
Судя по тому, как вы это написали, вы не сможете run_process_2, если еще не run_process_1. Это правда?   -  person Katriel    schedule 09.03.2012
comment
В самом деле, я скорректирую его, чтобы лучше показать то, что я задумал. РЕДАКТИРОВАТЬ: изменен пример, показывающий, как можно короткое замыкание.   -  person Einar    schedule 09.03.2012
comment
@Marcin Это не первый раз, когда я искал этот ответ в Google, и большинство решений либо чрезмерно спроектированы, основаны на веб-технологиях (нет, нет), либо не предоставляют то, что мне нужно.   -  person Einar    schedule 09.03.2012
comment
@Einar Было бы полезно, если бы вы объяснили, что не так с существующими решениями по отдельности.   -  person Marcin    schedule 09.03.2012
comment
stackoverflow .com / questions / 704834 /, это может дать некоторое представление.   -  person John    schedule 09.03.2012


Ответы (2)


Вы можете превратить метод run в генератор;

def run(self)
  result_first_step = run_process_1()
  yield result_first_step
  result_second_step = run_process_2(result_first_step)
  yield result_second_step
  result_third_step = run_process_3(result_first_step, result_second_step)
  collect_results(result_first_step, result_second_step, result_third_step)
person Roland Smith    schedule 09.03.2012

В Python существует целый ряд подходов к конвейерам, от полстраницы до ...
Основная идея: вверху поместить все определения шагов в dict;
затем конвейер (например, " CAT ") выполняет шаги C, A, T.

class Pipelinesimple:
    """p = Pipelinesimple( funcdict );  p.run( "C A T" ) = C(X) | A | T

    funcdict = dict( A = Afunc, B = Bfunc ... Z = Zfunc )
    pipeline = Pipelinesimple( funcdict )
    cat = pipeline.run( "C A T", X )  # C(X) | A | T, i.e. T( A( C(X) ))
    dog = pipeline.run( "D O G", X, **kw )  # D(X, **kw) | O(**kw) | G(**kw)
    """

def __init__( self, funcdict ):
    self.funcdict = funcdict  # funcs or functors of X

def run( self, steps, X, **commonargs ):
    """ steps "C A T" or ["C", "A", "T"]
        all funcs( X, **commonargs )
    """

    if isinstance( steps, basestring ):
        steps = steps.split()  # "C A T" -> ["C", "A", "T"]
    for step in steps:
        func = self.funcdict(step)
        # if X is None: ...
        X = func( X, **commonargs )
    return X

Далее, существует несколько способов присвоения разных параметров разным шагам.

Один из способов - разобрать многострочную строку, например

""" C  ca=5  cb=6 ...
    A  aa=1 ...
    T  ...
"""

Другой - взять список функций / имен функций / параметров, например

pipeline.run( ["C", dict(ca=5, cb=6), lambda ..., "T", dict(ta=3) ])

Третий - разделить параметры «A__aa B__ba ...» так, как sklearn.pipeline.Pipeline. делает. (Это связано с машинным обучением, но вы можете копировать части конвейера.)

У каждого из них есть довольно четкие плюсы и минусы.

Большое сообщество талантливых людей может очень быстро придумать дюжину прототипов решений проблемы [конвейеров].
Но сокращение дюжины до двух или трех занимает навсегда.

Какой бы способ вы ни выбрали, предоставьте способ регистрации всех параметров для запуска.

См. Также:
FilterPype
nipype

person denis    schedule 22.04.2012
comment
Интересный подход, я разветвлю свой код и попробую. - person Einar; 27.04.2012
comment
@Einar, какой из трех подходов ты выберешь? - person denis; 27.04.2012