Иди — почему для планирования фоновых рабочих goroutine также требуется собственная goroutine?

Я работаю над подбором нескольких шаблонов параллелизма Go. Я посмотрел на реализацию фоновых рабочих с использованием горутин и каналов ввода/вывода и заметил, что когда я отправляю новые задания на принимающий канал (по сути, ставлю новые задания в очередь), я должен делать это в горутине, иначе планирование испортится. Значение:

Это вылетает:

for _, jobData := range(dataSet) {
    input <- jobData
}

Это работает:

go func() {
    for _, jobData := range(dataSet) {
        input <- jobData
    }
}()

Для чего-то более конкретного я поэкспериментировал с каким-то бессмысленным кодом (вот он на игровой площадке):

package main

import (
    "log"
    "runtime"
)

func doWork(data int) (result int) {
    // ... some 'heavy' computation
    result = data * data
    return
}

// do the processing of the input and return
// results on the output channel
func Worker(input, output chan int) {
    for data := range input {
        output <- doWork(data)
    }
}

func ScheduleWorkers() {

    input, output := make(chan int), make(chan int)

    for i := 0 ; i < runtime.NumCPU() ; i++ {
        go Worker(input, output)
    }

    numJobs := 20

    // THIS DOESN'T WORK
    // and crashes the program
    /*
    for i := 0 ; i < numJobs ; i++ {
        input <- i
    }
    */

    // THIS DOES
    go func() {
        for i := 0 ; i < numJobs ; i++ {
            input <- i
        }
    }()

    results := []int{}
    for i := 0 ; i < numJobs ; i++ {
        // read off results
        result := <-output
        results = append(results, result)
        // do stuff...
    }

    log.Printf("Result: %#v\n", results)
}

func main() {
    ScheduleWorkers()
}

Я пытаюсь понять эту тонкую разницу - помощь приветствуется. Спасибо.


person sa125    schedule 24.03.2014    source источник


Ответы (3)


Ваша функция ScheduleWorks отправляет в основную горутину (то есть ту, которая запускает функцию main(), в которой запускается программа) значение через input. Worker получает его и отправляет другое значение через output. Но в этот момент никто не получает от output, поэтому программа не может продолжать работу, и главная горутина отправляет следующее значение другому Worker.

Повторите это рассуждение для каждого работника. У вас есть runtime.NumCPU() рабочих, что, вероятно, меньше, чем numJobs. Предположим, что runtime.NumCPU() == 4, значит, у вас 4 рабочих. В итоге вы успешно отправили 4 значения, каждое по одному Worker. Так как никто не читает из output, все рабочие заняты отправкой, поэтому они не могут принять больше данных через input, поэтому пятый input <- i зависнет. В этот момент каждая горутина ждет; это тупик.

введите здесь описание изображения

Вы заметите, что если вместо runtime.NumCPU() запустить 20 и более воркеров, программа работает. Это потому, что основная горутина может отправлять все, что хочет, через input, так как для их получения достаточно воркеров.

Если вместо всего этого вы поместите цикл input <- i в другую горутину, как в вашем успешном примере, горутина main (в которой работает ScheduleWorks) может продолжаться и начинать чтение с output. Таким образом, каждый раз, когда эта новая горутина отправляет значение, рабочий процесс отправляет другое через output, основная горутина получает этот вывод, а рабочий процесс может получить другое значение. Никто не ждет, и программа работает.

введите здесь описание изображения

person Toni Cárdenas    schedule 24.03.2014
comment
Вау, отличный ответ - это действительно проясняет ситуацию. Спасибо! - person sa125; 25.03.2014

Обратите внимание, что для таких задач sync.WaitGroup может быть альтернативным способом выполнения. То есть, если вам требуется, чтобы все данные были обработаны, прежде чем вы захотите продолжить.

Прочтите об этом в документации пакета синхронизации: http://golang.org/pkg/sync#WaitGroup

person tike    schedule 24.03.2014

Это потому, что все в Go блокируется по умолчанию.

Когда вы отправляете первое значение по небуферизованному каналу, оно блокируется до тех пор, пока получатель не удалит это значение из канала.

Каналы можно буферизовать, добавляя «емкость».

Например:

make(chan int, 20) // Make a buffered channel of int with capacity 20

Из спецификации Go:

Емкость в количестве элементов устанавливает размер буфера в канале. Если емкость больше нуля, канал является асинхронным: операции связи выполняются без блокировки, если буфер не заполнен (отправляет) или не пуст (получает), а элементы принимаются в том порядке, в котором они отправляются. Если пропускная способность равна нулю или отсутствует, связь удается только тогда, когда и отправитель, и получатель готовы.

Вы можете заставить свою исходную функцию работать, используя буферизованные каналы вместо небуферизованных каналов, но обертывание вызова функции в горутину, вероятно, является лучшим подходом, поскольку на самом деле он является параллельным.

Из Effective Go (Прочитайте этот документ полностью! Это, вероятно, документ с наибольшим количеством ссылок в ответах Go на Stack Переполнение):

Получатели всегда блокируются до тех пор, пока не появятся данные для приема. Если канал не буферизован, отправитель блокируется до тех пор, пока получатель не получит значение. Если канал имеет буфер, отправитель блокируется только до тех пор, пока значение не будет скопировано в буфер; если буфер заполнен, это означает ожидание, пока какой-либо получатель не получит значение.

Если вы используете буферизованные каналы, то вы просто заполняете канал, двигаетесь дальше, а затем снова его истощаете. Не одновременно.

Пример:

Сдача

input, output := make(chan int), make(chan int)

To

input, output := make(chan int, 20), make(chan int, 20)

Игровая площадка

person Intermernet    schedule 24.03.2014