Сбои заданий Spark on Google Cloud Dataproc на последних этапах

Я работаю с кластером Spark на Dataproc, и моя работа терпит неудачу в конце обработки.

Мой источник данных - это текстовые файлы журналов в формате csv в Google Cloud Storage (общий объем - 3,5 ТБ, 5000 файлов).

Логика обработки следующая:

  • читать файлы в DataFrame (схема ["отметка времени", "сообщение"]);
  • сгруппировать все сообщения в окно длительностью 1 секунду;
  • применить конвейер [Tokenizer -> HashingTF] к каждому сгруппированному сообщению, чтобы извлечь слова и их частоту для построения векторов признаков;
  • сохранять векторы функций с временными шкалами на GCS.

Проблемы, с которыми я сталкиваюсь, заключаются в том, что обработка небольшого подмножества данных (например, 10 файлов) работает хорошо, но когда я запускаю ее для всех файлов, в самом конце происходит сбой с ошибкой типа «Контейнер убит YARN из-за превышения памяти. Ограничения. Используется 25,0 ГБ из 24 ГБ физической памяти. Рассмотрите возможность увеличения spark.yarn.executor.memoryOverhead. "

В моем кластере 25 рабочих с машинами n1-highmem-8. Я поискал эту ошибку в Google и буквально увеличил параметр "spark.yarn.executor.memoryOverhead" до 6500 МБ.

Теперь моя искровая работа по-прежнему не работает, но с ошибкой «Работа прервана из-за сбоя этапа: общий размер сериализованных результатов 4293 задач (1920,0 МБ) больше, чем spark.driver.maxResultSize (1920,0 МБ)»

Я новичок в Spark и считаю, что делаю что-то не так или на уровне конфигурации, или в моем коде. Если вы поможете мне почистить эту штуку, будет здорово!

Вот мой код для задачи искры:

import logging
import string
from datetime import datetime

import pyspark
import re
from pyspark.sql import SparkSession

from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType, ArrayType
from pyspark.sql import functions as F

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Constants
NOW = datetime.now().strftime("%Y%m%d%H%M%S")
START_DATE = '2016-01-01'
END_DATE = '2016-03-01'

sc = pyspark.SparkContext()
spark = SparkSession\
        .builder\
        .appName("LogsVectorizer")\
        .getOrCreate()
spark.conf.set('spark.sql.shuffle.partitions', 10000)

logger.info("Start log processing at {}...".format(NOW))

# Filenames to read/write locations
logs_fn = 'gs://databucket/csv/*'  
vectors_fn = 'gs://databucket/vectors_out_{}'.format(NOW)  
pipeline_fn = 'gs://databucket/pipeline_vectors_out_{}'.format(NOW)
model_fn = 'gs://databucket/model_vectors_out_{}'.format(NOW)


# CSV data schema to build DataFrame
schema = StructType([
    StructField("timestamp", StringType()),
    StructField("message", StringType())])

# Helpers to clean strings in log fields
def cleaning_string(s):
    try:
        # Remove ids (like: app[2352] -> app)
        s = re.sub('\[.*\]', 'IDTAG', s)
        if s == '':
            s = 'EMPTY'
    except Exception as e:
        print("Skip string with exception {}".format(e))
    return s

def normalize_string(s):
    try:
        # Remove punctuation
        s = re.sub('[{}]'.format(re.escape(string.punctuation)), ' ', s)
        # Remove digits
        s = re.sub('\d*', '', s)
        # Remove extra spaces
        s = ' '.join(s.split())
    except Exception as e:
        print("Skip string with exception {}".format(e)) 
    return s

def line_splitter(line):
    line = line.split(',')
    timestamp = line[0]
    full_message = ' '.join(line[1:])
    full_message = normalize_string(cleaning_string(full_message))
    return [timestamp, full_message]

# Read line from csv, split to date|message
# Read CSV to DataFrame and clean its fields
logger.info("Read CSV to DF...")
logs_csv = sc.textFile(logs_fn)
logs_csv = logs_csv.map(lambda line: line_splitter(line)).toDF(schema)

# Keep only lines for our date interval
logger.info("Filter by dates...")
logs_csv = logs_csv.filter((logs_csv.timestamp>START_DATE) & (logs_csv.timestamp<END_DATE))
logs_csv = logs_csv.withColumn("timestamp", logs_csv.timestamp.cast("timestamp"))

# Helpers to join messages into window and convert sparse to dense
join_ = F.udf(lambda x: "| ".join(x), StringType())
asDense = F.udf(lambda v: v.toArray().tolist())

# Agg by time window
logger.info("Group log messages by time window...")
logs_csv = logs_csv.groupBy(F.window("timestamp", "1 second"))\
                       .agg(join_(F.collect_list("message")).alias("messages"))

# Turn message to hashTF
tokenizer = Tokenizer(inputCol="messages", outputCol="message_tokens")
hashingTF = HashingTF(inputCol="message_tokens", outputCol="tokens_counts", numFeatures=1000)

pipeline_tf = Pipeline(stages=[tokenizer, hashingTF])

logger.info("Fit-Transform ML Pipeline...")
model_tf = pipeline_tf.fit(logs_csv)
logs_csv = model_tf.transform(logs_csv)

logger.info("Spase vectors to Dense list...")
logs_csv = logs_csv.sort("window.start").select(["window.start", "tokens_counts"])\
                   .withColumn("tokens_counts", asDense(logs_csv.tokens_counts))

# Save to disk
# Save Pipeline and Model
logger.info("Save models...")
pipeline_tf.save(pipeline_fn)
model_tf.save(model_fn)

# Save to GCS
logger.info("Save results to GCS...")
logs_csv.write.parquet(vectors_fn)

person Alexander Usoltsev    schedule 12.07.2017    source источник


Ответы (1)


spark.driver.maxResultSize - это проблема с размером вашего драйвера, который в Dataproc запускается на главном узле.

По умолчанию 1/4 памяти ведущего устройства отводится драйверу, а 1/2 объема памяти задается равным spark.driver.maxResultSize (самый большой RDD Spark позволит вам .collect().

Я предполагаю, что Tokenizer или HashingTF перемещают «метаданные» через драйвер, размер которого равен размеру вашего пространства ключей. Чтобы увеличить допустимый размер, вы можете увеличить spark.driver.maxResultSize, но вы также можете увеличить spark.driver.memory и / или использовать мастер большего размера. Дополнительную информацию можно найти в руководстве по настройке Spark.

person Patrick Clay    schedule 13.07.2017
comment
Большое спасибо! Установка spark.driver.maxResultSize на 10g с типом машины мастера как n1-highmem-4 помогла. - person Alexander Usoltsev; 24.07.2017