Асинхронный компонент Camel - немедленно вызывается doStop()

Я пытаюсь создать компонент верблюда, который использует API из внешней службы.

Мой маршрут выглядит следующим образом

from("myComponent:entity?from=&to=")
.to("seda:one")

from("seda:one")
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.completionSize(5)
.completionTimeout(5000)
.process( new Processor1() )
to("seda:two")

.
.
.


from("seda:five")
.to("myComponent2:entity")

Я реализовал своего потребителя компонентов следующим образом

public class MyComponentConsumer extends DefaultConsumer {

    public MyComponentConsumer(MyComponentEndpoint endpoint, Processor processor) {
        super(endpoint, processor);
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();
        flag = true;
        while ( flag ) {
            //external API call
            Resource resource = getNextResource();
            if ( resource.next() == null ) {
                flag = false;
            }
            Exchange ex = endpoint.createExchange(ExchangePattern.InOnly);
            ex.getIn().setBody(resource.toString());
            getAsyncProcessor().process(
                            ex
                            doneSync -> {
                                LOG.info("Message processed");
                            }
                    );
        }
    }

    @Override
    protected void doStop() throws Exception {
        super.doStop();
        System.out.println("stop ---- ");
    }
}

Все работало нормально, и данные распространялись по маршруту. Моя единственная проблема заключалась в том, что данные не распространялись на следующую часть, пока весь этот процесс не был завершен. А следующие части запускались асинхронно.

Я посмотрел на пример StreamConsumer и попытался реализовать его в своем коде, используя runnable и executorService. Но если я это сделаю, потребитель остановится, как только начнется.

Я изменил код на

public class MyComponentConsumer extends DefaultConsumer implements Runnable 

и добавил

private ExecutorService executor;
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "myComponent");
executor.execute(this);

и переместил мою логику внутрь метода run(). Но потребительский поток заканчивается, как только начинается. и асинхронный процессор не передает данные должным образом.

Есть ли другой способ реализовать нужный мне функционал или я где-то здесь ошибаюсь. Любая помощь будет оценена по достоинству.


person Shark Man    schedule 12.09.2020    source источник


Ответы (1)


Какую версию верблюда вы используете?

Возникла проблема с управлением состоянием потребителя в версии camel 2.x, которая была исправлена ​​в версии Camel 3.x CAMEL-12765, что может привести к описанной здесь проблеме.

Если вы используете версию camel 2.x, попробуйте использовать newScheduledThreadPool вместо newSingleThreadExecutor. Также executor.schedule(this, 5L, TimeUnit.SECONDS) вместо executor.execute(this).

Отложенный запуск исполнителя может помочь избежать проблемы, с которой вы столкнулись.

person Adithya Rao    schedule 16.09.2020