Ошибка при подключении к серверу Spring Boot RSocket из клиента RSocket-Java

У меня возникла проблема при подключении к приложению Spring Boot RSocket через TCP. Клиент при использовании RSocketRequester работает нормально, но когда я пытаюсь подключиться с помощью клиента RSocketFactory, он продолжает получать ошибки. Код ниже.

        RSocket rSocket = this.client = RSocketFactory
            .connect()
            .mimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.toString(), MediaType.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(TcpClientTransport.create("localhost", 7000))
            .start()
            .block();


Flux<Payload> s = rSocket.requestStream(DefaultPayload.create("1234", "socket"));
    s.subscribe();

Это дает ошибку, как показано ниже:

java.lang.IndexOutOfBoundsException: readerIndex(1) + length(115) exceeds writerIndex(6): AbstractPooledDerivedByteBuf$PooledNonRetainedSlicedByteBuf(ridx: 1, widx: 6, cap: 6/6, unwrapped: PooledUnsafeDirectByteBuf(ridx: 27, widx: 27, cap: 1024))
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1477)
at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1463)
at io.netty.buffer.AbstractByteBuf.readSlice(AbstractByteBuf.java:880)
at io.rsocket.metadata.TaggingMetadata$1.next(TaggingMetadata.java:47)
at io.rsocket.metadata.TaggingMetadata$1.next(TaggingMetadata.java:37)
at org.springframework.messaging.rsocket.DefaultMetadataExtractor.extractEntry(DefaultMetadataExtractor.java:136)
at org.springframework.messaging.rsocket.DefaultMetadataExtractor.extract(DefaultMetadataExtractor.java:119)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.createHeaders(MessagingRSocket.java:195)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.handleAndReply(MessagingRSocket.java:167)
at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.requestStream(MessagingRSocket.java:127)
at io.rsocket.RSocketResponder.requestStream(RSocketResponder.java:207)
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:310)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
at reactor.core.publisher.Flux.subscribe(Flux.java:8174)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637)
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317)
at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:830)

Эта конкретная ошибка, насколько я понимаю, связана с переносом сообщений netty (из других потоков в stackoverflow), но как ее решить? Сервер - это Spring Boot 5+ RSocket, но клиент использует только RSocket-Java.


person user3549576    schedule 11.04.2020    source источник


Ответы (2)


Проблема в пантомиме. В вашем случае сервер ожидает CBOR, но вы продолжаете application/json

Решение кода: измените способ инициализации RSocketRequester, как в приведенном ниже примере, и ваш клиент отправляет CBOR, что вы можете увидеть, включив отладку: logging.level.io.rsocket.FrameLogger: DEBUG. На этом все, привет, мир, нет необходимости в пользовательских стратегиях или реализациях фабрик на стороне клиента.

@Bean
RSocketRequester rSocketRequester(RSocketStrategies strategies) {
    return RSocketRequester
            .builder()
            .rsocketStrategies(strategies)
            .connectTcp("127.0.0.1", 7000)
            .retry(5)
            .block();
    }

Кстати, я не нашел решения с JSON с обеих сторон, даже с пользовательскими Encoder и Decoder с обеих сторон. Думаю, причина в том, что нет CBOR to Jackson конвертера и только наоборот: org.springframework.http.codec.cbor.Jackson2CborEncoder

person Artem Ptushkin    schedule 13.04.2020
comment
Благодарю за ответ. Я не использую RSocketRequester на стороне клиента. Проблема в том, что я помещаю метаданные (для маршрутизации здесь сокет - это маршрут @MessageMapping на сервере). Без метаданных я нормально получаю полезную нагрузку на сервере. Я пытаюсь выяснить, как Spring-Rsocket выполняет сопоставление маршрута и отправляет его в метаданных. - person user3549576; 13.04.2020
comment
Как я понял: клиент отправляет метаданные route: some, а сервер rsocket разрешает это с помощью PathPatternRouteMatcher - person Artem Ptushkin; 14.04.2020
comment
Я думаю, это то, что я пытаюсь понять. Что поместить (с умом кодировать) в маршрут, чтобы PathPatternRouteMater по умолчанию мог направить на правильную конечную точку (сервер использует @MessageMapping из Spring). Если это слишком сложно, как поставить собственный RouteMatcher, который дружелюбен к Spring и работает с rSocket. Я обновил свой код, чтобы сделать его понятным. - person user3549576; 24.04.2020

Из это используйте следующее для создания метаданных.

CompositeByteBuf metadata = ByteBufAllocator.DEFAULT.compositeBuffer();
RoutingMetadata routingMetadata = TaggingMetadataCodec.createRoutingMetadata(ByteBufAllocator.DEFAULT, List.of("/route"));
CompositeMetadataCodec.encodeAndAddMetadata(metadata,
        ByteBufAllocator.DEFAULT,
        WellKnownMimeType.MESSAGE_RSOCKET_ROUTING,
        routingMetadata.getContent());
person user3549576    schedule 10.07.2020