Блок асинхронной отправки Spring Kafka

Я использую Spring-Kafka версии 1.2.1, и, когда сервер Kafka отключен / недоступен, на время блокируется асинхронная отправка вызовов. Вроде таймаут TCP. Код выглядит примерно так:

ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() {
    @Override
    public void onSuccess(SendResult<K, V> result) {
        ...
    }

    @Override
    public void onFailure(Throwable ex) {
        ...
    }
});

Я очень быстро взглянул на код Spring-Kafka, и, похоже, он просто передает задачу клиентской библиотеке kafka, переводя взаимодействие обратного вызова на взаимодействие с будущим объектом. Глядя на клиентскую библиотеку kafka, код становится более сложным, и я не нашел времени, чтобы все это понять, но я предполагаю, что он может выполнять удаленные вызовы (по крайней мере, метаданные?) В том же потоке.

Как пользователь, я ожидал, что методы Spring-Kafka, которые возвращают будущее, вернутся немедленно, даже если удаленный сервер kafka недоступен.

Приветствуются любые подтверждения, если я неправильно понимаю или это ошибка. В итоге я сделал его асинхронным на своем конце.

Другая проблема заключается в том, что в документации Spring-Kafka вначале говорится, что она предоставляет синхронные и асинхронные методы отправки. Мне не удалось найти никаких методов, которые не возвращают фьючерсы, возможно, документация нуждается в обновлении.

Если потребуется, я с радостью предоставлю дополнительную информацию. Спасибо.


person Carlos E. L. Augusto    schedule 13.07.2017    source источник
comment
Вы смогли найти решение для этого?   -  person noobCoder    schedule 22.10.2018
comment
Неа. Я уже реализовал то, что мне нужно, на своей стороне, и так и осталось. Поскольку прошло много времени, может быть, более новые версии исправили эти проблемы? Вы пробовали 2.1.10 и 2.2.0?   -  person Carlos E. L. Augusto    schedule 22.10.2018


Ответы (5)


Если я посмотрю на сам KafkaProducer, то отправка сообщения состоит из двух частей:

  1. Сохранение сообщения во внутреннем буфере.
  2. Выгрузка сообщения из буфера в Kafka.

KafkaProducer асинхронен для второй части, а не для первой.

Метод send () все еще может быть заблокирован в первой части и в конечном итоге выбросить исключения TimeoutExceptions, например:

  • Метаданные для тем не кэшируются и не устарели, поэтому производитель пытается получить метаданные с сервера, чтобы узнать, существует ли еще тема и сколько разделов в ней.
  • Буфер заполнен (по умолчанию 32 МБ).

Если сервер полностью не отвечает, вы, вероятно, столкнетесь с обеими проблемами.

Обновлять:

Я протестировал и подтвердил это в Kafka 2.2.1. Похоже, это поведение может отличаться в версиях 2.4 и / или 2.6: KAFKA-3720

person GreyFairer    schedule 08.07.2020
comment
Ух ты, почти 3 года на точный ответ, спасибо. Я даже не использую это больше, но я действительно нашел время, чтобы посмотреть текущие документы, чтобы увидеть, было ли это описано там, но это не так. Эта документация, безусловно, нуждается в улучшении. - person Carlos E. L. Augusto; 10.07.2020
comment
Извините, я нажал клавишу ввода и отправил комментарий, а затем потребовалось более 5 минут для редактирования ... Итак, отдых: я не ожидал такого поведения, я ожидал получить какие-либо ошибки от будущего объекта при использовании метода асинхронной отправки. Возвращение будущего как в синхронной, так и в асинхронной версиях тоже не помогло ... еще раз спасибо за то, что нашли время. - person Carlos E. L. Augusto; 10.07.2020
comment
Я сам столкнулся с этой проблемой и ответил на ваш вопрос, прежде чем понял, что ему 3 года :-) - person GreyFairer; 10.07.2020
comment
Читая упомянутую вами проблему, такое поведение кажется оправданным. Однако это действительно нужно объяснить в документации. Но, как там было сказано, даже тесты ожидали другого поведения: P - person Carlos E. L. Augusto; 11.07.2020

В дополнение к аннотации @EnableAsync в классе конфигурации необходимо использовать аннотацию @Async в методе, в котором вы вызывали этот код.

http://www.baeldung.com/spring-async

Вот некоторые фрагменты кода. Конфигурация производителя Kafka:

@EnableAsync
@Configuration
public class KafkaProducerConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Value("${kafka.brokers}")
    private String servers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) {
        return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper));
    }

    @Bean
    public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) {
        return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper));
    }

    @Bean
    public Producer producer() {
        return new Producer();
    }
}

И сам производитель:

public class Producer {

    public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, GenericMessage> kafkaTemplate;

