проблема golang с каналами (с буферизацией и без буферизации)

Я работаю над личным проектом, который будет работать на Raspberry Pi с подключенными к нему датчиками.

Он состоит из 2-х программ:

  1. сервер, который считывает данные с датчиков каждые X секунд
  2. клиент, который сохраняет данные в базе данных sqlite и может отправлять некоторые команды

Сервер может:

  • считывать данные с датчиков, записывать их в сокет, чтобы клиент мог сохранить их в БД
  • слушать сокет, чтобы, когда клиент отправляет какую-либо команду, он может ее выполнить и отправить ответ обратно клиенту

Функция, считывающая с датчиков, и функция, которая обрабатывает соединение с сокетом, выполняются в разных горутиках, поэтому, чтобы отправлять данные в сокет, когда они считываются с датчиков, я создаю байтовый канал [] в основной функции , передав его в горутины.

Когда данные собираются с датчиков (если есть подключенный клиент), они записываются в канал, поэтому другая функция записывает их в сокет, и клиент получает их.

Моя проблема возникла здесь: если я делаю несколько записей подряд, только первые данные поступают к клиенту, а другие - нет. Но если я добавлю немного time.sleep в функцию, которая записывает в канал, все данные будут правильно поступать к клиенту.

Во всяком случае, это упрощенная версия этой маленькой программы:

package main

import (
    "net"
    "os"
    "sync"
    "time"
)

const socketName string = "./test_socket"

// create to the socket and launch the accept client routine
func launchServerUDS(ch chan []byte) {
    if err := os.RemoveAll(socketName); err != nil {
        return
    }
    l, err := net.Listen("unix", socketName)
    if err != nil {
        return
    }
    go acceptConnectionRoutine(l, ch)
}

// accept incoming connection on the socket and
// 1) launch the routine to handle commands from the client
// 2) launch the routine to send data when the server reads from the sensors
func acceptConnectionRoutine(l net.Listener, ch chan []byte) {
    defer l.Close()
    for {
        conn, err := l.Accept()
        if err != nil {
            return
        }
        go commandsHandlerRoutine(conn, ch)
        go autoSendRoutine(conn, ch)

    }
}

// routine that sends data to the client
func autoSendRoutine(c net.Conn, ch chan []byte) {
    for {
        data := <-ch
        if string(data) == "exit" {
            return
        }
        c.Write(data)
    }
}

// handle client connection and calls functions to execute commands
func commandsHandlerRoutine(c net.Conn, ch chan []byte) {
    for {
        buf := make([]byte, 1024)
        n, err := c.Read(buf)
        if err != nil {
            ch <- []byte("exit")
            break
        }
        // now, for sake of simplicity , only echo commands back to the client
        _, err = c.Write(buf[:n])
        if err != nil {
            ch <- []byte("exit")
            break
        }
    }
}

// write on the channel to the autosend routine so the data are written on the socket
func sendDataToClient(data []byte, ch chan []byte) {
    select {
    case ch <- data:
        // if i put a little sleep here, no problems
        // i i remove the sleep, only data1 is sent to the client
        // time.Sleep(1 * time.Millisecond)
    default:
    }
}

func dummyReadDataRoutine(ch chan []byte) {
    for {
        // read data from the sensors every 5 seconds
        time.Sleep(5 * time.Second)
        // read first data and send it
        sendDataToClient([]byte("dummy data1\n"), ch)
        // read second data and send it
        sendDataToClient([]byte("dummy data2\n"), ch)
        // read third data and send it
        sendDataToClient([]byte("dummy data3\n"), ch)
    }
}

func main() {
    ch := make(chan []byte)
    wg := sync.WaitGroup{}
    wg.Add(2)
    go dummyReadDataRoutine(ch)
    go launchServerUDS(ch)
    wg.Wait()
}

Я думаю, что мне что-то не хватает, и я не хочу использовать сон, чтобы писать, потому что я не думаю, что это правильный способ делать это. Есть ли какая-то ошибка или лучший способ сделать это? Единственное, что должно остаться, как и я, - это то, что функции обработки сокетов и чтения данных должны выполняться в разных горутинах.


