Зацикливание на асинхронном разделенном агрегате в муле

Используя маршрутизатор запроса/ответа с разделителем/агрегатором коллекции, я успешно асинхронно разделил массив сообщений на его рабочий процесс, а затем использовал агрегатор, чтобы очень хорошо объединить результат.

Теперь я хочу выполнить цикл (синхронно) поверх вышеперечисленного, поэтому я использую Foreach MP или другой набор разделенных агрегатов поверх существующего (да, я сохранил эти свойства как область свойств вызова и восстановил их назад).

Я вижу, что это завершено с агрегатором для первой итерации, но входящая конечная точка виртуальной машины в маршрутизаторе запроса/ответа никогда ничего не возвращает, поэтому застряла. Я много чего перепробовал, но ничего не помогает. Есть идеи, почему?

У меня есть два массива строк sa: {11, 12, 13} и sb: {21, 22, 23} в ArrayList AL. Я хочу синхронно перебирать AL, для каждого массива String я хочу асинхронно выполнять разделение-агрегацию.

Любая помощь очень ценится.

Сули

Дэвид, спасибо.
Я поставил регистратор сразу после маршрутизатора запроса/ответа, поток не попадает в него. У меня также есть регистратор сразу после агрегатора коллекций, и он попадает в него.

Вот конфигурация XML -----------

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:core="http://www.mulesoft.org/schema/mule/core" version="EE-3.4.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd 
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd 
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd 
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd 
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd 
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd ">
    <queued-asynchronous-processing-strategy name="all2thread" maxThreads="2" doc:name="Queued Asynchronous Processing Strategy"/>
    <flow name="splitertest2Flow1" doc:name="splitertest2Flow1">
        <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8581" doc:name="HTTP"/>
        <expression-filter expression="#[groovy:!payload.contains('.ico')]" doc:name="Expression"/>
        <scripting:transformer doc:name="Groovy">
            <scripting:script engine="Groovy">
                <scripting:text><![CDATA[String [] sa = new String[3];
sa[0]="message ... 11";
sa[1]="message ... 12";
sa[2]="message ... 13";

String [] sb = new String[3];
sb[0]="message ... 21";
sb[1]="message ... 22";
sb[2]="message ... 23";
ArrayList al = new ArrayList();
al.add(sa);
al.add(sb);
message.setPayload(al);
return message;]]></scripting:text>
            </scripting:script>
        </scripting:transformer>
        <foreach doc:name="Foreach">
        <request-reply storePrefix="workStore">
            <vm:outbound-endpoint path="work.IN">  
                <message-properties-transformer scope="outbound"> 
                    <delete-message-property key="MULE_REPLYTO"/> 
                </message-properties-transformer> 
            </vm:outbound-endpoint>
            <vm:inbound-endpoint path="work.OUT"></vm:inbound-endpoint>
         </request-reply>
            <logger message="******** Almost there....." level="INFO" doc:name="Logger"/>
        </foreach>
        <logger message="**************  Very Happy to get here **********************" level="INFO" doc:name="Logger"/>
    </flow>
    <flow name="splitertest2Flow2" doc:name="splitertest2Flow2">
        <vm:inbound-endpoint exchange-pattern="one-way" path="work.IN" doc:name="VM"/>
        <collection-splitter doc:name="Collection Splitter"/>
        <flow-ref name="DoWork2" doc:name="DoWork2"/>
    </flow>
    <flow name="DoWork2" doc:name="DoWork2" processingStrategy="all2thread">
        <scripting:transformer doc:name="Groovy">
            <scripting:script engine="Groovy">
                <scripting:text><![CDATA[String msg = message.getPayload();
println "processing..."+msg;
Thread.sleep(1500);
println "exit..."+msg;
return message;]]></scripting:text>
            </scripting:script>
        </scripting:transformer>
        <vm:outbound-endpoint exchange-pattern="one-way" path="work.Q" doc:name="VM"/>
    </flow>
    <flow name="splitertest2Flow3" doc:name="splitertest2Flow3" processingStrategy="all2thread">
        <vm:inbound-endpoint exchange-pattern="one-way" path="work.Q" doc:name="VM"/>
        <collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>
        <logger message="************ after aggregator  ************" level="INFO" doc:name="Logger"/>
        <vm:outbound-endpoint exchange-pattern="one-way" path="work.OUT" doc:name="VM"/>
    </flow>
</mule>

