Alpakka AMQP: как обнаружить исключение объявления?

У меня есть источник AMQP и приемник AMQP с объявлениями:

List<Declaration> declarations = new ArrayList<Declaration>() {{
      add(QueueDeclaration.create(sourceExchangeName));
      add(BindingDeclaration.create(sourceExchangeName, sourceExchangeName).withRoutingKey(sourceRoutingKey));
    }};

amqpSource = AmqpSource
    .committableSource(
        NamedQueueSourceSettings.create(connectionProvider, sourceExchangeName)
            .withDeclarations(declarations),
        bufferSize);

AmqpWriteSettings amqpWriteSettings = AmqpWriteSettings.create(connectionProvider)
    .withExchange("DEST_XCHANGE")
    .withRoutingKey("ROUTE123")
    .withDeclaration(ExchangeDeclaration.create(destinationExchangeName,
        BuiltinExchangeType.DIRECT.getType()));

amqpSink = AmqpSink.create(amqpWriteSettings);

А то у меня течет..

amqpSource.map(doSomething).async().map(doSomethingElse).async().to(amqpSink)

Теперь, после того как я запустил приложение, сообщения, отправленные в исходную очередь, не потреблялись. Позже я узнал, что это произошло из-за ошибок, которые возникали при декларациях. (т. е. он работал нормально, когда я удалил .withDeclarations(..) в настройках источника и приемника.

Итак, мои вопросы:

  1. Как определить, что источник и приемник AMQP работают нормально?
  2. Как игнорировать исключения объявлений?
  3. Если возникает какое-либо исключение, как я могу узнать об этом и вызвать сбой системы?

person Jerald Baker    schedule 12.05.2020    source источник


Ответы (1)


Чтобы ответить на 1 и 3, AmqpSink материализует CompletionStage<Done>, который вам придется сохранить и обработать (зарегистрировать некоторые функции обратного вызова), чтобы наблюдать за сбоем и завершением потока. В примере документации мы блокируем этап завершения, который не подходит для производственного кода (https://doc.akka.io/docs/alpakka/current/amqp.html#with-sink), вероятно, потому, что образец включен в один из тестов Alpakka. Вместо этого отдайте предпочтение обычным CompletionStage методам обратного вызова/преобразования (см., например, это введение).

CompletionStage выйдет из строя, когда произойдет ошибка, когда поток материализуется/запускается или во время обработки элементов, или завершается, когда источник достигает конца и каждый элемент проходит через ваш поток в приемник. Это означает, что для запуска потока, если он не выходит из строя довольно быстро, он работает.

На вопрос 2 не уверен, можно ли игнорировать исключения объявления, возможно, они всегда терпят неудачу в соединении.

person johanandren    schedule 15.05.2020
comment
когда я использую AmqpFlow для записи, а затем делаю еще несколько сопоставлений ... и в конце я делаю Sink.Ignore. Как я могу обнаружить сбои в этом случае? - person Jerald Baker; 18.05.2020
comment
Сбой в потоке пойдет вниз по течению и в конечном итоге окажется в материализованном значении Sink.ignore, так что просто держитесь за это. - person johanandren; 19.05.2020
comment
Спасибо. Таким образом, это означает, что когда этап завершения пара завершится, поток завершится и больше не будет работать. спасибо! - person Jerald Baker; 20.05.2020