Голанг: модель параллелизма производителя / потребителя, но с сериализованными результатами

func main() {
    jobs := []Job{job1, job2, job3}
    numOfJobs := len(jobs)
    resultsChan := make(chan *Result, numOfJobs)
    jobChan := make(chan *job, numOfJobs)
    go consume(numOfJobs, jobChan, resultsChan)
    for i := 0; i < numOfJobs; i++ {
        jobChan <- jobs[i]
    }
    close(jobChan)

    for i := 0; i < numOfJobs; i++ {
        <-resultsChan
    }
    close(resultsChan)
}

func (b *Blockchain) consume(num int, jobChan chan *Job, resultsChan chan *Result) {
    for i := 0; i < num; i++ {
        go func() {
            job := <-jobChan
            resultsChan <- doJob(job)
        }()
    }
}

В приведенном выше примере задания помещаются в jobChan, а горутины извлекают его из jobChan, выполняют задания одновременно и помещают результаты в resultsChan. Затем мы извлечем результаты из resultsChan.

Вопрос 1:

В моем коде нет сериализованных / линеаризованных результатов. Хотя задания идут в порядке job1, job2, job3. Результаты могут быть такими: job3, job1, job2, в зависимости от того, какой из них занимает больше всего времени.

Я по-прежнему хотел бы выполнять задания одновременно, однако мне нужно убедиться, что результаты выводятся из resultsChan в том же порядке, в котором он входил в качестве заданий.

Вопрос2:

У меня примерно 300 тыс. Заданий, это означает, что код сгенерирует до 300 тыс. Горутин. Эффективно ли иметь так много горутин, или мне лучше сгруппировать задания в кусочки из 100 или около того, чтобы каждая горутина проходила через 100, а не через 1.


person samol    schedule 07.01.2014    source источник
comment
это может быть дубликат, проверьте эту ссылку: stackoverflow.com/questions/20744619/   -  person ymg    schedule 07.01.2014
comment
Привет, Ясир, спасибо тебе за это. Однако я не думаю, что это дубликат. Потому что вопрос, на который вы указали, похоже, не касается линеаризованных / сериализованных результатов.   -  person samol    schedule 07.01.2014


Ответы (1)


Вот способ, которым я справился с сериализацией (а также установил ограниченное количество рабочих). Я устанавливаю некоторые рабочие объекты с полями ввода и вывода и каналами синхронизации, затем просматриваю их по кругу, выбирая любую работу, которую они сделали, и поручаю им новую работу. Затем я делаю последний проход через них, чтобы выбрать все завершенные работы, которые остались. Обратите внимание, что вы можете захотеть, чтобы количество рабочих процессов несколько превышало количество ваших ядер, чтобы вы могли некоторое время держать все ресурсы занятыми, даже если есть одно необычно долгое задание. Код находится по адресу http://play.golang.org/p/PM9y4ieMxw и ниже.

Это непросто (более волосатое, чем я помню, прежде чем сесть писать пример!) - хотелось бы увидеть, что есть у кого-то еще, либо просто лучшие реализации, либо совершенно другой способ достижения вашей цели.

package main

import (
    "fmt"
    "math/rand"
    "runtime"
    "time"
)

type Worker struct {
    in     int
    out    int
    inited bool

    jobReady chan bool
    done     chan bool
}

func (w *Worker) work() {
    time.Sleep(time.Duration(rand.Float32() * float32(time.Second)))
    w.out = w.in + 1000
}
func (w *Worker) listen() {
    for <-w.jobReady {
        w.work()
        w.done <- true
    }
}
func doSerialJobs(in chan int, out chan int) {
    concurrency := 23
    workers := make([]Worker, concurrency)
    i := 0
    // feed in and get out items
    for workItem := range in {
        w := &workers[i%
            concurrency]
        if w.inited {
            <-w.done
            out <- w.out
        } else {
            w.jobReady = make(chan bool)
            w.done = make(chan bool)
            w.inited = true
            go w.listen()
        }
        w.in = workItem
        w.jobReady <- true
        i++
    }
    // get out any job results left over after we ran out of input
    for n := 0; n < concurrency; n++ {
        w := &workers[i%concurrency]
        if w.inited {
            <-w.done
            out <- w.out
        }
        close(w.jobReady)
        i++
    }
    close(out)
}
func main() {
    runtime.GOMAXPROCS(10)
    in, out := make(chan int), make(chan int)
    allFinished := make(chan bool)
    go doSerialJobs(in, out)
    go func() {
        for result := range out {
            fmt.Println(result)
        }
        allFinished <- true
    }()
    for i := 0; i < 100; i++ {
        in <- i
    }
    close(in)
    <-allFinished
}

Обратите внимание, что только in и out в этом примере несут фактические данные - все остальные каналы предназначены только для синхронизации.

person twotwotwo    schedule 07.01.2014