Поиск одновременных обменов в качестве входных данных для Apache Camel Splitter

Я реализую маршрут для чтения строк из базы данных, разделяю их с помощью сплиттера, обрабатываю их параллельно, агрегирую и обновляю базу данных. Все работает так, как ожидалось, когда есть только один вход для маршрута разветвителя. Образец кода-

    <route>
        <from uri="direct:splitter"/>
        <log message="batch id- $simple{header.BATCH_NUMBER}, loop index - $simple{property.CamelLoopIndex}" />
        <split strategyRef="aggregatorStrategy" executorServiceRef="myPool">
            <simple>${body}</simple>
            <log message="batch id- $simple{header.BATCH_NUMBER}, loop index - $simple{property.CamelLoopIndex}, split index - $simple{property.CamelSplitIndex}" />
            <to uri="bean:gisResponseProcessor" />
        </split>
    </route>

Когда я отправляю 3 сообщения в direct:splitter (обработка каждого занимает несколько минут), и все они обрабатываются параллельно. Когда я пытаюсь это сделать, первые сообщения журнала вне разделителя для всех трех входов печатаются немедленно. Однако сообщения журнала из внутреннего сплиттера показывают, что каждый из 3 обменов разделяется один за другим. Подсообщения каждого из них используют пул потоков. Есть ли способ заставить сплиттер разделить 3 входа параллельно?


person M J    schedule 16.04.2014    source источник


Ответы (2)


Да, используя маршрут seda или маршрут vm. Оба они делают одно и то же, но имеют немного другое применение. Я бы посоветовал вам увидеть этот мой ответ о различиях: параллельная обработка верблюжьего сплиттера.

Проблема, с которой вы сталкиваетесь, заключается в том, что вы хотите отправлять сообщения на сплиттер параллельно. Таким образом, сообщения 1,2 и 3 обрабатываются параллельно. Этого можно добиться с помощью компонента seda. Компонент seda является асинхронным и позволяет обрабатывать сообщения на маршруте параллельно.

Попробуйте этот маршрут:

<route>
    <from uri="direct:splitter"/>
    <log message="batch id- $simple{header.BATCH_NUMBER}, loop index - $simple{property.CamelLoopIndex}" />
     <to uri="seda:sedaSplitter"/>
</route>
<route id="sedaSplitter>
    <from uri="seda:sedaSplitter?multipleConsumers=true&amp;concurrentConsumers=16"/>

      <split strategyRef="aggregatorStrategy" executorServiceRef="myPool">
          <simple>${body}</simple>
          <log message="batch id- $simple{header.BATCH_NUMBER}, loop index - $simple{property.CamelLoopIndex}, split index - $simple{property.CamelSplitIndex}" />
          <to uri="bean:gisResponseProcessor" />
      </split>
</route>

Обратите внимание, что сообщения обрабатываются по компонентному маршруту seda, где MESSAGE обрабатывается параллельно. Вы можете сделать это еще более параллельным, включив parallelProcessing="true" в разделитель, чтобы отдельные элементы в разделении обрабатывались параллельно.

Крикните, если вам нужна дополнительная информация.

person Namphibian    schedule 16.04.2014
comment
При установке executorServiceRef автоматически подразумевается параллельная обработка, и вам не нужно включать эту опцию. - person G Quintana; 17.04.2014
comment
Я попробовал предложенный вами маршрут и все еще вижу похожие сообщения в журнале. Мне кажется параллельно до сплиттера. Разделитель обрабатывает только один входной обмен (разделенный на множество подсообщений, которые обрабатываются параллельно) за раз. Нужно ли мне настраивать несколько маршрутов, таких как sedaSplitter, чтобы получить параллельное потребление нескольких входов для сплиттера? - person M J; 18.04.2014

Убедитесь, что в вашем пуле потоков с именем myPool доступно как минимум 6 потоков.

person G Quintana    schedule 17.04.2014
comment
Этот пул используется совместно с другими маршрутами? - person G Quintana; 18.04.2014
comment
Не общий бассейн. Я обновлю вопрос полной конфигурацией верблюда, чтобы он был более понятным. - person M J; 18.04.2014