Использование рабочих потоков для добавления новых задач в пул задач в D

Это упрощение и сужение к другому из моих вопросов: Нужна помощь в параллельном обходе даг в D

Скажем, у вас есть код, который вы хотите распараллелить. Проблема в том, что некоторые из вещей, которые вам нужно сделать, имеют предварительные условия. Поэтому вы должны убедиться, что эти предварительные условия выполнены, прежде чем добавлять новую задачу в пул. Простой концептуальный ответ состоит в том, чтобы добавлять новые задачи по мере выполнения их предварительных требований.

Здесь у меня есть небольшой фрагмент кода, который эмулирует этот шаблон. Проблема в том, что он генерирует исключение, потому что pool.finish() вызывается до того, как новая задача будет помещена в очередь рабочим потоком. Есть ли способ просто подождать, пока все потоки не будут простаивать или что-то в этом роде? Или есть другая конструкция, которая позволила бы использовать этот шаблон?

Обратите внимание: это упрощенная версия моего кода для иллюстрации проблемы. Я не могу просто использовать taskPool.parallel() в foreach.

import std.stdio;
import std.parallelism;

void simpleWorker(uint depth, uint maxDepth, TaskPool pool){
    writeln("Depth is: ",depth);
    if (++depth < maxDepth){
        pool.put( task!simpleWorker(depth,maxDepth,pool));
    }
}

void main(){
    auto pool = new TaskPool();
    auto t = task!simpleWorker(0,5,pool);
    pool.put(t);
    pool.finish(true);
    if (t.done()){ //rethrows the exception thrown by the thread.
        writeln("Done");
    }
}

person Tyler Fox    schedule 25.10.2013    source источник
comment
сделать t.workForce() перед вызовом pool.finish() в главном   -  person ratchet freak    schedule 25.10.2013
comment
Это не позволяет ему генерировать исключение (определенно, шаг в правильном направлении), но он проходит только через 2 итерации, когда должен пройти через 5.   -  person Tyler Fox    schedule 25.10.2013


Ответы (1)


Я исправил: http://dpaste.dzfl.pl/eb9e4cfc

Я изменил цикл for на:

void cleanNodeSimple(Node node, TaskPool pool){
    node.doProcess();
    foreach (cli; pool.parallel(node.clients,1)){ // using parallel to make it concurrent
        if (cli.canProcess()) {
            cleanNodeSimple(cli, pool); 
            // no explicit task creation (already handled by parallel)
        }
    }
}
person ratchet freak    schedule 25.10.2013
comment
При этом каждый поток блокируется до тех пор, пока не будут завершены все подпотоки. Это означает, что он рекурсивно создает новые потоки, не позволяя завершать старые. Таким образом, выполнение этого с большой глубиной вызовет своего рода переполнение. - person Tyler Fox; 25.10.2013
comment
нет, вызывающий поток также будет работать над каждым вызовом foreach (по крайней мере, так говорит мне мой анализ std.parallelism) - person ratchet freak; 25.10.2013
comment
Посмотрите эту версию кода: dpaste.dzfl.pl/667b9c5d Обратите внимание, что все узлы очищается до завершения любой из параллельных итераций foreach. Если я увеличу размер DAG до 500 на своей локальной машине, произойдет сбой. Однако, похоже, он успешно продолжается на сервере dpaste. - person Tyler Fox; 25.10.2013
comment
проверьте, какой поток выполняет очистку, скорее всего, это вызывающий поток в большинстве случаев - person ratchet freak; 26.10.2013