RSocket не работает при защите с помощью TLS - сервер java.lang.UnsupportedOperationException - клиент java.nio.channels.ClosedChannelException

ОБНОВЛЕНИЕ Я загрузил образец проекта на Github, где вы может воспроизвести проблему. Ознакомьтесь с инструкциями в ридми.

У меня есть сервер RSocket, который доступен для потоков запросов и генерирует поток с n случайными числами:

class RequestStreamRSocketServer

@ExperimentalUnsignedTypes
fun main() {

    val latch = CountDownLatch(1)

    RSocketFactory.receive()
        .frameDecoder(PayloadDecoder.DEFAULT)
        .acceptor { setup, sendingSocket ->
            Mono.just(
                object : AbstractRSocket() {
                    override fun requestStream(payload: Payload): Flux<Payload> {
                        val randomNumberGenerator = Random(1234)
                        val numbers = payload.dataUtf8.toInt()
                        println("Generating $numbers random numbers")
                        return IntRange(1, numbers)
                            .map { DefaultPayload.create(randomNumberGenerator.nextUInt().toString().toByteArray()) }
                            .toList().toFlux()
                    }
                })
        }
        .transport(
            TcpServerTransport.create(TcpServer.create().port(7878))
        )
        .start()
        .block()

    latch.await()
}

Я также создал клиента, который подключается к RSocket и запрашивает 10 случайных чисел:

class RequestStreamRSocketClient

@ExperimentalUnsignedTypes
fun main() {

    val latch = CountDownLatch(1)

    val path = RequestStreamRSocketClient::class.java.getResource("truststore.jks").path
    System.setProperty("javax.net.ssl.trustStore", path)
    System.setProperty("javax.net.ssl.trustStorePassword", "123456")


    val client = RSocketFactory.connect()
        .frameDecoder(PayloadDecoder.DEFAULT)
        .transport(TcpClientTransport.create(TcpClient.create().port(7878)))
        .start()
        .block()

    client.requestStream(DefaultPayload.create("10"))
        .map { it.dataUtf8 }
        //.onErrorReturn("error")
        .doOnNext(System.out::println)
        .doOnComplete { latch.countDown() }
        .doOnError { it.printStackTrace() }
        .subscribe()

    latch.await()
}

Это работает отлично.

Журналы сервера:

Generating 10 random numbers

Журналы клиентов:

345130239
2958210271
3979283303
4254072378
4206518657
1432197826
3787126071
2479634382
4147073748
3864383859

Process finished with exit code 0

Однако я хотел бы использовать TLS для связи через RSocket, поэтому я создал файл certificate.pem / key.pem для сервера и настроил его:

.transport(
    TcpServerTransport.create(TcpServer.create().port(7878).secure {
        it.sslContext(
            SslContextBuilder.forServer(
                File(RequestStreamRSocketServer::class.java.getResource("certificate.pem").toURI()),
                File(RequestStreamRSocketServer::class.java.getResource("key.pem").toURI())
            )
        )
    })
)

На стороне клиента я создал truststore.jks, импортировал certificate.pem и настроил клиент для использования защищенной связи:

    val path = RequestStreamRSocketClient::class.java.getResource("truststore.jks").path
    System.setProperty("javax.net.ssl.trustStore", path)
    System.setProperty("javax.net.ssl.trustStorePassword", "123456")
    ...

        .transport(TcpClientTransport.create(TcpClient.create().port(7878).secure {
            it.sslContext(SslContextBuilder.forClient())
        }))

После запуска сервера запускаю клиента. Вызывается поток запроса приемника сервера (печать Generating 10 random numbers), но сразу же происходит сбой:

Generating 10 random numbers
java.lang.IllegalArgumentException: promise already done: DefaultChannelPromise@4f230679(failure: java.lang.UnsupportedOperationException)
    at io.netty.channel.AbstractChannelHandlerContext.isNotValidPromise(AbstractChannelHandlerContext.java:891)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:773)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:701)
    at io.netty.handler.ssl.SslHandler.finishWrap(SslHandler.java:899)
    at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:885)
    at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797)
    at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:749)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:741)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:727)
    at reactor.netty.channel.MonoSendMany$SendManyInner$AsyncFlush.run(MonoSendMany.java:621)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

На стороне клиента есть исключение закрытого канала:

