Несколько вызовов Parallel.ForEach, MemoryBarrier?

У меня есть куча строк данных, и я хочу использовать Parallel.ForEach для вычисления некоторого значения в каждой строке, как это...

class DataRow
{
    public double A { get; internal set; }
    public double B { get; internal set; }
    public double C { get; internal set; }

    public DataRow()
    {
        A = double.NaN;
        B = double.NaN;
        C = double.NaN;
    }
}

class Program
{
    static void ParallelForEachToyExample()
    {
        var rnd = new Random();
        var df = new List<DataRow>();

        for (int i = 0; i < 10000000; i++)
        {
            var dr = new DataRow {A = rnd.NextDouble()};
            df.Add(dr);
        }

        // Ever Needed? (I)
        //Thread.MemoryBarrier();

        // Parallel For Each (II)
        Parallel.ForEach(df, dr =>
        {
            dr.B = 2.0*dr.A;
        });

        // Ever Needed? (III)
        //Thread.MemoryBarrier();

        // Parallel For Each 2 (IV)
        Parallel.ForEach(df, dr =>
        {
            dr.C = 2.0 * dr.B;
        });
    }
}

(В этом примере нет необходимости в распараллеливании, и если бы он был, все это могло бы войти в один Parallel.ForEach. Но это должна быть упрощенная версия некоторого кода, где имеет смысл настроить его таким образом).

Можно ли здесь переупорядочить чтения, чтобы в итоге я получил строку данных, где B != 2A или C != 2B?

Скажем, первый Parallel.ForEach (II) назначает рабочий поток 42 для работы со строкой данных 0. А второй Parallel.ForEach (IV) назначает рабочий поток 43 для работы со строкой данных 0 (как только первый Parallel.ForEach завершится) . Есть ли шанс, что чтение dr.B для строки 0 в потоке 43 вернет double.NaN, поскольку он еще не видел записи из потока 42?

И если да, то вставка барьера памяти на III вообще помогает? Заставит ли это обновления из первого Parallel.ForEach быть видимыми для всех потоков до запуска второго Parallel.ForEach?


person Michael Covelli    schedule 15.05.2015    source источник
comment
Короче говоря.. Я не думаю, что вам нужны явные барьеры памяти.. Обоснованно предположить, что реализация Parallel.ForEach имеет какую-то синхронизацию для завершения цикла/перед возвратом вызова ForEach   -  person Vikas Gupta    schedule 15.05.2015
comment
Имея лучшее представление о вашем фактическом коде, я мог бы дать вам лучший ответ, кроме «Нет, не беспокойтесь об этом». :)   -  person jdphenix    schedule 15.05.2015
comment
Может быть, причина разделения станет немного понятнее, если я скажу, что расчет в каждой строке второго параллельного цикла (IV) зависит от некоторого значения, которое можно узнать только после завершения первого цикла (II). Скажем, нам нужна медиана значений dr.B по всем строкам, прежде чем мы сможем вычислить значение dr.C для каждой строки.   -  person Michael Covelli    schedule 15.05.2015


Ответы (2)


Работа, начатая Parallel.ForEach(), будет выполнена до его возвращения. Внутри ForEach() порождает Task для каждой итерации и вызывает Wait() на каждой. В результате вам не нужно синхронизировать доступ между ForEach() вызовами.

Вы делаете это необходимо помнить для отдельных задач с ForEach() перегрузками, которые позволяют вам получить доступ к состоянию цикла, агрегированию результатов задач и т. д. Например, в этом тривиальном примере, который суммирует 1 ≤ x ≤ 100, переданное Action localFinally из Parallel.For() должен быть обеспокоен проблемами синхронизации,

var total = 0;

Parallel.For(0, 101, () => 0,  // <-- localInit
(i, state, localTotal) => { // <-- body
  localTotal += i;
  return localTotal;
}, localTotal => { <-- localFinally
  Interlocked.Add(ref total, localTotal); // Note the use of an `Interlocked` static method
});

// Work of previous `For()` call is guaranteed to be done here

Console.WriteLine(total);

В вашем примере нет необходимости вставлять барьер памяти между вызовами ForEach(). В частности, цикл IV может зависеть от результатов завершения II и уже вставленного III Parallel.ForEach() для вас.

Фрагмент получен из: Parallel Framework и предотвращение ложного совместного использования

