Как сервер gRPC может заметить, что клиент отменил вызов потоковой передачи на стороне сервера?

Я хочу использовать gRPC, чтобы клиенты могли подписываться на события, генерируемые сервером. У меня RPC объявлен так:

rpc Subscribe (SubscribeRequest) returns (stream SubscribeResponse);

где возвращаемый поток бесконечен. Чтобы «отписаться», клиенты отменяют RPC (кстати, есть ли более чистый способ?).

Я понял, как клиент может отменить звонок:

Context.CancellableContext cancellableContext =
         Context.current().withCancellation();
cancellableContext.run(() -> {
   stub.subscribe(request, callback);
});
// do other stuff / wait for reason to unsubscribe
cancellableContext.cancel(new InterruptedException());

Однако сервер, похоже, не замечает, что клиент отменил свой вызов. Я тестирую это на фиктивной реализации сервера:

@Override
public void subscribe(SubscribeRequest request,
                      StreamObserver<SubscribeResponse> responseObserver) {
  // in real code, this will happen in a separate thread.
  while (!Thread.interrupted()) {
    responseObserver.onNext(SubscribeResponse.getDefaultInstance());
  }
}

Сервер с радостью продолжит отправку своих сообщений в эфир. Как сервер может распознать, что вызов был отменен клиентом, и, таким образом, прекратить отправку ответов?


person Balz Guenat    schedule 08.02.2019    source источник


Ответы (1)


Я сам нашел ответ. Вы передаете StreamObserver, чтобы подписаться на ServerCallStreamObserver, который предоставляет методы isCancelled и setOnCancelHandler.

scso = ((ServerCallStreamObserver<SubscribeResponse>) responseObserver);

scso.setOnCancelHandler(handler);
// or
if (scso.isCancelled()) {
  // do whatever
}

У меня возникает вопрос, почему subscribe не передается ServerCallStreamObserver с самого начала.

person Balz Guenat    schedule 08.02.2019
comment
Мы не можем изменить тип, потому что это сломало бы уже существующих пользователей :-(. Нам пришлось бы оставить старые методы, что вносит свои собственные затруднения. Обратите внимание, что Context.addListener() - еще один вариант. С ServerCallStreamObserver обратный вызов будет синхронизирован с другими обратными вызовами (например, onNext / onCompleted). С Context он будет вызываться отдельно, и вашей реализации придется беспокоиться о поточной безопасности. - person Eric Anderson; 08.02.2019
comment
@ eric-anderson, мне очень жаль, что всякий раз, когда я вижу аргумент «предыдущие пользователи», я должен прекратить все, что я делаю, и назвать его тупым. Да, может быть, 10 человек, использующих gRPC вне Google в 2016 году, заметят, если тип был переименован, но как насчет 10000 человек, использующих gRPC в 2020 году? Почему они до сих пор сбиты с толку? Тогда есть версионирование; API не обязательно должен иметь обратную совместимость, пока не наступит апокалипсис. - person Abhijit Sarkar; 04.08.2020
comment
@AbhijitSarkar несколько организаций со сложной сетью микросервисов, сталкивающиеся со значительным трафиком (например, Netflix), уже некоторое время используют gRPC. Я полагаю, они бы полностью распяли Эрика, если бы он внезапно так изменил API; -] Тем не менее, я думаю, что иметь 2 версии методов - гораздо лучшая идея, чем заставлять ppl выполнять какое-то причудливое приведение в начале каждого метода; -) - person morgwai; 05.06.2021
comment
@morgwai Я сказал: «Тогда есть версии»; это единственный аргумент, который должен выдвинуть разработчик программного обеспечения перед изменением API. - person Abhijit Sarkar; 05.06.2021
comment
@AbhijitSarkar, увеличивающий номер версии API, не меняет того факта, что такой организации, как Netflix, придется адаптировать сотни микросервисов для использования нового API, чтобы исправлять ошибки и улучшать новую версию. Чтобы избежать принуждения пользователей вашей библиотеки к такой адаптации, вам все равно потребуется поддерживать обе версии API в новом выпуске (аналогично тому, за что я выступал, кстати). Так что на самом деле управление версиями ничего не решает в случае модели скользящего выпуска. - person morgwai; 05.06.2021
comment
@morgwai Предыдущий основной выпуск будет переведен в режим обслуживания с исправлением только ошибок, сначала для одного или двух второстепенных выпусков, а затем больше не будет поддерживаться. Это не проблема и вовсе не проблема программного обеспечения; вопрос в том, насколько сопровождающий библиотеки хочет отступить, чтобы угодить крупным компаниям. - person Abhijit Sarkar; 05.06.2021