person Suli    schedule 30.04.2013    source источник
comment
Без тестовой конфигурации, позволяющей воспроизвести проблему, помочь вам будет практически невозможно.   -  person David Dossot    schedule 30.04.2013
comment
Дэвид, спасибо. XML-конфиг прилагается.   -  person Suli    schedule 01.05.2013


Ответы (1)


Есть две проблемы, которые мешают этому работать:

  • Элемент foreach использует один и тот же идентификатор корреляции для всех создаваемых им сообщений, что полностью нарушает нисходящий поток collection-aggregator. Агрегатор работает, группируя по этому идентификатору, и, поскольку он одинаков для шести сообщений, он не может работать. Чтобы исправить это, мне пришлось назначить новый идентификатор корреляции в качестве первого шага после foreach.
  • request-reply вычисляет «идентификатор корреляции асинхронного ответа», который необходимо использовать при отправке в очередь ответов (work.OUT). Обычно этот «идентификатор корреляции асинхронного ответа» равен идентификатору корреляции сообщения, но не в этом случае (я подозреваю, потому что мы находимся за foreach). Чтобы исправить это, мне пришлось сохранить asyncReplyCorrelationId в переменной сеанса и повторно установить его как идентификатор корреляции прямо перед отправкой в ​​очередь ответов.

Вот полный рабочий конфиг:

<queued-asynchronous-processing-strategy
    name="all2thread" maxThreads="2" />

<flow name="splitertest2Flow1">
    <http:inbound-endpoint exchange-pattern="request-response"
        host="localhost" port="8581" />
    <expression-filter expression="#[groovy:!payload.contains('.ico')]" />
    <scripting:transformer>
        <scripting:script engine="Groovy">
            <scripting:text><![CDATA[
                String [] sa = new String[3];
                sa[0]="message ... 11";
                sa[1]="message ... 12";
                sa[2]="message ... 13";

                String [] sb = new String[3];
                sb[0]="message ... 21";
                sb[1]="message ... 22";
                sb[2]="message ... 23";
                ArrayList al = new ArrayList();
                al.add(sa);
                al.add(sb);
                message.setPayload(al);
                return message;
         ]]></scripting:text>
        </scripting:script>
    </scripting:transformer>
    <foreach>
        <scripting:transformer>
            <scripting:script engine="Groovy">
                <scripting:text><![CDATA[
                message.correlationId = UUID.randomUUID().toString()
                return message
             ]]></scripting:text>
            </scripting:script>
        </scripting:transformer>
        <request-reply storePrefix="workStore">
            <vm:outbound-endpoint path="work.IN">
                <message-properties-transformer
                    scope="outbound">
                    <delete-message-property key="MULE_REPLYTO" />
                </message-properties-transformer>
                <message-properties-transformer
                    scope="session">
                    <add-message-property key="asyncReplyCorrelationId"
                        value="#[message.correlationId + message.correlationSequence]" />
                </message-properties-transformer>
            </vm:outbound-endpoint>
            <vm:inbound-endpoint path="work.OUT" />
        </request-reply>
        <logger message="******** Almost there....." level="INFO" />
    </foreach>
    <logger message="**************  Very Happy to get here **********************"
        level="INFO" />
</flow>

<flow name="splitertest2Flow2">
    <vm:inbound-endpoint exchange-pattern="one-way"
        path="work.IN" />
    <collection-splitter />
    <flow-ref name="DoWork2" />
</flow>

<flow name="DoWork2">
    <scripting:transformer>
        <scripting:script engine="Groovy">
            <scripting:text><![CDATA[
                String msg = message.getPayload();
                println "processing..."+msg;
                Thread.sleep(1500);
                println "exit..."+msg;
                return message;
         ]]></scripting:text>
        </scripting:script>
    </scripting:transformer>
    <vm:outbound-endpoint exchange-pattern="one-way"
        path="work.Q" />
</flow>

<flow name="splitertest2Flow3" processingStrategy="all2thread">
    <vm:inbound-endpoint exchange-pattern="one-way"
        path="work.Q" />
    <collection-aggregator failOnTimeout="true" />
    <logger message="************ after aggregator  ************"
        level="INFO" />
    <scripting:transformer>
        <scripting:script engine="Groovy">
            <scripting:text><![CDATA[
                message.correlationId = asyncReplyCorrelationId
                return message
         ]]></scripting:text>
        </scripting:script>
    </scripting:transformer>
    <vm:outbound-endpoint exchange-pattern="one-way"
        path="work.OUT" />
</flow>
person David Dossot    schedule 01.05.2013
comment
Давид, очень признателен. Без тебя не могу! - person Suli; 02.05.2013