Spring Cloud Stream MessageChannel send () всегда возвращает true

Я использую облачный поток Spring, и я хотел бы сохранить сообщения и повторить попытку опубликовать их в теме, когда сервер Kafka ушел, но метод send () MessageChannel всегда возвращает true, даже если сервер Kafka / Zookeeper остановлен.

Кто-нибудь может помочь?

ОБНОВЛЕНИЕ с содержанием application.yml:

spring:
    cloud:
        stream:
            kafka:
                binder:
                    brokers: localhost
                    zk-nodes: localhost
                    mode: raw
                bindings:
                    output:
                        producer:
                            sync: true
            bindings:
                output:
                    destination: topic-notification
                    content-type: application/json

КОД:

@Service
public class SendToKafka {
    private Logger log = LoggerFactory.getLogger(SendToKafka.class);

    @Autowired
    Source source;

    @Autowired
    NotificationFileService notificationFileService;

    public void send(NotificationToResendDTO notification){
        try {
            CompletableFuture.supplyAsync(() -> notification)
                .thenAcceptAsync(notif -> {
                    boolean resp = source.output().send(MessageBuilder.withPayload(notif).build());
                    log.info(" ======== kafka server response === " + resp);

                    if (!resp){
                        log.info(" ======== failed to send the notification" + notification);
                        // save failed notification
                        notificationFileService.writeTofile(notification);
                    }
                }).get();
        } catch (InterruptedException | ExecutionException e) {
            log.info(" ======== failed to send the notification with exception" + notification);
            // save failed notification
            notificationFileService.writeTofile(notification);
            e.printStackTrace();
        }
    }
}

person freemanpolys    schedule 06.12.2017    source источник


Ответы (1)