java.nio.channels.ClosedChannelException
    at io.rsocket.RSocketRequester.terminate(RSocketRequester.java:476)
    at io.rsocket.RSocketRequester.lambda$new$0(RSocketRequester.java:94)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:156)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:139)
    at reactor.core.publisher.MonoProcessor$NextInner.onComplete(MonoProcessor.java:518)
    at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:308)
    at reactor.core.publisher.MonoProcessor.onComplete(MonoProcessor.java:265)
    at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:23)
    at io.rsocket.transport.netty.TcpDuplexConnection.lambda$new$0(TcpDuplexConnection.java:61)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:493)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:472)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
    at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:527)
    at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:98)
    at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
    at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1156)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:758)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:734)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:605)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1363)
    at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:621)
    at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:605)
    at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:467)
    at io.netty.handler.ssl.SslHandler.exceptionCaught(SslHandler.java:1092)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:297)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:276)
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:268)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1388)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:297)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:276)
    at io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:918)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:125)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:174)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

Как исправить для работы с TLS?


person codependent    schedule 19.11.2019    source источник


Ответы (2)


Попробуйте отправить клиентский сертификат, как показано ниже:

.transport(() -> {
                            TcpClient tcpClient = TcpClient.create().host("localhost").port(7878);
                            return TcpClientTransport.create(keyCertChainFile != null ?
                                    tcpClient.secure(sslContextSpec -> sslContextSpec
                                        .sslContext(SslContextBuilder.forClient().keyManager("client-cert.pem", "client-key.pem")
                                        .trustManager(new File("cacert.pem")))) :
                                    tcpClient);
                        }
person Karthik Palanivelu    schedule 20.11.2019
comment
Я ищу способ решить эту проблему только с помощью аутентификации сервера. Это будет услуга, доступная нескольким различным клиентам, которым не нужно будет предъявлять собственные сертификаты. - person codependent; 21.11.2019
comment
Я испытываю это исключение, когда на сервере есть нулевой указатель или любой другой недопустимый аргумент. Я бы сказал, что это может быть преобразованное исключение на клиенте. Не могли бы вы проверить и опубликовать, есть ли на сервере какие-либо исключения? Попробуйте использовать это на стороне сервера: .clientAuth(ClientAuth.REQUIRE) - person Karthik Palanivelu; 21.11.2019
comment
Привет, Картик, я ценю твои предложения. Других исключений я не видел. Проверьте обновленный вопрос, я загрузил образец проекта в Github, где вы можете увидеть проблему. - person codependent; 22.11.2019

Исправлено после обновления до RC7 и незначительного рефакторинга:

Сервер:

    val latch = CountDownLatch(1)

    RSocketServer.create()
        .payloadDecoder(PayloadDecoder.DEFAULT)
        .acceptor { setup, sendingSocket ->
            Mono.just(
                object : AbstractRSocket() {
                    override fun requestStream(payload: Payload): Flux<Payload> {
                        val randomNumberGenerator = Random(1234)
                        val numbers = payload.dataUtf8.toInt()
                        println("Generating $numbers random numbers")
                        return IntRange(1, numbers)
                            .map { DefaultPayload.create(randomNumberGenerator.nextUInt().toString().toByteArray()) }
                            .toList().toFlux()
                    }
                })
        }.bind(
            TcpServerTransport.create(TcpServer.create().port(7878)
                .secure {
                    it.sslContext(
                        SslContextBuilder.forServer(
                            File(RequestStreamRSocketServer::class.java.getResource("certificate.pem").toURI()),
                            File(RequestStreamRSocketServer::class.java.getResource("key.pem").toURI())
                        )
                    )
                })
        )
        .block()

    latch.await()

Клиент:

    val latch = CountDownLatch(1)

    val client = RSocketConnector.connectWith(
        TcpClientTransport.create(TcpClient.create().port(7878)
            .secure {
                it.sslContext(
                    SslContextBuilder.forClient().trustManager(
                        File(
                            RequestStreamRSocketClient::class.java.getResource(
                                "certificate.pem"
                            ).path
                        )
                    )
                )
            })
    ).block()!!

    client.requestStream(DefaultPayload.create("10"))
        .map { it.dataUtf8 }
        .doOnNext(System.out::println)
        .doOnComplete { latch.countDown() }
        .doOnError { it.printStackTrace() }
        .subscribe()

    latch.await()
person codependent    schedule 11.05.2020