Как установить размер сообщений в Kafka?

Сейчас я использую Kafka 0.9.0.1. Согласно некоторым источникам, которые я нашел, способ установить размеры сообщений - это изменить следующие значения ключей в server.properties.

  • message.max.bytes
  • replica.fetch.max.bytes
  • fetch.message.max.bytes

В моем server.properties файле действительно есть эти настройки.

message.max.bytes=10485760
replica.fetch.max.bytes=20971520
fetch.message.max.bytes=10485760

Другие настройки, которые могут иметь значение, приведены ниже.

socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

Однако, когда я пытаюсь отправить сообщения с размером полезной нагрузки от 4 до 6 МБ, потребитель никогда не получает никаких сообщений. Производитель, кажется, отправляет сообщения без каких-либо исключений. Если я отправляю меньшие полезные данные (например, <1 МБ), то потребитель действительно получает сообщения.

Есть идеи, что я делаю неправильно с точки зрения настроек конфигурации?

Вот пример кода для отправки сообщения.

Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProps());
File dir = new File("/path/to/dir");
for(String s : dir.list()) {
  File f = new File(dir, s);
  byte[] data = Files.readAllBytes(f.toPath());
  Payload payload = new Payload(data); //a simple pojo to store payload
  String key = String.valueOf(System.currentTimeMillis());
  byte[] val = KryoUtil.toBytes(payload); //custom util to use kryo to get bytes[]
  producer.send(new ProducerRecord<>("test", key, val));
}
producer.close();

Вот пример кода для получения сообщения.

KafkaConsumer consumer = new KafkaConsumer<>(getConsumerProps());
consumer.subscribe(Arrays.asList("test"));
while(true) {
  ConsumerRecord<String, byte[]> records = consumer.poll(100);
  for(ConsumerRecord<String, byte[]> record : records) {
    long offset = record.offset();
    String key = record.key();
    byte[] val = record.value();
    Payload payload = (Payload)KryoUtil.toObject(val, Payload.class); //custom util to use kryo to deserialize back to object
    System.out.println(
      System.format("offset=%d, key=%s", offset, key));
  }
}

Вот методы для заполнения файлов свойств для производителя и потребителя.

public static Properties getProducerProps() {
  Properties props = new Properties();
  props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put("compression.type", "snappy");
  props.put("max.request.size", 10485760); //need this
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
  return props;
}

public static Properties getConsumerProps() {
  Properties props = new Properties();
  props.put("bootstrap.servers", "qc1:9092,qc2:9092,qc3:9092,qc4:9092");
  props.put("group.id", "test");
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("max.partition.fetch.bytes", 10485760); //need this too
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  return props;
}

person Jane Wayne    schedule 29.02.2016    source источник


Ответы (3)


Джейн, не используйте fetch.message.max.bytes в первую очередь, потому что это свойство, которое принадлежит потребителю и не входит в файл server.properties, а во-вторых, потому что оно предназначено для старой версии потребителя, вместо этого используйте max.partition.fetch.bytes при создании Consumer как часть свойств, которые вы используете для его создания.

person Nautilus    schedule 29.02.2016
comment
Я только что попробовал, но получаю тот же эффект. Файлы большого размера не принимаются. Мне интересно, отправляются ли они вообще, потому что, когда потребитель начинает читать из темы, смещения являются смежными (например, 1, 2, 3 и т. Д.). Мне кажется, что производитель может даже не отправлять большие файлы? - person Jane Wayne; 29.02.2016
comment
Оказывается, мне нужно установить и max.request.size для производителя, и max.partition.fetch.bytes для потребителя. Я немного поработаю с кодом, чтобы увидеть, действительно ли max.partition.fetch.bytes нужен. - person Jane Wayne; 29.02.2016
comment
Да, оказывается, мне нужны обе настройки. Если не выставить max.partition.fetch.bytes, то получу RecordTooLargeException. - person Jane Wayne; 29.02.2016
comment
Да, вам также понадобится max.request.size, но поскольку вы сказали мне, что отправка не является проблемой, я не обратил особого внимания на этот параметр. Вы можете принять ответ? - person Nautilus; 29.02.2016

Вам нужно увеличить серверную (как уже было описано) и клиентскую сторону.

Пример на Python с использованием kafka-python Producer:

producer = KafkaProducer(bootstrap_servers=brokers, max_request_size=1048576)

Увеличьте max_request_size до желаемого значения, по умолчанию 1048576.

person Thiago Falcao    schedule 21.05.2020

Также возможен вариант max.fetch.bytes.

person really-okay-coder    schedule 19.06.2019