Как возобновить сеанс с RSocket при сбое сервера RSocket

Хорошо, я новичок в RSocket. Я пытаюсь создать простой клиент RSocket и простой сервер RSocket. Из проведенного мной исследования говорится, что RSocket поддерживает возобновление:

Это особенно полезно, поскольку при отправке кадра RESUME, содержащего информацию о последнем полученном кадре, клиент может возобновить соединение и запрашивать только те данные, которые он еще не получил, избегая ненужной нагрузки на сервер и тратя время на попытки получить данные, которые уже были получены.

В нем также говорится, что клиент является ответственным за разрешение возобновления. У меня вопрос, как включить это возобновление и как отправить этот кадр RESUME. У меня есть работающие клиент и сервер, но если я выключу сервер и снова запустил его, ничего не происходит, а позже, когда клиент снова пытается связаться с сервером, он выдает: java.nio.channels.ClosedChannelException.

Это моя конфигурация клиента:

@Configuration
public class ClientConfiguration {

/**
 * Defining the RSocket client to use tcp transport on port 7000
 */
@Bean
public RSocket rSocket() {
    return RSocketFactory
            .connect()
            .resumeSessionDuration(Duration.ofDays(10))
            .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(TcpClientTransport.create(7000))
            .start()
            .block();
}

/**
 * RSocketRequester bean which is a wrapper around RSocket
 * and it is used to communicate with the RSocket server
 */
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
    return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}

}

И это RestController, из которого я начинаю общение с сервером rsocket:

@RestController
public class UserDataRestController {

private final RSocketRequester rSocketRequester;

public UserDataRestController(RSocketRequester.Builder rSocketRequester) {
    this.rSocketRequester = rSocketRequester.connectTcp("localhost", 7000).block();
}

@GetMapping(value = "/feed/{firstName}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<Person> feed(@PathVariable("firstName") String firstName) {
    return rSocketRequester
            .route("feedPersonData")
            .data(new PersonDataRequest(firstName))
            .retrieveFlux(Person.class);
}

}

person f.trajkovski    schedule 28.01.2020    source источник


Ответы (2)


Из-за того, что сеансы хранятся в памяти, вы не можете возобновить их после перезапуска сервера. См. io.rsocket.resume.SessionManager#sessions.

Но вы все равно можете защитить себя от проблем с сетью, если снова подключитесь к тому же серверу. И вам не нужно отправлять фрейм RESUME, клиент сделает это за вас.

Вам необходимо настроить сервер:

@Bean
ServerRSocketFactoryProcessor serverRSocketFactoryProcessor() {
    return RSocketFactory.ServerRSocketFactory::resume;
}

И клиент io.rsocket.RSocketFactory.ClientRSocketFactory#resume.

Вы можете найти почти полный пример здесь

person Alexander Pankin    schedule 29.01.2020
comment
Это работает! Большое спасибо. Клиент смог связаться с сервером даже после его перезапуска. - person f.trajkovski; 29.01.2020

Код, предоставленный @Alexander Pankin, устарел. Я использовал этот код для настройки возобновления одного сервера:

    @Bean
    RSocketServerCustomizer rSocketResume() {
        Resume resume =
                new Resume()
                        .sessionDuration(Duration.ofMinutes(15))
                        .retry(
                                Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5))
                                        .doBeforeRetry(s -> log.debug("Disconnected. Trying to resume...")));
        return rSocketServer -> rSocketServer.resume(resume);
    }
person kojot    schedule 06.07.2020
comment
Я думаю, что повторная попытка применима только к клиентам. - person Yuri Schimke; 27.02.2021