    @Async
    public void send(String topic, GenericMessage message) {
        ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() {

            @Override
            public void onSuccess(final SendResult<String, GenericMessage> message) {
                LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(final Throwable throwable) {
                LOGGER.error("unable to send message= " + message, throwable);
            }
        });
    }
}
person Jorge C    schedule 14.07.2017
comment
Благодарю за ваш ответ. Нет, я не использую эти аннотации, в документации о них ничего не говорилось. Я попробую оба и дам знать, решит ли это проблему. - person Carlos E. L. Augusto; 17.07.2017
comment
К сожалению, использование EnableAsync ничего не изменило. Кроме того, из ссылки я понимаю, что это библиотека spring-kafka, которая должна использовать аннотацию Async, поскольку она предоставляет мне будущий объект. - person Carlos E. L. Augusto; 18.07.2017
comment
Я согласен с вами, для меня не имеет смысла предоставлять фьючерсы, но я все равно должен разместить аннотации. В нашем случае размещение этих двух аннотаций заставило его работать как шарм. Отредактирую ответ, добавив фрагменты кода. - person Jorge C; 19.07.2017
comment
Насколько я понимаю, ваши фрагменты кода состоят в том, что вы сделали свой собственный код асинхронным, используя для этого весенний способ. Это работает для вас, потому что метод send возвращает void, поэтому Spring выполняет свое содержимое внутри нового потока и немедленно возвращается к вызывающей стороне send. Однако выполняемые вами вызовы spring-kafka остаются синхронными. Ваш код аналогичен тому, что я сделал в своем коде для решения проблемы, но я использовал простую Java, потому что мне нужно было больше контроля. И с вашими, и с моими решениями я ожидал бы, что поток выполнения изменит потоки 2 раза, если spring-kafka сделал это правильно. - person Carlos E. L. Augusto; 19.07.2017

Лучшее решение - добавить прослушиватель обратного вызова на уровне производителя.

@Bean
public KafkaTemplate<String, WebUserOperation> operationKafkaTemplate() {
    KafkaTemplate<String, WebUserOperation> kt = new KafkaTemplate<>(operationProducerFactory());
    kt.setProducerListener(new ProducerListener<String, WebUserOperation>() {

        @Override
        public void onSuccess(ProducerRecord<String, WebUserOperation> record, RecordMetadata recordMetadata) {
            System.out.println("### Callback :: " + recordMetadata.topic() + " ; partition = " 
                    + recordMetadata.partition()  +" with offset= " + recordMetadata.offset()
                    + " ; Timestamp : " + recordMetadata.timestamp() + " ; Message Size = " + recordMetadata.serializedValueSize());
        }

        @Override
        public void onError(ProducerRecord<String, WebUserOperation> producerRecord, Exception exception) {
            System.out.println("### Topic = " + producerRecord.topic() + " ; Message = " + producerRecord.value().getOperation());
            exception.printStackTrace();
        }
    });
    return kt;
}
person Arpit Gupta    schedule 25.02.2020

Просто чтобы убедиться. Применена ли аннотация @EnableAsync? Я хочу сказать, что это может быть ключом к определению поведения Future ‹>

person Chad    schedule 13.07.2017
comment
Благодарю за ваш ответ. Нет, я не использую эту аннотацию, ничего об этом не было в документации. Я попробую и дам знать, решит ли это проблему. - person Carlos E. L. Augusto; 17.07.2017
comment
Использование @EnableAsync, к сожалению, ничего не изменило = / - person Carlos E. L. Augusto; 18.07.2017

Код ниже работает для меня, чтобы получить ответ асинхронно

    ProducerRecord<UUID, Person> person = new ProducerRecord<>(kafkaTemplate.getDefaultTopic(), messageKey,Person);
    Runnable runnable = () -> kafkaTemplate.send(person).addCallback(new MessageAckHandler());
    new Thread(runnable).start();

  public class MessageAckHandler implements ListenableFutureCallback<SendResult<UUID,Person>> {

    @Override
    public void onFailure(Throwable exception) {
      log.error("unable to send message: " + exception.getMessage());
     }

     @Override
     public void onSuccess(SendResult<UUID, ScreeningEvent> result) {
       log.debug("sent message with offset={} messageID={}", result.getRecordMetadata().offset(), result.getProducerRecord().key());
     }
  }

   public class SendResult<K, V> {

     private final ProducerRecord<K, V> producerRecord;

     private final RecordMetadata recordMetadata;

     public SendResult(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
        this.producerRecord = producerRecord;
        this.recordMetadata = recordMetadata;
    }

    public ProducerRecord<K, V> getProducerRecord() {
       return this.producerRecord;
    }

    public RecordMetadata getRecordMetadata() {
       return this.recordMetadata;
    }

    @Override
    public String toString() {
       return "SendResult [producerRecord=" + this.producerRecord + ", recordMetadata=" + this.recordMetadata + "]";
    }

 }
person AnupKurup    schedule 26.04.2020
comment
Ну, это, конечно, работает, но вы просто делаете это асинхронным самостоятельно, что было именно тем, что я сделал тогда и сказал в исходном вопросе. Тогда я хотел знать, была ли это ошибка в spring kafka или я что-то упустил ... - person Carlos E. L. Augusto; 09.07.2020
comment
@ CarlosE.L.Augusto: расскажите, пожалуйста, о том, что вы сделали для решения своей проблемы с помощью кода. - person Praveen kumar singhal; 21.06.2021