Я выполняю агрегирование потоков kafka и записываю агрегированные записи в тему и получаю следующие ошибки. Я использую пользовательский json serde для вспомогательного класса агрегации. В некоторых блогах я нашел решение этой проблемы - увеличить max.request.size.
Хотя я увеличиваю максимальный размер запроса со значения по умолчанию до 401391899, размер сериализованного сообщения агрегирования продолжает увеличиваться при последующих записях в тему.
При запуске потоков через 10 минут появляется ошибка, указанная ниже. Не уверен, что проблема связана с моим serde, или мне следует изменить какие-либо настройки конфигурации, кроме max.request.size, чтобы решить эту проблему.
Сообщение написано в тему;
{A=5, B=1, C=0, D=87, E=1, F=0.4482758620689655 }
{A=6, B=1, C=0, D=87, E=1, F=0.4482758620689655 }
{A=7, B=1, C=2, D=87, E=1, F=0.4482758620689655 }
org.apache.kafka.common.errors.RecordTooLargeException: сообщение имеет размер 2292506 байт при сериализации, что превышает максимальный размер запроса, который вы настроили с конфигурацией max.request.size.
Exception in thread "StreamThread-1" java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Unknown Source)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
at java.lang.AbstractStringBuilder.append(Unknown Source)
at java.lang.StringBuilder.append(Unknown Source)
at com.google.gson.stream.JsonReader.nextString(JsonReader.java:1043)
at com.google.gson.stream.JsonReader.nextValue(JsonReader.java:784)
at com.google.gson.stream.JsonReader.nextInArray(JsonReader.java:693)
at com.google.gson.stream.JsonReader.peek(JsonReader.java:376)
at com.google.gson.stream.JsonReader.hasNext(JsonReader.java:349)
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:80)
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:60)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:93)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:172)
at com.google.gson.Gson.fromJson(Gson.java:795)
at com.google.gson.Gson.fromJson(Gson.java:761)
at com.google.gson.Gson.fromJson(Gson.java:710)
at com.google.gson.Gson.fromJson(Gson.java:682)
at com.data.agg.streams.JsonDeserializer.deserialize(JsonDeserializer.java:34)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:156)
at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103)
at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34)
at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:131)
at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:222)
at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:205)
at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:120)
at org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:149)
at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:112)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
[Thread-2] INFO org.apache.kafka.streams.KafkaStreams - Stopped Kafka Stream process