Двунаправленный поток JAVA GRPC выдает CANCELED: io.grpc.Context был отменен без ошибок

Я пытаюсь реализовать службу двунаправленной потоковой передачи для всех узлов кластера. Используя это, эти узлы должны синхронизировать свои состояния, т.е. всякий раз, когда какой-либо сервер получит какое-либо обновление с помощью serveRequest запроса от любого клиента, он будет передавать эту информацию другим узлам в кластере с помощью потоковой службы syncState.

Для этого на каждом узле кластера настроены клиенты для подключения ко всем остальным узлам. Инициализация клиентов является однократной, и создается поток (1 клиент имеет 1 поток на узел), с помощью которого предполагается синхронизировать последнее состояние с другими узлами.

Образец прототипа

message MessageRequest {
  string policyId = 1;
  string txnId = 2;
  string clientId = 3;
}

message Acknowledgement {
  string serverId = 1;
  string txnId = 2;
}

service SyncState{
  rpc sync(stream MessageRequest) returns (stream Acknowledgement) {}
}

service ServeRequest{
  rpc newRequest(MessageRequest) returns (Acknowledgement) {}
}

SyncService

public class SyncStateService extends SyncStateServiceGrpc.SyncStateServiceImplBase {

    private static final Logger logger = LoggerFactory.getLogger(SyncStateService.class);
    private SyncManager manager;
    private Configuration config;

    public SyncStateService(SyncManager manager, Configuration config) {
        this.manager = manager;
        this.config = config;
    }

    @Override
    public StreamObserver<MessageRequest> sync(StreamObserver<Acknowledgement> responseObserver) {
        logger.info("Server received connection request");
        return new StreamObserver<MessageRequest>() {
            @Override
            public void onNext(MessageRequest request) {
                Acknowledgement ack = Acknowledgement.newBuilder().setTxnId(request.getTxnId()).setServerId(config.getClientId()).build();
                MessageRequest clone = MessageRequest.newBuilder().setClientId(config.getClientId()).setTxnId(request.getTxnId()).build();
                CompletableFuture.runAsync(() -> manager.sendMessage(clone));
                responseObserver.onNext(ack);
            }

            @Override
            public void onError(Throwable t) {
                logger.error("Error on server for client stream", t);
            }

            @Override
            public void onCompleted() {
                logger.info("client called on completed on stream, server closes response stream");
                responseObserver.onCompleted();
            }
        };
    }
}

Client Stream Manager

public class SyncManager {
    private Map<String, SyncClient> client = new HashMap<>();
    private final Configuration  config;

    public SyncManager(Configuration  config) {
        this.config = config;
    }

    public void startStateSync(MessageRequest clone) {
        //This method is called whenever
        //ServeRequest APi is hit on any server
        config.getNodes().forEach(e->{
            SyncClient syncClient = client.get(e);
            if (syncClient== null){
                syncClient = new SyncClient(e);
            }
            MessageRequest gossipRequest = MessageRequest.newBuilder().setTxnId(clone.getTxnId())
                    .setPolicyId(clone.getPolicyId()).setClientId(config.getClientId()).build();
            syncClient.syncMessage(e, gossipRequest);
        });
    }

    public void sendMessage(MessageRequest clone) {
        // Server which has received state sync request
        //forwards the same info to other servers
        config.getNodes().forEach(e->{
            SyncClient syncClient = client.get(e);
            if (syncClient== null){
                syncClient = new SyncClient(e);
            }
            MessageRequest gossipRequest = MessageRequest.newBuilder().setTxnId(clone.getTxnId())
                    .setPolicyId(clone.getPolicyId()).setClientId(config.getClientId()).build();
            syncClient.syncMessage(e, gossipRequest);
        });
    }
}

Клиент:

public class SyncClient {
    private static final Logger logger = LoggerFactory.getLogger(SyncClient.class);
    private final ManagedChannel channel;
    private final SyncStateServiceGrpc.SyncStateServiceStub stub;
    private final StreamObserver<MessageRequest> stream;

