У меня есть источник AMQP и приемник AMQP с объявлениями:
List<Declaration> declarations = new ArrayList<Declaration>() {{
add(QueueDeclaration.create(sourceExchangeName));
add(BindingDeclaration.create(sourceExchangeName, sourceExchangeName).withRoutingKey(sourceRoutingKey));
}};
amqpSource = AmqpSource
.committableSource(
NamedQueueSourceSettings.create(connectionProvider, sourceExchangeName)
.withDeclarations(declarations),
bufferSize);
AmqpWriteSettings amqpWriteSettings = AmqpWriteSettings.create(connectionProvider)
.withExchange("DEST_XCHANGE")
.withRoutingKey("ROUTE123")
.withDeclaration(ExchangeDeclaration.create(destinationExchangeName,
BuiltinExchangeType.DIRECT.getType()));
amqpSink = AmqpSink.create(amqpWriteSettings);
А то у меня течет..
amqpSource.map(doSomething).async().map(doSomethingElse).async().to(amqpSink)
Теперь, после того как я запустил приложение, сообщения, отправленные в исходную очередь, не потреблялись. Позже я узнал, что это произошло из-за ошибок, которые возникали при декларациях. (т. е. он работал нормально, когда я удалил .withDeclarations(..) в настройках источника и приемника.
Итак, мои вопросы:
- Как определить, что источник и приемник AMQP работают нормально?
- Как игнорировать исключения объявлений?
- Если возникает какое-либо исключение, как я могу узнать об этом и вызвать сбой системы?