Управление параллельным потоком foreach

Мне нужно запустить две задачи с помощью Parallel.ForEach, но я хочу, чтобы первая завершилась, а затем затем запустилась вторая.

Parallel.Foreach(items, (item, state) =>
{
    Task1(item);
    Task2(item);
}
public void Task1(Item item)
{
    var records = GetRecordsToExport();
    UpdateFields1(records);
}
public void Task2(Item item)
{
    var content = File.Read(...);
    // read every line in content, and for every line update field DB
    foreach(var c in content)
    {
        UpdateFields2(c); // different methods
    }
}

Это код, я не могу выложить больше, так как код довольно большой

Task1 проверяет наличие некоторых записей в БД, которые нуждаются в обновлении, а Task2 проверяет наличие последних обновленных записей (из Task1) и обновляет их снова.

Изначально у меня были задачи наоборот, и я не мог понять, почему программа вылетает. Это связано с тем, что одна задача началась до завершения другой, и они работают с одними и теми же таблицами БД. Как я могу контролировать порядок запуска?


person raresm    schedule 14.11.2014    source источник
comment
То, что вы говорите, не имеет смысла. Task1 и Task2 не будут выполняться одновременно для одного и того же item. Как вы написали, Task2 всегда будет запускаться после Task1, как и следовало ожидать. Наиболее вероятная проблема заключается в том, что код внутри Task1 и Task2 не является потокобезопасным. Используя Parallel.ForEach, вы будете иметь много Task1 и Task2, работающих одновременно, но каждый из них будет работать для разных item. Вы должны показать нам код для Task1 и Task2.   -  person J...    schedule 14.11.2014
comment
Ну что вы говорите неправду. Task1 проверяет наличие некоторых записей в БД, которые необходимо обновить, а Task2 проверяет наличие последних обновленных записей и снова их обновляет. Проблема возникла, когда и задача 1, и задача 2 работали с одной и той же записью, и это произошло, когда задача 1 была почти завершена, а задача 2 запущена. Мне нужно, чтобы несколько задач выполнялись в фоновом режиме, например, все задачи1, а затем все задачи2... но без foreach для каждой задачи.   -  person raresm    schedule 14.11.2014
comment
Вы создаете новую тему для Task1 и Task2 для внутреннего использования? Пожалуйста, добавьте код для них обоих.   -  person Yuval Itzchakov    schedule 14.11.2014
comment
Task2 обновляет последнюю обновленную запись? но вы хотите обновить несколько записей одновременно?   -  person James Barrass    schedule 14.11.2014
comment
Мне нужно запустить две задачи, используя Parallel.ForEach, но я хочу, чтобы первая завершилась, а затем запустила вторую. => Использовать Task1.ContinueWith(Task2)?   -  person a''    schedule 14.11.2014
comment
Итак, вы хотите что-то вроде Parallel.Foreach(items, (item, state) => { Task1(item); } Parallel.Foreach(items, (item, state) => { Task2(item); }   -  person Fabian H.    schedule 14.11.2014
comment
Task2 не обязательно обновляет последнюю обновленную запись, но это может произойти. Задача 1 — это процесс экспорта, а задача 2 — процесс импорта. Порядок сначала экспорт, а потом импорт. Я хочу, чтобы экспорт был завершен, чтобы и только после этого можно было начать импорт, так как нет смысла импортировать данные, если экспорт не завершен. Я хочу то же самое с двумя foreachs, но я не хочу писать такой код. Для каждого товара мне нужен такой порядок: экспорт->импорт. Мне не нужно, чтобы весь экспорт (задача 1) был завершен, только для каждого элемента мне нужно экспортировать перед импортом (задача 1 перед задачей 2).   -  person raresm    schedule 14.11.2014


Ответы (2)


Что вы можете сделать, так это заблокировать доступ к Task2, пока в Task1 выполняется какой-либо поток. И наоборот, ни одному потоку не разрешено входить в Task1, когда какая-либо Task2 все еще выполняется.

Я думаю, вы можете попробовать использовать ReaderWriterLockSlim:

var lock = new ReaderWriterLockSlim();


Parallel.Foreach(items, (item, state) =>
{
    lock.EnterReadLock();
    try {
        Task1(item);
    } finally {
        lock.ExitReadLock()
    }
    lock.EnterWriteLock();
    try {
        Task2(item);
    } finally {
        lock.ExitWriteLock()
    }
}

Этот код также предотвратит двойной ввод любых методов Task2. Если вам это нужно, вы можете попробовать использовать Semaphore или Mutex. Но это усложнит задачу.

person GvS    schedule 14.11.2014
comment
Делая такие вещи, кажется, что параллельный цикл не будет иметь никакого значения. Если вы будете удерживать другие потоки до тех пор, пока задача не будет выполнена, параллельного выполнения не будет. Или я что-то упускаю? - person Juliano Nunes Silva Oliveira; 14.11.2014
comment
Будут ли они работать КАК экспорт1, экспорт2, ..., экспортN, а затем импорт1, импорт2,..., импортN ИЛИ КАК экспорт1, импорт1, ..., экспортN, импортN? Потому что мне нужен второй. Если второй, то это тот код, который мне был нужен. (экспорт — это задача 1, а импорт — задача 2) - person raresm; 14.11.2014
comment
Task1 по-прежнему будет выполняться параллельно. И похоже у него проблема с одновременным выполнением Task1 и Task2. - person GvS; 14.11.2014
comment
@raresm: Это будет первый случай, сначала весь importN (не по порядку, выполняется параллельно), а затем ExportN (не по порядку, выполняется не параллельно). - person GvS; 14.11.2014
comment
Я предполагаю, что вы имели в виду экспорт N, а затем импорт N, поскольку экспорт - это задача 1, а импорт - задача 2. Это был ответ, который мне нужен, спасибо! - person raresm; 14.11.2014

Задача Task2 не должна иметь возможности работать с элементом, который Task1 еще не завершила обработку:

public class Test
{
    private static object syncObj = new object();

    public static void Main()
    {
        var items = Enumerable.Range(0, 100).ToArray();
        Parallel.ForEach(items, (item, state) =>
        {
            Task2(Task1(item));
        });
    }

    static int Task1(int item)
    {
        Console.WriteLine("Task 1 : {0}", item);
        Thread.Sleep(100);
        return item;
    }

    static int Task2(int item)
    {
        Console.WriteLine("Task 2 : {0}", item);
        return item;
    }
}   
person ilitirit    schedule 14.11.2014