    public SyncClient(String node) {
        channel = ManagedChannelBuilder.forTarget(node)
                .usePlaintext()
                .keepAliveTime(new Long(2), TimeUnit.DAYS)
                .keepAliveTimeout(new Long(1), TimeUnit.DAYS)
                .keepAliveWithoutCalls(true)
                .build();
        stub = SyncStateServiceGrpc.newStub(channel);
        stream = getStreamObserver();
    }

    private StreamObserver<MessageRequest> getStreamObserver() {
        final StreamObserver<MessageRequest> stream;
        stream = stub.sync(new StreamObserver<>() {
            @Override
            public void onNext(Acknowledgement value) {
                String txnId = value.getTxnId();
                logger.info("Received message from server");
                String serverId = value.getServerId();

            }

            @Override
            public void onError(Throwable t) {
                logger.error("Error occurred", t);
            }

            @Override
            public void onCompleted() {
                logger.info("Completed");
            }
        });
        return stream;
    }
    
    public void syncMessage(String node, MessageRequest request) {
        logger.info("Sending message to:: {}", node);
        stream.onNext(request);
    }
}

Проблемы:

  1. При установке клиента метод синхронизации сервера никогда не запускается (на сервере нет журналов, необходимо распечатать этот запрос на соединение, полученный сервером)
  2. stream.onNext (request) на клиентах всегда выдает ошибку:

io.grpc.StatusRuntimeException: CANCELED: io.grpc.Context был отменен без ошибок в io.grpc.Status.asRuntimeException (Status.java:535) ~ [grpc-api-1.36.0.jar: 1.36.0] в io .grpc.stub.ClientCalls $ StreamObserverToCallListenerAdapter.onClose (ClientCalls.java:478) ~ [grpc-stub-1.36.0.jar: 1.36.0] в io.grpc.internal.DelayedClientCall $ DelayedListener $ 3.Clrun (Delayedava : 464) ~ [grpc-core-1.36.0.jar: 1.36.0] в io.grpc.internal.DelayedClientCall $ DelayedListener.delayOrExecute (DelayedClientCall.java:428) ~ [grpc-core-1.36.0.jar: 1.36.0] в io.grpc.internal.DelayedClientCall $ DelayedListener.onClose (DelayedClientCall.java:461) ~ [grpc-core-1.36.0.jar: 1.36.0] в io.grpc.internal.ClientCallImpl.closeObserver ( ClientCallImpl.java:553) ~ [grpc-core-1.36.0.jar: 1.36.0] в io.grpc.internal.ClientCallImpl.access $ 300 (ClientCallImpl.java:68) ~ [grpc-core-1.36.0. jar: 1.36.0] в io.grpc.internal.ClientCallImpl $ ClientStreamListenerImpl $ 1StreamClosed.runInternal (ClientCallImpl.java:739) ~ [grpc-core-1.36.0. jar: 1.36.0] в io.grpc.internal.ClientCallImpl $ ClientStreamListenerImpl $ 1StreamClosed.runInContext (ClientCallImpl.java:718) ~ [grpc-core-1.36.0.jar: 1.36.0] в io.grpc.internal. ContextRunnable.run (ContextRunnable.java:37) ~ [grpc-core-1.36.0.jar: 1.36.0] в io.grpc.internal.SerializingExecutor.run (SerializingExecutor.java:123) ~ [grpc-core-1.36 .0.jar: 1.36.0] в java.base / java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1130) ~ [na: na] в java.base / java.util.concurrent.ThreadPoolExecutor $ Worker .run (ThreadPoolExecutor.java:630) ~ [na: na] в java.base / java.lang.Thread.run (Thread.java:832) ~ [na: na]

Что мне не хватает в двунаправленной потоковой передаче с помощью GRPC?


person Nitish Bhardwaj    schedule 29.04.2021    source источник


Ответы (1)


Оказывается, проблема в зависимой библиотеке grpc-spring-boot-starter. Когда я переключился на собственный проект Gradle с grpc, тот же код работал отлично.

Чтобы узнать подробнее, отслеживайте проблему.

person Nitish Bhardwaj    schedule 30.04.2021