Использование defaultNaming для динамической оконной записи в Apache Beam

Я следую вместе с ответом на этот сообщение и документация для выполнения динамической оконной записи моих данных в конце конвейера. Вот что у меня есть до сих пор:

static void applyWindowedWrite(PCollection<String> stream) {
    stream.apply(
        FileIO.<String, String>writeDynamic()
            .by(Event::getKey)
            .via(TextIO.sink())
            .to("gs://some_bucket/events/")
            .withNaming(key -> defaultNaming(key, ".json")));
}

Но NetBeans предупреждает меня о синтаксической ошибке в последней строке:

FileNaming is not public in Write; cannot be accessed outside package

Как сделать defaultNaming доступным для моего конвейера, чтобы я мог использовать его для динамической записи. Или, если это невозможно, что я должен делать вместо этого?


person ljhennessy    schedule 07.05.2018    source источник
comment
Это не похоже на проблему луча/потока данных, а скорее на проблему Java. Этот ответ в более старом вопросе предлагает объяснение того, почему java выдает этот тип ошибки и это возможное решение. Кто-нибудь из них помогает?   -  person Lefteris S    schedule 08.05.2018
comment
Я согласен, что это проблема пути Java. Тем не менее, я ищу некоторую помощь в контексте Beam при использовании этого метода defaultNaming. Сообщение, на которое я ссылаюсь, показывает использование этого метода аналогично тому, как я его использую, но, по-видимому, он не выдает ту же ошибку. Мне интересно, почему бы и нет.   -  person ljhennessy    schedule 08.05.2018


Ответы (1)


Публикация того, что я понял, на случай, если кто-то еще столкнется с этим.

Было три проблемы с тем, как я пытался использовать writeDynamic() раньше.

  1. Раньше я использовал Beam версии 2.3.0, которая действительно описывает FileNaming как внутренний класс по отношению к FileIO.Write. Beam 2.4.0 определяет FileNaming как public static interface, что делает его доступным извне.
  2. Полное разрешение/импорт defaultNaming. Вместо того, чтобы вызывать defaultNaming напрямую, как это называется в документации по примерам, его нужно вызывать как FileIO.Write.defaultNaming, поскольку FileIO — это пакет, который я фактически импортировал.
  3. Добавление withDestinationCoder также требовалось для выполнения динамической записи.

Окончательное решение выглядело так.

static void applyWindowedWrite(PCollection<String> stream) {
    stream.apply(FileIO.<String, String>writeDynamic()
                .by(Event::getKey)
                .via(TextIO.sink())
                .to("gs://some_bucket/events/")
                .withDestinationCoder(StringUtf8Coder.of())
                .withNumShards(1)
                .withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
}

Где Event::getKey — это статическая функция, определенная в том же пакете с сигнатурой public static String getKey(String event).

Это выполняет оконную запись, которая будет записывать один файл в каждом окне (как определено методом .withNumShards(1)). Это предполагает, что окно было определено на предыдущем шаге. GroupByKey не требуется перед записью, так как это делается в процессе записи всякий раз, когда количество осколков определяется явно. См. документацию FileIO подробнее в разделе «Запись файлов -> Сколько осколков генерируется на панель».

person ljhennessy    schedule 08.05.2018