Как сохранить поток GRPC для повторного использования

У меня есть сервер GRPC, который предоставляет функцию и возвращает поток. Я хочу сохранить поток на map[string]grpc.Stream - пока это работает.

Моя проблема в том, что поток закрывается после того, как функция, возвращающая поток, завершает свою логику.

Это то, что у меня есть на данный момент:

func (s *server) CheckConnection(initMessage *LighterGRPC.InitMessage, stream LighterGRPC.Lighter_CheckConnectionServer) error {
    //Do something magic
    streams[initMessage.DeviceID] = stream

    error := stream.Send(&LighterGRPC.ColorMessage{DATA})
    if error {
        log.Println(error)
    }

    //Tried
    //for { }

    return error
}

Я уже пытался разрешить функции никогда ничего не возвращать с for {} перед возвратом (как прокомментировано в коде выше), но это не помогло, и я не думаю, что это могло быть решением.

Есть ли способ оставить поток открытым, чтобы я мог позже отправлять данные клиенту во время выполнения?


person JSF    schedule 28.04.2016    source источник
comment
Похоже на вариант использования каналов и горутин.   -  person evanmcdonnal    schedule 28.04.2016
comment
Но тогда у меня не было бы потока для отправки данных обратно клиенту, или я ошибаюсь?   -  person JSF    schedule 28.04.2016
comment
Я больше думал о том, чтобы запустить этот метод как горутину и использовать каналы, чтобы он работал, пока вы не отправите ему сигнал, указывающий на то, что пора вернуться.   -  person evanmcdonnal    schedule 28.04.2016
comment
Это не помогает, поток закрывается, даже если я позволю запускать функцию навсегда   -  person JSF    schedule 28.04.2016
comment
Вы проверили, закрывает ли клиент поток? Какая задержка между взаимодействиями в потоке? Возникает ли у клиента ошибка при чтении из потока (это автоматически закроет поток)? Предотвращение возврата функции сервера gRPC должно поддерживать поток, пока клиент не закрывает поток. Если эта функция вернется, сервер закроет поток.   -  person William King    schedule 28.04.2016


Ответы (1)


Для тех из вас, кто может столкнуться с той же проблемой, вот быстрое решение. По сути, вам нужно обернуть свой поток в структуру, чтобы возникла ошибка chan. Возврат будет блокироваться до тех пор, пока не произойдет ошибка при выполнении stream.Send ()

type Connection struct {
    stream LighterGRPC.Lighter_CheckConnectionServer
    error  chan error
}

Ваша карта будет такой:

type Server struct {
    ....
    conns map[string]Connection
}

И ваш RPC для создания потока в конце должен быть таким:

conn := Connection{
    stream: stream,
    error:  make(chan error),
}
s.conns[initMessage.DeviceID] = conn

return <-conn.error

Таким образом, ваш поток будет сохранен на карте, будучи «активным».

person ysakiyev    schedule 08.06.2020