org.apache.kafka.common.errors.RecordTooLargeException

Я выполняю агрегирование потоков kafka и записываю агрегированные записи в тему и получаю следующие ошибки. Я использую пользовательский json serde для вспомогательного класса агрегации. В некоторых блогах я нашел решение этой проблемы - увеличить max.request.size.

Хотя я увеличиваю максимальный размер запроса со значения по умолчанию до 401391899, размер сериализованного сообщения агрегирования продолжает увеличиваться при последующих записях в тему.

При запуске потоков через 10 минут появляется ошибка, указанная ниже. Не уверен, что проблема связана с моим serde, или мне следует изменить какие-либо настройки конфигурации, кроме max.request.size, чтобы решить эту проблему.

Сообщение написано в тему;

{A=5, B=1, C=0, D=87, E=1, F=0.4482758620689655 }   
{A=6, B=1, C=0, D=87, E=1, F=0.4482758620689655 }  
{A=7, B=1, C=2, D=87, E=1, F=0.4482758620689655 }

org.apache.kafka.common.errors.RecordTooLargeException: сообщение имеет размер 2292506 байт при сериализации, что превышает максимальный размер запроса, который вы настроили с конфигурацией max.request.size.

Exception in thread "StreamThread-1" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Unknown Source)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
    at java.lang.AbstractStringBuilder.append(Unknown Source)
    at java.lang.StringBuilder.append(Unknown Source)
    at com.google.gson.stream.JsonReader.nextString(JsonReader.java:1043)
    at com.google.gson.stream.JsonReader.nextValue(JsonReader.java:784)
    at com.google.gson.stream.JsonReader.nextInArray(JsonReader.java:693)
    at com.google.gson.stream.JsonReader.peek(JsonReader.java:376)
    at com.google.gson.stream.JsonReader.hasNext(JsonReader.java:349)
    at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:80)
    at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:60)
    at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:93)
    at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:172)
    at com.google.gson.Gson.fromJson(Gson.java:795)
    at com.google.gson.Gson.fromJson(Gson.java:761)
    at com.google.gson.Gson.fromJson(Gson.java:710)
    at com.google.gson.Gson.fromJson(Gson.java:682)
    at com.data.agg.streams.JsonDeserializer.deserialize(JsonDeserializer.java:34)
    at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:156)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34)
    at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:131)
    at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:222)
    at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:205)
    at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:120)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:149)
    at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:112)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
[Thread-2] INFO org.apache.kafka.streams.KafkaStreams - Stopped Kafka Stream process

person Barro    schedule 28.04.2017    source источник
comment
Было бы полезно, если бы вы поделились своим кодом, а не только сообщением об ошибке.   -  person Michael G. Noll    schedule 28.04.2017


Ответы (1)


Это выстрел в темноту, поскольку вы не делились своим кодом, но я предполагаю, что вы собираете записи в своем оконном агрегировании в более крупную и большую запись, которую вы поддерживаете как результат агрегирования.

Поскольку состояние поддерживается темой Kafka для обеспечения отказоустойчивости, Streams записывает запись в эту тему (одна запись для каждого ключа, причем значение является состоянием, принадлежащим ключу). По мере того, как ваше состояние (для каждого ключа) увеличивается с течением времени, «записи состояния» растут с течением времени и в конечном итоге превышают ограничение максимального размера.

person Matthias J. Sax    schedule 07.05.2017
comment
У меня есть случай использования, когда мне нужно сделать отдельный подсчет строк агента пользователя по ip. Иногда, когда некоторые IP-адреса получают более 50000 уникальных агентов-пользователей, бывает сложно разместить эти записи в памяти и выполнить агрегацию ktable. Поскольку эти записи со временем растут и вызывают следующую ошибку: org.apache.kafka.common.errors.RecordTooLargeException Сообщение составляет 947859917 байт при сериализации, что превышает максимальный размер запроса, который вы настроили с конфигурацией max.request.size. - person Barro; 09.05.2017
comment
Для уникальных подсчетов исходные значения необходимо сохранить в хэш-наборе для каждой записи для уникального подсчета. Как обрабатывать такие записи, которые превышают ограничение максимального размера. Я считаю, что это причина, по которой я получаю исключение слишком большого размера. - person Barro; 09.05.2017
comment
Вы можете только изменить конфигурацию (см. документы. confluent.io/current/streams/) или измените код, чтобы не создавать записи с увеличивающимся размером. - person Matthias J. Sax; 10.05.2017
comment
Я согласен с вашей точкой зрения, что я должен пойти с изменением конфигурации. Однако я не был уверен, приведет ли увеличение размера сообщения к снижению пропускной способности. В качестве альтернативы я сделал хеширование уникальной строки агента-пользователя для отдельного подсчета, что уменьшило размер записи и решило мою проблему. - person Barro; 10.05.2017