Асинхронная неблокирующая многоадресная рассылка с Camel

У меня есть два вопроса: 1. Можем ли мы иметь неблокирующие асинхронные маршруты в верблюде. Я вижу асинхронность с seda, но тогда, если разгрузка работает на другой поток, который блокируется. 2. Если да, то можем ли мы использовать многоадресную рассылку на таких маршрутах. Ниже приведен мой многошаговый маршрут на верблюдах, который, кажется, работает. Но не уверен, что это асинхронный или неблокирующий асинхронный режим.

from("direct:multiStep")
             .to("bean:routeHandler?method=monoReturningMethod1")
             .process(new UnwrapStreamProcessor())
             .to("bean:routeHandler?method=monoReturningMethod2")
             .process(new UnwrapStreamProcessor())

Выше работает, и веб-запрос имеет ответ как от monoReturningMethod. В этом случае я хочу убедиться, что весь процесс не блокируется.

Для многоадресной рассылки я экспериментирую со следующим маршрутом. Не знаю, куда поставить UnwrapStreamProcessor. Пробовал ставить после end() не получается. Нужен ли мне пользовательский Processor ? Или как связать все возвраты Mono в один?

from("direct:incoming")
         .multicast()
         .parallelProcessing()
         .to("bean:routeHandler?method=monoReturningMethod1", "bean:routeHandler?method=monoReturningMethod2")
         .end()

Я использую apache `camel 3.0.1 со стартером весенней загрузки.

@Component("routeHandler")
public class RouteHandler {
     Mono<Entity> monoReturningMethod1(Exchange exchange) {
          //make some WebClient request which returns Mono.
     }
     Mono<Entity> monoReturningMethod2(Exchange exchange) {
          //make some WebClient request which returns Mono.
     }
}

Этот маршрут обрабатывает входящий веб-запрос. Как сделать всю обработку маршрута неблокирующей и асинхронной. Я пытался использовать process(new UnwrapStreamProcessor()) в качестве шага процесса после monoReturningMehtod, и если я делаю последовательно, это работает. Но он не работает с многоадресной рассылкой и не позволяет перезаписывать исходное сообщение. Какие-либо предложения ?

PS: я запускаю свой асинхронный поток следующим образом: producerTemplate.asyncSend("RouteName", exchange)


person Vishal Kumar    schedule 24.03.2020    source источник


Ответы (1)


Согласно этой странице, все EIP поддерживаются механизмом асинхронной маршрутизации, но не все типы конечных точек (компоненты). Некоторые компоненты имеют только ограниченную поддержку, т. е. они могут потреблять или производить только асинхронные данные.

Вы можете использовать Threads DSL в своем маршруте, чтобы сообщить Camel, что с точки пересылаются сообщения асинхронно в новом потоке.

person burki    schedule 30.03.2020