Документация Spring Cloud 3.1 о том, как использовать KTable в загрузочном приложении Spring

Я изо всех сил пытаюсь найти какую-либо документацию о том, где я могу использовать Spring Cloud Streams, которая берет тему Kafka и помещает ее в KTable. Посмотрев документацию, например, здесь https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RC1/reference/html/spring-cloud-stream-binder-kafka.html#_materializing_ktable_as_a_state_store нет ничего конкретного в том, как сделать это при загрузке Spring с использованием аннотаций. Я надеялся, что смогу просто создать простой KTable, используя KStream, где в моем application.properties у меня есть это: spring.cloud.stream.bindings.process-in-0.destination: my-topic

Затем в моей конфигурации я надеялся, что смогу сделать что-то вроде этого

@Bean
public Consumer<KStream<String, String>> process() {
    return input -> input.toTable(Materialized.as("my-store"))
}

Посоветуйте, пожалуйста, что мне не хватает?


person Matt    schedule 21.04.2021    source источник


Ответы (1)


Если все, что вы хотите сделать, это использовать данные из темы Kafka как KTable, вы можете сделать это, как показано ниже.

@Bean
public Consumer<KTable<String, String>> process() {
    return input -> {
        
    };
}

Если вы хотите материализовать таблицу в именованное хранилище, вы можете добавить это в конфигурацию.

spring.cloud.stream.kafka.streams.bindings.process_in_0.consumer.materializedAs: my-store

Вы также можете сделать то, что указали в вопросе, т.е. получить его как KStream, а затем преобразовать в KTable. Однако, если это все, что вам нужно сделать, вы можете скорее получить его как KTable, как я предлагаю здесь.

person sobychacko    schedule 21.04.2021
comment
как Spring знает, какую тему использовать в этом примере? Я хотел запросить эти данные, чтобы получить запись по ключу - person Matt; 22.04.2021
comment
Вы можете указать тему, как обычно, в любых других приложениях Spring Cloud Stream. В этом конкретном примере это будет spring.cloud.stream.bindings.process-in-0.destination=<your-input-topic>. - person sobychacko; 23.04.2021
comment
У меня также есть требование, когда мне нужно читать из этой ktable в контроллере стиля REST, как это тогда будет влиять на это? Мне известно об InteractiveQueryService, поэтому я подозреваю, что id нужно ввести, но как тогда эта служба запросов узнает об этом KTable? - person Matt; 23.04.2021