person L30    schedule 17.05.2020    source источник
comment
Скорее всего, вы не нашли ничего в поиске, потому что у вас есть куча несвязанных проблем. Всегда старайтесь упростить насколько это возможно, чтобы создать свой минимальный воспроизводимый пример, потому что ни один из этого имеет какое-либо отношение к сокетам unix. У вас есть горутины, вызывающие горутины, вызывающие горутины без причины. Ваш WaitGroup ничего не делает, потому что ни одна горутина не вызывает Done() (и они не могли, потому что это не входит в их область действия). Отправка данных по каналу без необходимости находится в отдельной функции с default регистром в вашем выборе без видимой причины.   -  person JimB    schedule 17.05.2020
comment
Канал не буферизован, и вы пишете в канал с выбором и значением по умолчанию, которое ничего не делает. Чтобы исправить ваш код, вы должны добавить буферизацию в канал и удалить select. Затем просто напишите данные в канал.   -  person chmike    schedule 17.05.2020
comment
@JimB привет, спасибо за предложения, этот пример упрощен для меня, потому что моя программа больше, чем эта, и я добавил здесь группу ожидания только для запуска подпрограмм, поэтому я не ставил wg.Done () для этого причина, в любом случае, большое спасибо за ваши предложения   -  person L30    schedule 17.05.2020
comment
@chmike, я попробую ваше решение и отредактирую свой вопрос, спасибо вам тоже   -  person L30    schedule 17.05.2020
comment
@Leonardo, оставив такой бессмысленный код, мы не сможем отличить то, что осталось незавершенным, и то, чего вы не понимаете. Случай default - это то, что потеряло бы отправку данных, но это, очевидно, неверно (точно так же, как дополнительные горутины или неиспользуемая группа ожидания, очевидно, неверны), поэтому нам нужен контекст, чтобы понять, почему он там вообще.   -  person JimB    schedule 17.05.2020
comment
@JimB, я понял это, и только что сказал ранее, спасибо за ваше терпение и вашу помощь. Я старался изо всех сил, и я думал, что моя проблема связана с сокетами программы, но это не так. Как сказал chmike, моя проблема была в небуферизованном канале. Я просто попробовал его решение, и оно отлично работает. Теперь я отредактирую свой вопрос, пытаясь написать лучший код, еще раз спасибо   -  person L30    schedule 17.05.2020
comment
@JimB, если вы думаете, что мой новый код неясен, дайте мне знать, чтобы я мог его отредактировать и улучшить свой вопрос, в любом случае спасибо   -  person L30    schedule 17.05.2020
comment
@Leonardo, новый код намного понятнее, хотя есть и другие, пока не связанные с этим вопросы. Вы неправильно обрабатываете Read, см. io.Reader. Также сомнительно, что нет кадрирования сообщений, поскольку сокеты unix представляют собой поток, вы можете получать несколько или частичные сообщения при каждом чтении или записи. Вы также обнаружите, что большинство серверов будут изо всех сил стараться избегать выделения нового буфера для каждого отдельного чтения, но это означает реструктуризацию вашей программы в целом (хотя это не вызывает особого беспокойства, если чтения выполняются нечасто).   -  person JimB    schedule 17.05.2020
comment
@JimB спасибо за советы :). Я переместил создание буфера за пределы for {}, поэтому теперь буфер создается только один раз. Я попробую и другое предложение   -  person L30    schedule 17.05.2020


Ответы (2)


Основная проблема заключалась в функции:

