Сообщения Spring Cloud Stream Kafka Producer

Я хочу настроить производитель spring-cloud-stream-kafka с весенней загрузкой.

Производитель работает, и я могу получать сообщения от брокера kafka, но сообщения также содержат некоторую информацию заголовка, например следующую:

contentType   "text/plain"originalContentType    "application/json;charset=UTF-8"{"message":"hello"}

Мой POJO содержит одно поле (сообщение String), поэтому я ожидаю, что в kafka будет отправлена ​​только строка JSON.

Метод test () в моем RestController запускает производителя:

@EnableBinding(ProducerChannels.class)
@SpringBootApplication
@RestController
public class KafkaStreamProducerApplication {

private MessageChannel consumer;

public KafkaStreamProducerApplication(ProducerChannels channels) {
    this.consumer = channels.consumer();
}

@PostMapping("/test/{message}")
public void test(@PathVariable String message) {
    Message<MyMessage> msg = MessageBuilder.withPayload(new MyMessage(message)).build();
    this.consumer.send(msg);
}

interface ProducerChannels {

    @Output
    MessageChannel consumer();
}

Мои application.properties

spring.cloud.stream.bindings.consumer.destination=consumer
spring.cloud.stream.bindings.consumer.content-type=application/json

Я также был бы признателен, если бы вы порекомендовали какие-либо документы или примеры по этой теме. Примеры на github обычно очень тонкие, они используют много автоконфигурация и без пояснений. Я использовал пример для RabbitMQ.


person Peter Lustig    schedule 22.08.2017    source источник


Ответы (2)


Если вы хотите избежать встраивания заголовков (чтобы вы могли получать сообщения в некоторых приложениях, отличных от Spring Cloud Stream), установите headerMode производителя на raw.

См. Свойства производителя.

headerMode

Если установлено значение raw, отключает встраивание заголовка при выводе. Действует только для промежуточного программного обеспечения обмена сообщениями, которое изначально не поддерживает заголовки сообщений и требует встраивания заголовков. Полезно при создании данных для приложений, отличных от Spring Cloud Stream.

По умолчанию: embeddedHeaders.

person Gary Russell    schedule 22.08.2017
comment
Был почти уверен, что попробовал это безуспешно. Возможно, я не перезапустил сервер, потому что забыл включить devtools :(. Спасибо за правильный ответ и ссылку на документы. - person Peter Lustig; 22.08.2017

Заголовки contentType и originalContentType используются Spring Cloud Stream при десериализации сообщения клиентским приложением и выполняют преобразование сообщения на основе набора типов содержимого.

Заголовок contentType явно устанавливается только тогда, когда вы настраиваете тип содержимого привязок, как вы это делали здесь spring.cloud.stream.bindings.consumer.content-type=application/json. Когда установлен заголовок contentType, Spring Cloud Stream сохраняет этот заголовок с помощью флага originalContentType во время процесса сериализации / десериализации для создания / получения сообщений от брокера (через связыватель).

В вашем случае, я думаю, вам может вообще не понадобиться устанавливать contentType.

Для примеров, помимо образцов в репозитории github spring-cloud-stream-samples, вы также можете обратиться к готовые стартеры приложений, которые охватывают широкий спектр приложений, которые могут работать с любыми поддерживаемыми связывателями (включая Kafka).

person Ilayaperumal Gopinathan    schedule 22.08.2017