как использовать Net :: Stomp и транзакции с receive_frame

Я нахожусь в процессе адаптации некоторого существующего кода с помощью Net :: Stomp от возможности обрабатывать одну тему до возможности работать над несколькими темами. Может ли кто-нибудь сказать мне, возможен ли вообще такой подход? Сейчас он не работает, потому что там, где он ожидает получения транзакции, он получает первое сообщение по другой теме. Я хотел бы знать, не лаю ли я не на то дерево, прежде чем пытаться его починить.

Вот как выглядит рабочий процесс:

# first subscribe to three different queues
for $job (qw/ JOB1 JOB2 JOB3 /){
$stomp->subscribe({
   "ack" => "client",
   "destination" => "/queue/$job"
});

# listen on those three channels...
while($stomp->can_read){

   $frame = $stomp->receive_frame();

   # ... receives a message for JOB1
   # and to start a transaction send a BEGIN frame that looks like this:

    bless({
    command => "BEGIN",
    headers => {
             receipt => "0002",
            transaction => "0001",
       },
    }, "Net::Stomp::Frame")

   # Then looks for a receipt on that frame by calling
   $receipt = $stomp->receive_frame()

К сожалению, там, где он ожидает фрейм RECEIPT, он фактически получает следующий фрейм MESSAGE, ожидающий в очереди JOB2.

Мой вопрос: есть ли способ, чтобы это работало, чтобы быть одновременно подписанным на несколько тем и иметь возможность получать квитанции о транзакциях? Или есть лучший / более стандартный способ справиться с этим?

Любые советы или предложения будут приветствоваться, спасибо! Я также отправляю этот вопрос в список ActiveMQ, надеюсь, что все в порядке: - /

* обновить *

Вот полный пример:

use Net::Stomp;

use strict;

my $stomp = Net::Stomp->new( { hostname => 'bpdeb', port => '61612' } );
$stomp->connect( { login => 'hello', passcode => 'there' } );

# pre-populate the two queues
$stomp->send( { destination => '/queue/FOO.BAR', body => 'test message' } );
$stomp->send( { destination => '/queue/FOO.BAR2', body => 'test message' } );


# now subscribe to them
$stomp->subscribe({ destination => '/queue/FOO.BAR',
                   'ack'        => 'client',
                   'activemq.prefetchSize' => 1
});
$stomp->subscribe({ destination => '/queue/FOO.BAR2',
                   'ack'        => 'client',
                   'activemq.prefetchSize' => 1
});

# read one frame, then start a transaction asking for a receipt of the 
# BEGIN message
while ($stomp->can_read()){

    my $frame = $stomp->receive_frame; 
    print STDERR "got frame ".$frame->as_string()."\n";


    print STDERR "sending a BEGIN\n";
    my($frame) = Net::Stomp::Frame->new({
        command => 'BEGIN',
            headers => {
            transaction => 123,
            receipt     => 456,
        },
    });

    $stomp->send_frame($frame);

    my $expected_receipt = $stomp->receive_frame;
    print STDERR "expected RECEIPT but got ".$expected_receipt->as_string()."\n";

    exit;
}

Это выводит (с опущенными деталями)

got frame MESSAGE
destination:/queue/FOO.BAR
....

sending a BEGIN

expected RECEIPT but got MESSAGE
destination:/queue/FOO.BAR2
....

Глядя на сетевой трафик, как только отправляется запрос SUBSCRIBE, первое сообщение в очереди передается клиенту по сети. Итак, первое сообщение от FOO.BAR2 уже ожидает в сетевом буфере клиента, когда я отправляю сообщение BEGIN, и клиент читает FOO.BAR2 прямо из своего буфера.

Так что либо я что-то делаю не так, либо так не может работать.


person Kevin G.    schedule 26.09.2011    source источник


Ответы (1)


Хорошо, я попробовал, и он отлично работает. Но вы тот, кто получает фрейм. Так почему же сервер должен отправлять вам фрейм квитанции?

Вы устанавливаете "ack" => "client", а это означает, что сервер будет считать фрейм «недоставленным», пока вы не скажете иначе. Просто измените строку $receipt = $stomp->receive_frame() на $stomp->ack( { frame => $frame } );.

Обновлять

Ах, хорошо, вы хотите защитить ack с помощью транзакции. Итак, давайте посмотрим на исходный код: Существует метод send_transactional, который, вероятно, делает то, что вы хотите (но он использует фрейм SEND вместо ACK).

Возможно, вам также стоит взглянуть на патч, отправленный от cloudmark , который добавляет в модуль несколько «функций безопасности» (к сожалению, автор модуля ничего не сказал о слиянии этого патча, когда я его спросил).

person Egga Hartung    schedule 27.09.2011
comment
Ян, спасибо, что посмотрели на него, было полезно знать, что он должен работать. Итак, я провел дополнительное исследование и добавил к вопросу более полный пример повторения. НАПИШИТЕ свой комментарий об ACK, я считаю, что целью исходного кода было поместить ACK внутри транзакции, поэтому я хочу получить квитанцию ​​от BEGIN, прежде чем отправлять ACK. Если я не понимаю, что вы говорите ... - person Kevin G.; 28.09.2011