func sendDataToClient(data []byte, ch chan []byte) {
    select {
    case ch <- data:
        // if I put a little sleep here, no problems
        // if I remove the sleep, only data1 is sent to the client
        // time.Sleep(1 * time.Millisecond)
    default:
}

Если канал ch не готов в момент вызова функции, будет использован регистр default, а data никогда не будет отправлен. В этом случае вам следует исключить функцию и отправить ее напрямую в канал.

Буферизация канала ортогональна рассматриваемой проблеме и должна выполняться по тем же причинам, что и буферизованный ввод-вывод, т.е. обеспечение «буфера» для записи, которая не может немедленно прогрессировать. Если код не мог работать без буфера, добавление одного только задерживает возможные взаимоблокировки.

Вам также не нужно здесь значение exit дозорного, так как вы можете перемещаться по каналу и закрывать его, когда закончите. Однако при этом по-прежнему игнорируются ошибки записи, но, опять же, это требует некоторой переделки.

for data := range ch {
    c.Write(data)
}

Вы также должны быть осторожны, передавая срезы по каналам, так как слишком легко потерять информацию о том, какой логический процесс имеет право собственности и будет изменять массив поддержки. Из представленной информации я не могу сказать, улучшает ли архитектура передачу данных чтения + записи по каналам, но это не шаблон, который вы найдете в большинстве сетевых кодов go.

person JimB    schedule 17.05.2020
comment
большое спасибо ! Я рад, что вы мне помогли, и я думаю, что пересмотрю свою архитектуру после этих чтений - person L30; 17.05.2020

JimB дал хорошее объяснение, поэтому я думаю, что его ответ лучше.

Я включил в этот ответ свое частичное решение.

Я думал, что мой код был ясным и упрощенным, но, как сказал Джим, я могу сделать его проще и яснее. Я оставляю свой старый код опубликованным, чтобы люди могли лучше понять, как можно опубликовать более простой код и не делать такой беспорядок, как я.

Как сказал chmike, моя проблема не была связана с сокетом, как я думал, а была связана только с каналом. Запись на небуферизованный канал была одной из проблем. После изменения небуферизованного канала на буферизованный проблема была решена. В любом случае, этот код не является «хорошим кодом» и может быть улучшен в соответствии с принципами, изложенными JimB в своем ответе.

Итак, вот новый код:

package main

import (
    "net"
    "os"
    "sync"
    "time"
)

const socketName string = "./test_socket"

// create the socket and accept clients connections
func launchServerUDS(ch chan []byte, wg *sync.WaitGroup) {
    defer wg.Done()
    if err := os.RemoveAll(socketName); err != nil {
        return
    }
    l, err := net.Listen("unix", socketName)
    if err != nil {
        return
    }
    defer l.Close()
    for {
        conn, err := l.Accept()
        if err != nil {
            return
        }
        // this goroutine are launched when a client is connected
        // routine that listen and echo commands
        go commandsHandlerRoutine(conn, ch)
        // routine to send data read from the sensors to the client
        go autoSendRoutine(conn, ch)
    }
}

// routine that sends data to the client
func autoSendRoutine(c net.Conn, ch chan []byte) {
    for {
        data := <-ch
        if string(data) == "exit" {
            return
        }
        c.Write(data)
    }
}

// handle commands received from the client
func commandsHandlerRoutine(c net.Conn, ch chan []byte) {
    for {
        buf := make([]byte, 1024)
        n, err := c.Read(buf)
        if err != nil {
            // if i can't read send an exit command to autoSendRoutine and exit
            ch <- []byte("exit")
            break
        }
        // now, for sake of simplicity , only echo commands back to the client
        _, err = c.Write(buf[:n])
        if err != nil {
            // if i can't write back send an exit command to autoSendRoutine and exit
            ch <- []byte("exit")
            break
        }
    }
}

// this goroutine reads from the sensors and write to the channel , so data are sent
// to the client if a client is connected
func dummyReadDataRoutine(ch chan []byte, wg *sync.WaitGroup) {
    x := 0
    for x < 100 {
        // read data from the sensors every 5 seconds
        time.Sleep(1 * time.Second)
        // read first data and send it
        ch <- []byte("data1\n")
        // read second data and send it
        ch <- []byte("data2\n")
        // read third data and send it
        ch <- []byte("data3\n")
        x++
    }
    wg.Done()
}


func main() {
    // create a BUFFERED CHANNEL
    ch := make(chan []byte, 1)
    wg := sync.WaitGroup{}
    wg.Add(2)
    // launch the goruotines that handle the socket connections
    // and read data from the sensors
    go dummyReadDataRoutine(ch, &wg)
    go launchServerUDS(ch, &wg)
    wg.Wait()
}

person L30    schedule 17.05.2020
comment
Проблема заключалась не в небуферизованном канале, а в default в select, из-за которого вы отбрасывали попытки записи в канал, когда он не был сразу готов. - person JimB; 17.05.2020
comment
@halfer, надеюсь, теперь все в порядке, извините за этот беспорядок, ребята, спасибо за помощь - person L30; 18.05.2020