Кажется, я борюсь с этим шаблоном в Beam. Это потоковый конвейер.
На высоком уровне:
- сообщение приходит кролику
- содержимое сообщения включает ID и пути к файлу N S3
- Я хочу произвести некоторую агрегацию по всем перечисленным файлам S3, но результаты должны быть помечены исходным сообщением.
- напишите сообщение обратно кролику с результатом агрегации, по одному для каждого входящего сообщения
Неизбежно, я получаю несколько PCollection[KV[MessageId, S3FilePaths]]
и хочу применить кучу PTransform
на S3FilePaths
, но не теряю того факта, что они изначально были введены MessageId
.
Кажется, я не могу найти общую «сопоставить значения KV pcollection, но сохранить ключевую» функциональность, и я считаю, что должен использовать PTransform (в отличие от DoFn
), потому что файл IO все было реализовано как PTransforms.
Неправильно ли я думаю об этом? Любая помощь горячо приветствуется.
Обновление. Приносим извинения за то, что не вдавались в подробности. Я сам виноват в том, что разместил это в конце разочаровывающей пятницы.
У меня было несколько основных препятствий:
- Я понял, что
PCollection[KV
действительно для объединения уже загруженных данных. Попытка изолировать каждыйV
как отдельный набор операций конвейера на самом деле не соответствовала API - У меня не было настроено глобальное управление окнами / триггерами для поставленной задачи. Более того, мои преобразования не всегда сохраняли семантику окна / панели, которую я предполагал.
- У меня есть разные пути к файлам s3 для каждого сообщения, но из-за таких проблем, как https://issues.apache.org/jira/browse/BEAM-7753,
FileIO
API ориентированы наPTransform
s, что не позволяет мне легко помечать результаты с помощью идентификатора входящего сообщения и не может создать экземплярReadableFile
напрямую (это package-private) Я не мог собрать его воедино. Я закончил тем, что обернул клиент S3 для java внутри пользовательскогоPTransform
, который сохраняет исходныйMessageId
вместе с каждым возвращаемым значением.
На данный момент у меня что-то работает непрерывно. Мой код на самом деле написан на scio, поэтому поделиться им немного сложно, но на высоком уровне:
- чтение из RabbitMQ с использованием
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())
триггера в фиксированном 1-секундном окне и соблюдение осторожности при использованииProcessContext.output
для сохранения окон и временных меток на всем протяжении - Использование общей формы
PCollection[(MessageId, V)]
(синтаксис scalaTuple2
) повсюду. КогдаV
- это путь к файлу S3, я используюPTransform
из пути, испускающего содержимое файла (это неподдерживаемый формат данных). - Агрегация выполняется после группировки до
PCollection[KV[(MessageId, FileElementId), FileElement]]
, а затем уменьшается доPCollection[MessageId, FileElement]
, так что семантика сокращения для каждого входящего сообщения сохраняется.
Номер 2 меня немного разочаровал. Я надеялся, что смогу использовать функции файловой системы луча для чтения из файла и объединения каждого вывода с идентификатором сообщения, в котором он был указан. Но сейчас я нахожусь в хорошем положении.