person jdphenix    schedule 15.05.2015
comment
Спасибо. Когда я просматриваю код Parallel.ForEach на несколько уровней ниже, мне кажется, что большую часть работы выполняет private static ParallelLoopResult ForWorker‹TLocal›. Мне немного трудно понять, но похоже, что есть вызов rootTask.Wait(); который ожидает завершения всех рабочих потоков, прежде чем продолжить. Но даже несмотря на то, что мой основной поток ожидает завершения рабочих процессов, это не гарантирует, что рабочие потоки, распределенные среди всех других процессоров, обязательно увидят самые последние записи, когда они переходят к считыванию значений, не так ли? - person Michael Covelli; 15.05.2015
comment
Это правильно, и я отредактирую свой ответ, чтобы, возможно, он был немного более ясным. Задачи, порожденные одним и тем же вызовом ForEach(), должны учитывать проблемы с параллелизмом — обычно это место, о котором нужно беспокоиться, находится в Action, которое вы передаете localFinally. Однако различные вызовы ForEach() могут безопасно зависеть от результатов предыдущих вызовов ForEach(). - person jdphenix; 15.05.2015
comment
Я предполагаю, что мой вопрос связан с этим... генераторы барьеров памяти. Я просто хочу убедиться, что конец Parallel.ForEach попадает в одну из этих корзин. Чтобы он имел свой MemoryBarrier (эффективно) и гарантировал, что все будет полностью записано до запуска следующего Parallel.ForEach. - person Michael Covelli; 15.05.2015
comment
Связано да - это скорее то, что создает барьер памяти, а ваш - мне нужен он между Parallel.ForEach() вызовами. - person jdphenix; 15.05.2015
comment
Думаю, если это правильно... albahari.com/threading/part4.aspx затем Все, что полагается на сигнализацию, например, запуск или ожидание задачи, неявно создает полную ограду. И последнее, что в теле функции private static ParallelLoopResult ForWorker‹TLocal›, выполняющей большую часть работы в Parallel.ForEach, — это вызов rootTask.Wait(); (перед блоками catch и finally). Таким образом, похоже, что этот вызов создает такое же полное ограждение, как и MemoryBarrier. Так что это не нужно. - person Michael Covelli; 15.05.2015
comment
albahari.com/threading/part5.aspx#_The_Parallel_Class может быть более интересное чтение, учитывая тему. - person jdphenix; 15.05.2015
comment
Но как насчет MemoryBarrier в I. Что такого в начале Parallel.ForEach, из-за которого барьер памяти уже существует? - person Michael Covelli; 15.05.2015

Поскольку несколько потоков будут обращаться к одной и той же переменной "dr.B", вам необходимо убедиться, что ваш код C# является потокобезопасным.

Попробуйте использовать блокировку вокруг каждой операции https://msdn.microsoft.com/en-us/library/c5kehkcz.aspx

e.g.

private Object thisLock1 = new Object();
...
lock(thisLock1)
{
    dr.C = 2.0 * dr.B;
}

...
lock(thisLock1)
{
    dr.B = 2.0*dr.A;
}

Однако это приведет к нарушению параллельной обработки. так как каждый поток должен ждать, пока следующий не будет выполнен.

Обязательно ознакомьтесь с потенциальной ловушкой параллельной обработки: https://msdn.microsoft.com/en-us/library/dd997403%28v=vs.110%29.aspx

person Carl Prothman    schedule 15.05.2015
comment
В конкретном примере OP с использованием Parallel.ForEach() каждый вызов ForEach() уже обрабатывает синхронизацию, в частности, обеспечивая завершение любых параллельных операций, порожденных вызовом, до его возврата. - person jdphenix; 15.05.2015
comment
@jdphenix - не могли бы вы дать ссылку, пожалуйста (для моего образования)? Примечание Microsoft MSDN показывает: Как написать цикл Parallel.ForEach с локальными переменными потока msdn.microsoft.com/en-us/library/dd460703%28v=vs.110%29.aspx, который использует (finalResult) =› Interlocked.Add(ref total , конечный результат) - person Carl Prothman; 15.05.2015
comment
Правильно заявить, что отдельный ForEach() должен учитывать безопасность потоков, и поэтому ForEach() предоставляет перегрузки, которые позволяют указать локальный поток и финализатор, как вы связали. Что касается внутренних вызовов Wait() для ForEach(), мне пришлось просмотреть справочный источник, чтобы подтвердить это. - person jdphenix; 15.05.2015
comment
Я мог бы заблокировать каждую строку данных. Я согласен с тем, что в 99% случаев это правильно, чтобы избежать рассуждений о низком коде блокировки. Но здесь у меня есть огромное количество строк данных, которые хорошо поддаются параллельной обработке. Добавление блокировки значительно замедляет мой реальный код. И мне действительно не нужно гарантировать взаимное исключение здесь. Каждый рабочий поток будет работать только с одной строкой данных за раз. Что мне действительно нужно сделать, так это убедиться, что если thead A работает в строке i в первом цикле и в потоке B во втором, то B может видеть все записи A, прежде чем продолжить. - person Michael Covelli; 15.05.2015