Я использую облачный поток Spring, и я хотел бы сохранить сообщения и повторить попытку опубликовать их в теме, когда сервер Kafka ушел, но метод send () MessageChannel всегда возвращает true, даже если сервер Kafka / Zookeeper остановлен.
Кто-нибудь может помочь?
ОБНОВЛЕНИЕ с содержанием application.yml:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost
zk-nodes: localhost
mode: raw
bindings:
output:
producer:
sync: true
bindings:
output:
destination: topic-notification
content-type: application/json
КОД:
@Service
public class SendToKafka {
private Logger log = LoggerFactory.getLogger(SendToKafka.class);
@Autowired
Source source;
@Autowired
NotificationFileService notificationFileService;
public void send(NotificationToResendDTO notification){
try {
CompletableFuture.supplyAsync(() -> notification)
.thenAcceptAsync(notif -> {
boolean resp = source.output().send(MessageBuilder.withPayload(notif).build());
log.info(" ======== kafka server response === " + resp);
if (!resp){
log.info(" ======== failed to send the notification" + notification);
// save failed notification
notificationFileService.writeTofile(notification);
}
}).get();
} catch (InterruptedException | ExecutionException e) {
log.info(" ======== failed to send the notification with exception" + notification);
// save failed notification
notificationFileService.writeTofile(notification);
e.printStackTrace();
}
}
}