Разделение Camel ZipFileDataFormat не устанавливает заголовки при потоковой передаче

У меня есть простой маршрут, который опрашивает zip-файлы с FTP-сервера. ZIP-файл состоит из одного файла, требующего обработки, и нескольких вложений. Я пытаюсь использовать ZipFileDataFormat для разделения, и я могу разделить и направить элементы по желанию, т.е. отправить файл обработки в процессор, а другие файлы в конечную точку агрегатора.

Маршрут выглядит следующим образом:

from(sftp://username@server/folder/path?password=password&delay=600000)
.unmarshal(getZipFileDataFormat()).split(body(Iterator.class)).streaming()
.log("CamelSplitComplete :: ${header.CamelSplitComplete}")
.log("Split Size :: ${header.CamelSplitSize}")
 .choice()
   .when(header(MyConstants.CAMEL_FILE_NAME_HEADER).contains(".json"))
     .to(JSON_ENDPOINT).endChoice()
   .otherwise()
     .to(AGGREGATOR_ENDPOINT)
 .endChoice()
.end();

getZipFileDataFormat

private ZipFileDataFormat getZipFileDataFormat() {
        ZipFileDataFormat zipFile = new ZipFileDataFormat();
        zipFile.setUsingIterator(true);
        return zipFile;
    }

Разделение работает нормально. Однако в журналах я вижу, что два заголовка CamelSplitComplete и CamelSplitSize установлены неправильно. Там, где CamelSplitComplete всегда ложно, CamelSplitSize не имеет никакого значения.

Из-за этого я не могу агрегировать на основе размера. Я использую eagerCheckCompletion() для получения входного обмена на маршруте агрегатора. Маршрут моего агрегатора выглядит следующим образом.

from(AGGREGATOR_ENDPOINT).aggregate(new ZipAggregationStrategy()).constant(true)
.eagerCheckCompletion().completionSize(header("CamelSplitSize"))to("file:///tmp/").end();

Я прочитал документацию Apache, что эти заголовки всегда установлены. Я что-то пропустил здесь? Любой указатель в правильном направлении был бы очень полезен.


person Kabira Speaking    schedule 29.06.2017    source источник
comment
Нет CamelSplitSize, когда вы используете потоковый режим на разветвителе, как задокументировано. Однако завершенное должно быть там.   -  person Claus Ibsen    schedule 29.06.2017
comment
Спасибо @КлаусИбсен! Есть ли способ определить количество файлов в zip? Можем ли мы использовать CamelSplitIndex - 1 для определения количества файлов?   -  person Kabira Speaking    schedule 29.06.2017
comment
@ClausIbsen, в документации также говорится, что Начиная с версии Camel 2.9, этот заголовок также устанавливается при разделении на основе потока, но только на завершенном Exchange. Я использую 2.12.5.   -  person Kabira Speaking    schedule 29.06.2017
comment
Попробуйте использовать более новую версию, так как 2.12.x очень старая.   -  person Claus Ibsen    schedule 29.06.2017
comment
Я вручную устанавливаю заголовки для корреляции и размера завершения. Теперь я получаю правильные значения в маршруте агрегатора. Однако теперь zip-файл не создается. Обмен проходит без ошибок. Маршрут агрегатора теперь выглядит так. from(AGGREGATOR_ENDPOINT).log("Aggregated File Count :: ${header.totalFileCount}").log("Incoming File Name :: ${header.incomingFileName}").aggregate(header("incomingFileName"), new ZipAggregationStrategy()).completionSize(header("totalFileCount")).to("file:///tmp/").end(); Любые указатели на то, почему не создается zip-файл?   -  person Kabira Speaking    schedule 30.06.2017
comment
@ClausIbsen - я работаю с Websphere 7, и лучшая версия верблюда, которую я могу использовать, - 2.13. Есть ли у вас какие-либо указатели на маршрут агрегатора выше?   -  person Kabira Speaking    schedule 30.06.2017


Ответы (1)


Я смог получить весь маршрут на работу. Мне пришлось добавить своего рода препроцессор, который установил бы некоторые важные заголовки (имя исходящего файла и количество файлов в zip), которые мне потребуются для агрегации.

from(sftp://username@server/folder/path?password=password&delay=600000).to("file:///tmp/")
.beanRef("headerProcessor").unmarshal(getZipFileDataFormat())
.split(body(Iterator.class)).streaming()    
 .choice()
   .when(header(Exchange.FILE_NAME).contains(".json"))
     .to(JSON_ENDPOINT).endChoice()
   .otherwise()
    .to(AGGREGATOR_ENDPOINT)
 .endChoice()
.end();

После этого стратегия агрегации zip заработала как положено. Размещение здесь маршрута агрегации только для завершения ответа.

from(AGGREGATOR_ENDPOINT)
  .aggregate(header(MyConstants.HEADER_OUTGOING_FILE_NAME), new ZipAggregationStrategy())
  .eagerCheckCompletion().completionSize(header(MyConstants.HEADER_TOTAL_FILE_COUNT))
  .setHeader(Exchange.FILE_NAME, simple("${header.outgoingFileName}"))
 .to("file:///tmp/").end();
person Kabira Speaking    schedule 11.07.2017
comment
Кабира, я пытаюсь добиться чего-то очень похожего. Однако каждый файл в сгенерированном выходном zip-архиве имеет идентификаторы обменов, а не имена файлов. Я использую XML DSL, поэтому для установки имен файлов я использую ‹setHeader headerName=Exchange.FILE_NAME›‹simple›${header.CamelFileName}‹/simple›‹/setHeader›. Это делается перед вызовом агрегатора, редактируя заголовок. CamelFileName, чтобы выходной почтовый индекс назывался со значением этого заголовка. Однако имена внутренних файлов соответствуют их идентификатору обмена. Есть идеи? - person Fede E.; 03.05.2019