Google Dataflow создает только одного рабочего для большого файла .bz2

Я пытаюсь обработать json-дамп Wikidata с помощью Cloud Dataflow.

Я скачал файл с https://dumps.wikimedia.org/wikidatawiki/entities/latest-all.json.bz2 и разместил его в корзине GS. Это большой (50G) файл .bz2, содержащий список json-слов (по одному на строку).

Я понимаю, что apache_beam.io.ReadFromText может обрабатывать .bz2 (я проверял это на игрушечных наборах данных) и что .bz2 можно разделить. Поэтому я надеялся, что будет создано несколько рабочих процессов, которые будут работать параллельно с разными блоками этого уникального файла (мне не совсем понятно, будут ли блоки res.

В конечном счете, я хочу выполнить некоторую аналитику для каждой строки (каждого словаря json), но в качестве теста для приема я просто использую проект wordcount.py:

python -m apache_beam.examples.wordcount \
--input gs://MYBUCKET/wikidata/latest-all.json.bz2 \
--output gs://MYBUCKET/wikidata/output/entities-all.json \
--runner DataflowRunner \
--project MYPROJECT \
--temp_location gs://MYBUCKET/tmp/ 

При запуске автомасштабирование быстро увеличивает количество воркеров 1->6, но только один воркер выполняет какую-либо работу, а затем автомасштабирование сокращается до 6->1 через пару минут (jobid: 2018-10-11_00_45_54-9419516948329946918)

Если я отключу автомасштабирование и задам явно количество воркеров, то все, кроме одного, останутся простаивать.

Можно ли добиться параллелизма на такого рода входных данных? Большое спасибо за любую помощь.


person Eric PICHON    schedule 11.10.2018    source источник
comment
Вы, вероятно, стали жертвой оптимизации слияния, попробуйте сделать GroupBy с пользовательским триггером (который будет выдавать группы, как только они будут доступны). Вы можете получить вдохновение здесь: github.com/apache/beam/blob/master/sdks/java/core/src/main/java/ По сути, вам нужно выполнить преобразование Reshuffle, потому что оптимизация fusion заставляет все шаги выполняться на одной машине вместо их распределения.   -  person Marcin Zablocki    schedule 11.10.2018
comment
@MarcinZablocki спасибо за быстрый ответ. Я прочитал о предотвращении слияния здесь cloud.google.com/dataflow. /сервис/. Хотя я еще не слишком уверен в том, как реализовать ваше решение с помощью триггеров, я внедрил луч преобразования в случайном порядке. Reshuffle() сразу после ReadFromText(). Я понимаю, что это должно предотвратить слияние. Это правильно? К сожалению, похоже, это ничего не меняет. Рабочие 1-›6-›1 в течение нескольких минут, например, 2018-10-11_07_20_38-9840705006566518535.   -  person Eric PICHON    schedule 11.10.2018


Ответы (1)


Помимо Hadoop, в Apache Beam еще не реализовано разделение bzip2: https://issues.apache.org/jira/browse/BEAM-683

person Sascha Brawer    schedule 25.10.2019