Я пытаюсь реализовать службу двунаправленной потоковой передачи для всех узлов кластера. Используя это, эти узлы должны синхронизировать свои состояния, т.е. всякий раз, когда какой-либо сервер получит какое-либо обновление с помощью serveRequest
запроса от любого клиента, он будет передавать эту информацию другим узлам в кластере с помощью потоковой службы syncState
.
Для этого на каждом узле кластера настроены клиенты для подключения ко всем остальным узлам. Инициализация клиентов является однократной, и создается поток (1 клиент имеет 1 поток на узел), с помощью которого предполагается синхронизировать последнее состояние с другими узлами.
Образец прототипа
message MessageRequest {
string policyId = 1;
string txnId = 2;
string clientId = 3;
}
message Acknowledgement {
string serverId = 1;
string txnId = 2;
}
service SyncState{
rpc sync(stream MessageRequest) returns (stream Acknowledgement) {}
}
service ServeRequest{
rpc newRequest(MessageRequest) returns (Acknowledgement) {}
}
SyncService
public class SyncStateService extends SyncStateServiceGrpc.SyncStateServiceImplBase {
private static final Logger logger = LoggerFactory.getLogger(SyncStateService.class);
private SyncManager manager;
private Configuration config;
public SyncStateService(SyncManager manager, Configuration config) {
this.manager = manager;
this.config = config;
}
@Override
public StreamObserver<MessageRequest> sync(StreamObserver<Acknowledgement> responseObserver) {
logger.info("Server received connection request");
return new StreamObserver<MessageRequest>() {
@Override
public void onNext(MessageRequest request) {
Acknowledgement ack = Acknowledgement.newBuilder().setTxnId(request.getTxnId()).setServerId(config.getClientId()).build();
MessageRequest clone = MessageRequest.newBuilder().setClientId(config.getClientId()).setTxnId(request.getTxnId()).build();
CompletableFuture.runAsync(() -> manager.sendMessage(clone));
responseObserver.onNext(ack);
}
@Override
public void onError(Throwable t) {
logger.error("Error on server for client stream", t);
}
@Override
public void onCompleted() {
logger.info("client called on completed on stream, server closes response stream");
responseObserver.onCompleted();
}
};
}
}
Client Stream Manager
public class SyncManager {
private Map<String, SyncClient> client = new HashMap<>();
private final Configuration config;
public SyncManager(Configuration config) {
this.config = config;
}
public void startStateSync(MessageRequest clone) {
//This method is called whenever
//ServeRequest APi is hit on any server
config.getNodes().forEach(e->{
SyncClient syncClient = client.get(e);
if (syncClient== null){
syncClient = new SyncClient(e);
}
MessageRequest gossipRequest = MessageRequest.newBuilder().setTxnId(clone.getTxnId())
.setPolicyId(clone.getPolicyId()).setClientId(config.getClientId()).build();
syncClient.syncMessage(e, gossipRequest);
});
}
public void sendMessage(MessageRequest clone) {
// Server which has received state sync request
//forwards the same info to other servers
config.getNodes().forEach(e->{
SyncClient syncClient = client.get(e);
if (syncClient== null){
syncClient = new SyncClient(e);
}
MessageRequest gossipRequest = MessageRequest.newBuilder().setTxnId(clone.getTxnId())
.setPolicyId(clone.getPolicyId()).setClientId(config.getClientId()).build();
syncClient.syncMessage(e, gossipRequest);
});
}
}
Клиент:
public class SyncClient {
private static final Logger logger = LoggerFactory.getLogger(SyncClient.class);
private final ManagedChannel channel;
private final SyncStateServiceGrpc.SyncStateServiceStub stub;
private final StreamObserver<MessageRequest> stream;
public SyncClient(String node) {
channel = ManagedChannelBuilder.forTarget(node)
.usePlaintext()
.keepAliveTime(new Long(2), TimeUnit.DAYS)
.keepAliveTimeout(new Long(1), TimeUnit.DAYS)
.keepAliveWithoutCalls(true)
.build();
stub = SyncStateServiceGrpc.newStub(channel);
stream = getStreamObserver();
}
private StreamObserver<MessageRequest> getStreamObserver() {
final StreamObserver<MessageRequest> stream;
stream = stub.sync(new StreamObserver<>() {
@Override
public void onNext(Acknowledgement value) {
String txnId = value.getTxnId();
logger.info("Received message from server");
String serverId = value.getServerId();
}
@Override
public void onError(Throwable t) {
logger.error("Error occurred", t);
}
@Override
public void onCompleted() {
logger.info("Completed");
}
});
return stream;
}
public void syncMessage(String node, MessageRequest request) {
logger.info("Sending message to:: {}", node);
stream.onNext(request);
}
}
Проблемы:
- При установке клиента метод синхронизации сервера никогда не запускается (на сервере нет журналов, необходимо распечатать этот запрос на соединение, полученный сервером)
- stream.onNext (request) на клиентах всегда выдает ошибку:
io.grpc.StatusRuntimeException: CANCELED: io.grpc.Context был отменен без ошибок в io.grpc.Status.asRuntimeException (Status.java:535) ~ [grpc-api-1.36.0.jar: 1.36.0] в io .grpc.stub.ClientCalls $ StreamObserverToCallListenerAdapter.onClose (ClientCalls.java:478) ~ [grpc-stub-1.36.0.jar: 1.36.0] в io.grpc.internal.DelayedClientCall $ DelayedListener $ 3.Clrun (Delayedava : 464) ~ [grpc-core-1.36.0.jar: 1.36.0] в io.grpc.internal.DelayedClientCall $ DelayedListener.delayOrExecute (DelayedClientCall.java:428) ~ [grpc-core-1.36.0.jar: 1.36.0] в io.grpc.internal.DelayedClientCall $ DelayedListener.onClose (DelayedClientCall.java:461) ~ [grpc-core-1.36.0.jar: 1.36.0] в io.grpc.internal.ClientCallImpl.closeObserver ( ClientCallImpl.java:553) ~ [grpc-core-1.36.0.jar: 1.36.0] в io.grpc.internal.ClientCallImpl.access $ 300 (ClientCallImpl.java:68) ~ [grpc-core-1.36.0. jar: 1.36.0] в io.grpc.internal.ClientCallImpl $ ClientStreamListenerImpl $ 1StreamClosed.runInternal (ClientCallImpl.java:739) ~ [grpc-core-1.36.0. jar: 1.36.0] в io.grpc.internal.ClientCallImpl $ ClientStreamListenerImpl $ 1StreamClosed.runInContext (ClientCallImpl.java:718) ~ [grpc-core-1.36.0.jar: 1.36.0] в io.grpc.internal. ContextRunnable.run (ContextRunnable.java:37) ~ [grpc-core-1.36.0.jar: 1.36.0] в io.grpc.internal.SerializingExecutor.run (SerializingExecutor.java:123) ~ [grpc-core-1.36 .0.jar: 1.36.0] в java.base / java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1130) ~ [na: na] в java.base / java.util.concurrent.ThreadPoolExecutor $ Worker .run (ThreadPoolExecutor.java:630) ~ [na: na] в java.base / java.lang.Thread.run (Thread.java:832) ~ [na: na]
Что мне не хватает в двунаправленной потоковой передаче с помощью GRPC?