Beam применяет PTransform к значениям с сохранением ключа

Кажется, я борюсь с этим шаблоном в Beam. Это потоковый конвейер.

На высоком уровне:

  • сообщение приходит кролику
  • содержимое сообщения включает ID и пути к файлу N S3
  • Я хочу произвести некоторую агрегацию по всем перечисленным файлам S3, но результаты должны быть помечены исходным сообщением.
  • напишите сообщение обратно кролику с результатом агрегации, по одному для каждого входящего сообщения

Неизбежно, я получаю несколько PCollection[KV[MessageId, S3FilePaths]] и хочу применить кучу PTransform на S3FilePaths, но не теряю того факта, что они изначально были введены MessageId.

Кажется, я не могу найти общую «сопоставить значения KV pcollection, но сохранить ключевую» функциональность, и я считаю, что должен использовать PTransform (в отличие от DoFn), потому что файл IO все было реализовано как PTransforms.

Неправильно ли я думаю об этом? Любая помощь горячо приветствуется.


Обновление. Приносим извинения за то, что не вдавались в подробности. Я сам виноват в том, что разместил это в конце разочаровывающей пятницы.

У меня было несколько основных препятствий:

  1. Я понял, что PCollection[KV действительно для объединения уже загруженных данных. Попытка изолировать каждый V как отдельный набор операций конвейера на самом деле не соответствовала API
  2. У меня не было настроено глобальное управление окнами / триггерами для поставленной задачи. Более того, мои преобразования не всегда сохраняли семантику окна / панели, которую я предполагал.
  3. У меня есть разные пути к файлам s3 для каждого сообщения, но из-за таких проблем, как https://issues.apache.org/jira/browse/BEAM-7753, FileIO API ориентированы на PTransforms, что не позволяет мне легко помечать результаты с помощью идентификатора входящего сообщения и не может создать экземпляр ReadableFile напрямую (это package-private) Я не мог собрать его воедино. Я закончил тем, что обернул клиент S3 для java внутри пользовательского PTransform, который сохраняет исходный MessageId вместе с каждым возвращаемым значением.

На данный момент у меня что-то работает непрерывно. Мой код на самом деле написан на scio, поэтому поделиться им немного сложно, но на высоком уровне:

  1. чтение из RabbitMQ с использованием Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()) триггера в фиксированном 1-секундном окне и соблюдение осторожности при использовании ProcessContext.output для сохранения окон и временных меток на всем протяжении
  2. Использование общей формы PCollection[(MessageId, V)] (синтаксис scala Tuple2) повсюду. Когда V - это путь к файлу S3, я использую PTransform из пути, испускающего содержимое файла (это неподдерживаемый формат данных).
  3. Агрегация выполняется после группировки до PCollection[KV[(MessageId, FileElementId), FileElement]], а затем уменьшается до PCollection[MessageId, FileElement], так что семантика сокращения для каждого входящего сообщения сохраняется.

Номер 2 меня немного разочаровал. Я надеялся, что смогу использовать функции файловой системы луча для чтения из файла и объединения каждого вывода с идентификатором сообщения, в котором он был указан. Но сейчас я нахожусь в хорошем положении.


person drobert    schedule 16.08.2019    source источник
comment
Привет, не могли бы вы отредактировать свой пост с помощью кода конвейера? Это очень поможет.   -  person Cubez    schedule 19.08.2019


Ответы (1)


Невозможно применить преобразования к KV<KeyT, ValueT>, если они принимают только KeyT или ValueT. Если вам нужно сохранить ключ при применении преобразований к значению, то рекомендуется либо написать свой собственный DoFns, который может принимать KV, но игнорировать ключ, либо реструктурировать конвейер так, чтобы вы не полагались на вывод преобразования, требующие отбрасывания ключей.

person Daniel Oliveira    schedule 19.08.2019