Spring Cloud Stream Kafka - EmbeddedHeadersMessageConverter - java.lang.StringIndexOutOfBoundsException: индекс строки вне допустимого диапазона в

У меня есть приложение Spring Boot (app0), которое использует Spring Cloud Stream Kafka для чтения из темы.

Есть два других приложения (app1, app2), которые создают сообщения по этой теме.

app1 публикует сообщения с помощью интерфейса OrderSource:

public interface OrderSource{ 

    String OUTPUT_PAYMENT = Topic.PAYMENT_RESULTS;

    @Output(OrderSource.OUTPUT_PAYMENT)
    MessageChannel output();

Например:

orderSource.output().send(MessageBuilder.withPayload(order).build(), 500);

В этом случае app0 без проблем читает сообщения от app1.

app2 публикует свои сообщения с помощью KafkaTemplate:

ListenableFuture<SendResult<Integer, String>> delivery = kafkaTemplate.send(Topic.PAYMENT_RESULTS, "{ ... }");
try {
    SendResult<Integer, String> result = delivery.get(timeout, TimeUnit.MILLISECONDS);

В этом случае я наблюдаю следующее исключение из EmbeddedHeadersMessageConverter:

java.lang.StringIndexOutOfBoundsException: String index out of range: 152
    at java.lang.String.checkBounds(Unknown Source) ~[na:1.8.0_91]
    at java.lang.String.<init>(Unknown Source) ~[na:1.8.0_91]
    at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.oldExtractHeaders(EmbeddedHeadersMessageConverter.java:135) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
    at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.extractHeaders(EmbeddedHeadersMessageConverter.java:105) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]

По-видимому, он пытается извлечь заголовки из полезной нагрузки сообщения. Как я могу предотвратить возникновение этого исключения, поддерживая оба источника сообщений (KafkaTemplate и OrderSource).


person codependent    schedule 11.02.2017    source источник


Ответы (1)


Чтобы взаимодействовать с приложениями, отличными от Spring-Cloud-Stream, вам необходимо настроить headerMode на потребителе на raw.

Вам также нужно будет сделать то же самое с производителем для app1, чтобы он не встраивал заголовки.

См. свойства потребителей и свойства производителя.

person Gary Russell    schedule 11.02.2017
comment
Спасибо, Гэри. Пара вопросов: каковы последствия отказа от встраивания заголовков? Будут ли они отправлены? - person codependent; 11.02.2017
comment
У Кафки нет концепции заголовков; вот почему мы должны встроить их в полезную нагрузку. Единственный передаваемый заголовок (по умолчанию) - это тип содержимого; это необходимо только в том случае, если вы полагаетесь на связыватель для выполнения любого сложного преобразования сообщений в приложении-потребителе. Если вам это нужно или вы настроили связыватель отправки (app1) для встраивания любых пользовательских заголовков, вам не повезло. Но временным решением будет использование встроенного конвертера в вашем приложении2 перед отправкой. Или используйте разные темы для app1 / app2 и соответствующим образом настройте app0. - person Gary Russell; 11.02.2017
comment
Теперь все ясно, спасибо !! Не могли бы вы указать на любую ссылку для этого: a work-around there would be to use the embedded converter in your app2 before sending - person codependent; 11.02.2017
comment
Вы хотели использовать EmbeddedHeadersMessageConverter.embedHeaders() в app2 и отправить полученный массив байтов? - person codependent; 11.02.2017
comment
Совершенно верно; это то, что я имел в виду. - person Gary Russell; 11.02.2017