Чрезвычайно медленная обработка в Dataproc: 9 часов против 3 минут на локальной машине

Из журнала я вижу, что есть 182 тыс. строк 70 МБ. Требуется 1,5 часа для загрузки 70 МБ данных и 9 часов (начало 15.11.14 01:58:28 и закончилось 15.11.14 09:19:09) для обучения 182 тыс. строк в Dataproc. Загрузка тех же данных и запуск того же алгоритма на моем локальном компьютере занимает 3 минуты.

Журнал обработки данных

15/11/13 23:27:09 INFO com.google.cloud.hadoop.io.bigquery.ShardedExportToCloudStorage: Table 'mydata-data:website_wtw_feed.video_click20151111' to be exported has 182712 rows and 70281790 bytes
15/11/13 23:28:13 WARN akka.remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://[email protected]:60749] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 

15/11/14 01:58:28 INFO com.dailymotion.recommender.BigQueryRecommender: Fetching the Ratings RDD
15/11/14 01:58:28 INFO com.dailymotion.recommender.BigQueryRecommender: Transforming the video feature matrix
15/11/14 01:58:28 INFO com.dailymotion.recommender.BigQueryRecommender: Training ALS Matrix factorization Model


[Stage 2:=============================>                             (1 + 1) / 2]

15/11/14 09:19:09 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/11/14 09:19:09 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS

15/11/14 09:19:44 INFO com.dailymotion.recommender.BigQueryRecommender: Transforming the video feature matrix
15/11/14 09:19:44 INFO com.dailymotion.recommender.BigQueryRecommender: Transforming the user feature matrix
  1. Скопировал данные на локальную машину

    r.viswanadha$ gsutil cp -r gs://<mycompany>-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000 .
    
    
    Copying gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000000.json... 
    
    Downloading ...201511132327_0000/shard-0/data-000000000000.json: 141.3 MiB/141.3 MiB      
    
    Copying gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000001.json... 
    
    Copying gs://<mycompany>-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1/data-000000000000.json...`
    
  2. Запустил тот же алгоритм. Шаг поезда ALS занял ~ 3 минуты

    com.dailymotion.recommender.BigQueryRecommender --app_name BigQueryRecommenderTest --master local[4] --input_dir /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/input/job_201511132327_0000/shard-0/ 
    

Первый забег

15/11/14 13:19:36 INFO BigQueryRecommender: Training implicit features for the ALS Matrix factorization Model
...
15/11/14 13:22:24 INFO BigQueryRecommender: Transforming the video feature matrix

Второй запуск

15/11/14 13:29:05 INFO BigQueryRecommender: Training implicit features for the ALS Matrix factorization Model


...

15/11/14 13:31:57 INFO BigQueryRecommender: Transforming the video feature matrix

Кластер DataProc состоит из 1 ведущего и 3 ведомых устройств с 104 ГБ (ОЗУ) и 16 ЦП каждый.

Моя локальная машина имеет 8 ГБ (ОЗУ) и 2 процессора Core i5 2,7 ГГц.

gsutil ls -l -r -h  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000

gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/: 

gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/: 

    0 B  2015-11-13T23:27:13Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/ 

    141.3 MiB  2015-11-13T23:29:21Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000000.json 

   0 B  2015-11-13T23:29:21Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000001.json 

gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1/: 

    0 B  2015-11-13T23:27:13Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1/ 

    0 B  2015-11-13T23:28:47Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1/data-000000000000.json 

   0 B  2015-11-13T23:27:09Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/ 

TOTAL: 6 objects, 148165416 bytes (141.3 MiB)

