Spring WebFlux с ReactiveMongoRepository: не получают обновления базы данных через поток

Проблема, с которой я столкнулся:

Что бы я ни пробовал с помощью различных руководств по использованию REST API Spring Reactive (WebFlux), я не могу заставить его работать. Когда я сначала вызываю свою конечную точку, я могу получить результаты из коллекции MongoDB. Однако всякий раз, когда я делаю обновление записи документа или добавляю новый документ, он не обновляется через поток текстовых событий. Каждый раз мне приходится снова вызывать конечную точку, чтобы получить новые результаты.

Настройка:

В настоящее время у меня следующая установка:

  • Spring Cloud Gateway (за которым я запускаю разные)
  • Spring Boot Service, содержащий RestControllers (называя это основной службой)

Я использую Spring Webflux, Spring Cloud Gateway и Spring ReactiveMongoRepository.

Зависимости включали pom.xml для основной службы Spring Boot:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.security</groupId>
            <artifactId>spring-security-oauth2-jose</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>

Код для ReactiveMongoRepository:

@Repository
public interface TestRepository extends ReactiveMongoRepository<TestIntegration, String> {

    @Query(("{'userId': ?0}"))
    Flux<TestIntegration> findbyUserId(String userId);
}

Код для остального контроллера:

@GetMapping(value = "main/integrations", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<TestIntegration> retrieveIntegrations(ServerWebExchange exchange) {
        return testRepository.findAll();
}

Согласно каждому прочитанному мной руководству / руководству, это должно работать в соответствии с этим подходом. Кто-нибудь испытал это на себе или мог бы помочь в этом вопросе? В настоящее время застрял на этом несколько дней ...


person vv01    schedule 21.01.2021    source источник
comment
Это работает точно так, как задумано: вы открываете поток для своего приложения и запрашиваете записи в базе данных. Но как ваше приложение узнает о новых записях, записанных в базу данных? Ну это не так. Вам нужно, чтобы все приложения запускали событие при появлении новых записей и помещали их в поток.   -  person Toerktumlare    schedule 22.01.2021
comment
Это интересно и полезно знать, что он работает так, как задумано. Спасибо за разъяснения. Возможно, у вас есть информация о том, как могут быть реализованы эти триггерные события, возможно, какая-то документация и / или указатели? Чтобы я хоть знал, в какую сторону смотреть.   -  person vv01    schedule 22.01.2021
comment
Stack overflow - это не форум по программированию, это сайт вопросов и ответов, поэтому начните с поиска в Google того, что работает для вашей базы данных. Я понятия не имею, как это работает для вашей базы данных. Вам также не разрешается запрашивать книги, учебные пособия или документацию, это не такой сайт. прочтите правила переполнения стека. Если вы хотите «обсудить» проблемы, вам нужно найти другой сайт.   -  person Toerktumlare    schedule 22.01.2021
comment
Я не просил об этом специально, но неважно. Просто интересно, какой путь лучше всего искать, чтобы это сработало. Пока что я столкнулся с changeStreams для ReactiveMongoTemplate. Разберусь.   -  person vv01    schedule 22.01.2021
comment
Just wondering which way is the right way to search for in order to make this work запрашивает недопустимую документацию, учебные пособия и / или библиотеки. SO - это форум по программированию для очень конкретных вопросов с одним конкретным ответом. На ваш вопрос дан ответ. Для справки в будущем meta.stackoverflow.com/questions/261592/   -  person Toerktumlare    schedule 22.01.2021


Ответы (1)


Это работает как задумано. При потоковой передаче элементов из Flux<T> он будет подаваться до тех пор, пока в потоке есть элементы. Тогда поток закроется. Итак, в вашем случае он извлек все данные из базы данных, отправил их вам и затем закрылся.

Если вы хотите, чтобы поток оставался открытым, вам нужно продолжать отправлять данные. Один из способов сделать это - отправить :keep alive сообщения (сообщения, начинающиеся с запятой), используя ServerSentsEvents. Вы можете узнать больше о ServerSentEvents, colon operator и этих типах сообщений в официальная документация Mozilla.

Когда вы действительно можете держать поток открытым и хотите отправлять данные, ваша служба не будет знать, когда новые данные будут записаны в базу данных. Таким образом, вы либо опрашиваете свою базу данных, либо инициируете событие, когда что-то записывается, чтобы получить вновь записанные данные и поместить их в поток.

Как разместить данные в непрерывном открытом потоке - слишком большая тема для объяснения здесь. Но я предлагаю вам прочитать следующие разделы официальной документации по реактору:

Программное создание последовательности

Процессоры и приемники

person Toerktumlare    schedule 22.01.2021
comment
Это действительно полезно. Большое спасибо за то, что помогли указать в правильном направлении. Очень признателен. - person vv01; 23.01.2021