У меня есть приложение Spring Boot (app0), которое использует Spring Cloud Stream Kafka для чтения из темы.
Есть два других приложения (app1, app2), которые создают сообщения по этой теме.
app1 публикует сообщения с помощью интерфейса OrderSource:
public interface OrderSource{
String OUTPUT_PAYMENT = Topic.PAYMENT_RESULTS;
@Output(OrderSource.OUTPUT_PAYMENT)
MessageChannel output();
Например:
orderSource.output().send(MessageBuilder.withPayload(order).build(), 500);
В этом случае app0 без проблем читает сообщения от app1.
app2 публикует свои сообщения с помощью KafkaTemplate:
ListenableFuture<SendResult<Integer, String>> delivery = kafkaTemplate.send(Topic.PAYMENT_RESULTS, "{ ... }");
try {
SendResult<Integer, String> result = delivery.get(timeout, TimeUnit.MILLISECONDS);
В этом случае я наблюдаю следующее исключение из EmbeddedHeadersMessageConverter
:
java.lang.StringIndexOutOfBoundsException: String index out of range: 152
at java.lang.String.checkBounds(Unknown Source) ~[na:1.8.0_91]
at java.lang.String.<init>(Unknown Source) ~[na:1.8.0_91]
at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.oldExtractHeaders(EmbeddedHeadersMessageConverter.java:135) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.extractHeaders(EmbeddedHeadersMessageConverter.java:105) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
По-видимому, он пытается извлечь заголовки из полезной нагрузки сообщения. Как я могу предотвратить возникновение этого исключения, поддерживая оба источника сообщений (KafkaTemplate и OrderSource).