person Ram    schedule 14.11.2015    source источник
comment
Привет @Ram. Каков был размер вашего кластера Dataproc (используемые узлы и типы машин?)   -  person James    schedule 16.11.2015
comment
Кроме того, не могли бы вы проверить, насколько медлителен экспорт BigQuery по сравнению с фактической обработкой? Особенно, если у вас все еще есть данные экспорта BigQuery, доступные в gs://<mycompany>-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000, вы можете попробовать использовать их в качестве пути к файлу в задании Dataproc без использования BigQueryInputFormat.   -  person Dennis Huo    schedule 16.11.2015
comment
Кроме того, некоторая информация, которая может помочь, будет, если вы запустите gsutil ls -l -r gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/ и сообщите размеры и временные метки. Если вы предпочитаете не делиться этой информацией здесь, отправьте ее по адресу [email protected], чтобы предоставить эту информацию только инженерам Google.   -  person Dennis Huo    schedule 16.11.2015
comment
1. Размер кластера: 1 ведущий, 3 подчиненных. Каждая машина имеет 104 ГБ (ОЗУ), 16 процессоров. 2. newHadoopAPI завершается примерно за 1,5 часа, ALS Train занимает около 7,5 часов.   -  person Ram    schedule 16.11.2015
comment
$ gsutil ls -l -r -h gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000 gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/ job_201511132327_0000/: gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/: 141,3 МБ 2015-11-13T23:29:21Z gs://dailymotion-spark-rc -test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000000.json ... gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1 /: ... ВСЕГО: 6 объектов, 148165416 байт (141,3 МБ)   -  person Ram    schedule 16.11.2015
comment
Хм, я вижу только одну временную метку в этом выводе gsutil, может быть, часть ее была обрезана? Не могли бы вы отправить остальные временные метки 6 объектов на адрес [email protected]?   -  person Dennis Huo    schedule 16.11.2015
comment
Я разместил то же самое на groups.google.com/forum. /#!topic/cloud-dataproc-discuss/ Возможно, это поможет.   -  person Ram    schedule 16.11.2015
comment
Давайте продолжим обсуждение в чате.   -  person Ram    schedule 16.11.2015


Ответы (2)


Подводя итоги некоторых выводов, сделанных в автономном режиме, отметим, что когда в распределенном кластере все работает на несколько порядков медленнее, чем в локальной среде, основными узкими местами, которые следует искать, являются узкие места с задержкой ввода-вывода как с точки зрения межсетевого обслуживания зависимости, а также дисковый и локальный ввод-вывод.

На что следует обращать внимание в целом (некоторые из них могут быть или не быть применимы к вашему конкретному случаю, но могут быть общими для других, столкнувшихся с похожими проблемами):

  1. Убедитесь, что корзина GCS, содержащая данные, находится в том же регионе, что и зона GCE, в которой вы развернули свой кластер Dataproc с помощью gsutil ls -L gs://[your-bucket]. Межконтинентальный трафик не только значительно медленнее, но и может привести к дополнительным сетевым затратам в вашем проекте.
  2. Если у вашей работы есть какие-либо другие сетевые зависимости, такие как запросы к API или отдельная база данных, работающая на GCE, попробуйте разместить их в одной зоне; даже в пределах одного континента межрегиональный трафик GCE может иметь задержку приема-передачи в десятки миллисекунд, что может значительно увеличиться, особенно если выполняются запросы на запись (например, 30 мс * 180 тыс. записей получается 1,5 часа). ).
  3. Несмотря на то, что на этот раз это может быть неприменимо к вашему конкретному случаю, не забудьте по возможности избегать двустороннего ввода-вывода для каждой записи в GCS через интерфейсы файловой системы Hadoop; общая пропускная способность для GCS очень масштабируема, но из-за особенностей удаленного хранилища задержки при обмене данными намного меньше, чем задержки при обмене данными, которые вы могли бы измерить на локальном компьютере, из-за того, что локальные операции чтения часто затрагивают ОС. буферный кеш или если вы используете ноутбук с твердотельным накопителем, способным поддерживать большие объемы обращений туда и обратно менее миллисекунды по сравнению с 30-100 мс туда и обратно к GCS.

И в целом, для случаев использования, которые могут поддерживать очень высокую пропускную способность, но имеют большие задержки при обмене данными, убедитесь, что данные сегментированы чем-то вроде repartition(), если данные малы и не естественное разделение на достаточный параллелизм, чтобы обеспечить хорошее использование вашего кластера Spark.

Наконец, в нашем последнем выпуске Dataproc исправлен ряд нативных конфигураций библиотек, поэтому он может показать гораздо лучшую производительность для части ALS, а также других вариантов использования mllib.

person Dennis Huo    schedule 19.11.2015

Для тех, кто сталкивается с чем-то подобным: имея дело только с одним небольшим объектом в GCS (или с одним шардом с данными из коннектора BigQuery), вы можете получить один раздел в своем RDD Spark и, как следствие, получить мало или совсем нет параллелизма.

Хотя это приводит к дополнительной фазе перетасовки, входной RDD можно перераспределить сразу после чтения из GCS или BigQuery, чтобы получить желаемое количество разделов. Полезность дополнительного перемешивания зависит от того, сколько обработки или операций ввода-вывода требуется для каждой записи в RDD.

person Angus Davis    schedule 19.11.2015