Не удалось декодировать тип json в весеннем облачном потоке DefaultKafkaHeaderMapper

Мы используем spring-cloud-stream и планируем обновить нашу версию Kafka.
Наши приложения используют spring-cloud-stream:2.0.0 (spring-kafka 2.1.7) с сервером apache kafka 1.0.1, а также используют spring-cloud-sleuth:2.0.0 для отслеживания.
Мы собираемся обновите наш сервер Kafka до версии 2.3.0, поэтому он требует обновления до spring-boot 2.2.x (Hoxton) с spring-cloud-sleuth:2.2.0 и spring-cloud-stream:3.0.3 (Horsham.SR3).
У нас есть ~ 200 приложений, использующих Kafka, поэтому обновление будет происходить постепенно, поэтому в качестве промежуточного состояния у нас будут производители в новой версии, а потребители используют старую версию.
Наши потребители используют @StreamListener.

Во время наших тестов мы столкнулись с проблемой синтаксического анализа большинства заголовков с типом String и получения следующего:

ERROR 27448 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$4  : Could not decode json type: ecb89ccb3e79418b for key: X-B3-TraceId
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ecb89ccb3e79418b': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"ecb89ccb3e79418b"; line: 1, column: 33]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091) ~[jackson-databind-2.9.6.jar:2.9.6]
    at org.springframework.kafka.support.DefaultKafkaHeaderMapper.lambda$toHeaders$1(DefaultKafkaHeaderMapper.java:233) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at java.lang.Iterable.forEach(Iterable.java:75) ~[na:1.8.0_221]
    at org.springframework.kafka.support.DefaultKafkaHeaderMapper.toHeaders(DefaultKafkaHeaderMapper.java:216) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.toHeaders(KafkaMessageChannelBinder.java:554) ~[spring-cloud-stream-binder-kafka-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:106) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
...

В то время как заголовок типов:

{spanTraceId=java.lang.String, spanId=java.lang.String, spanParentSpanId=java.lang.String, nativeHeaders=org.springframework.util.LinkedMultiValueMap, X-B3-SpanId=java.lang.String, X-B3-ParentSpanId=java.lang.String, scst_partition=java.lang.Integer, X-B3-Sampled=java.lang.String, X-B3-TraceId=java.lang.String, spanSampled=java.lang.String, contentType=java.lang.String}

Например, X-B3-SpanId, который был добавлен Sleuth, имеет тип String и значение: ecb89ccb3e79418b, которое не является строкой JSON, поэтому ObjectMapper не работает при преобразовании в String Object здесь:

headers.put(h.key(), getObjectMapper().readValue(h.value(), type))

Похоже, он не должен использовать ObjectMapper, когда у нас есть типы String, поэтому наши старые потребители терпят неудачу.

Есть ли способ предотвратить эту проблему при использовании нового производителя и старого потребителя?




Ответы (1)


Вы можете настроить DefaultKafkaHeaderMapper для совместимости со старыми версиями:

    /**
     * Set to true to encode String-valued headers as JSON ("..."), by default just the
     * raw String value is converted to a byte array using the configured charset. Set to
     * true if a consumer of the outbound record is using Spring for Apache Kafka version
     * less than 2.3
     * @param encodeStrings true to encode (default false).
     * @since 2.3
     */
    public void setEncodeStrings(boolean encodeStrings) {
        this.encodeStrings = encodeStrings;
    }

См. Также https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html.#_kafka_binder_properties

spring.cloud.stream.kafka.binder.headerMapperBeanName

person Gary Russell    schedule 14.12.2020
comment
Привет, @gary, это предложение не работает. Мы пытались производить с использованием Hoxton.SR3 с encodeStrings flag = true, установленным на DefaultKafkaHeaderMapper, как было предложено. При такой настройке потребитель может обрабатывать все заголовки String, но не работает с заголовком типа содержимого. Значение типа этого заголовка было изменено на org.springframework.util.MimeType (из-за изменения флага), а значение - application / json, поэтому он пытается преобразовать его в JSON и здесь не работает: headers.put(h.key(), getObjectMapper().readValue(h.value(), type)); с NPE. - person yuval simhon; 20.12.2020
comment
Попробуйте использовать BinderHeaderMapper с набором свойств вместо DefaultKafkaHeaderMapper; Изначально это был клон, но я вижу там дополнительный код для работы с MimeType. - person Gary Russell; 21.12.2020
comment
Спасибо @gary - BinderHeaderMapper сделал это, никаких других проблем, связанных с заголовками, начиная с spring-kafka:2.3.7 - person yuval simhon; 04.01.2021