Я работаю с кластером Spark на Dataproc, и моя работа терпит неудачу в конце обработки.
Мой источник данных - это текстовые файлы журналов в формате csv в Google Cloud Storage (общий объем - 3,5 ТБ, 5000 файлов).
Логика обработки следующая:
- читать файлы в DataFrame (схема ["отметка времени", "сообщение"]);
- сгруппировать все сообщения в окно длительностью 1 секунду;
- применить конвейер [Tokenizer -> HashingTF] к каждому сгруппированному сообщению, чтобы извлечь слова и их частоту для построения векторов признаков;
- сохранять векторы функций с временными шкалами на GCS.
Проблемы, с которыми я сталкиваюсь, заключаются в том, что обработка небольшого подмножества данных (например, 10 файлов) работает хорошо, но когда я запускаю ее для всех файлов, в самом конце происходит сбой с ошибкой типа «Контейнер убит YARN из-за превышения памяти. Ограничения. Используется 25,0 ГБ из 24 ГБ физической памяти. Рассмотрите возможность увеличения spark.yarn.executor.memoryOverhead. "
В моем кластере 25 рабочих с машинами n1-highmem-8. Я поискал эту ошибку в Google и буквально увеличил параметр "spark.yarn.executor.memoryOverhead" до 6500 МБ.
Теперь моя искровая работа по-прежнему не работает, но с ошибкой «Работа прервана из-за сбоя этапа: общий размер сериализованных результатов 4293 задач (1920,0 МБ) больше, чем spark.driver.maxResultSize (1920,0 МБ)»
Я новичок в Spark и считаю, что делаю что-то не так или на уровне конфигурации, или в моем коде. Если вы поможете мне почистить эту штуку, будет здорово!
Вот мой код для задачи искры:
import logging
import string
from datetime import datetime
import pyspark
import re
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType, ArrayType
from pyspark.sql import functions as F
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Constants
NOW = datetime.now().strftime("%Y%m%d%H%M%S")
START_DATE = '2016-01-01'
END_DATE = '2016-03-01'
sc = pyspark.SparkContext()
spark = SparkSession\
.builder\
.appName("LogsVectorizer")\
.getOrCreate()
spark.conf.set('spark.sql.shuffle.partitions', 10000)
logger.info("Start log processing at {}...".format(NOW))
# Filenames to read/write locations
logs_fn = 'gs://databucket/csv/*'
vectors_fn = 'gs://databucket/vectors_out_{}'.format(NOW)
pipeline_fn = 'gs://databucket/pipeline_vectors_out_{}'.format(NOW)
model_fn = 'gs://databucket/model_vectors_out_{}'.format(NOW)
# CSV data schema to build DataFrame
schema = StructType([
StructField("timestamp", StringType()),
StructField("message", StringType())])
# Helpers to clean strings in log fields
def cleaning_string(s):
try:
# Remove ids (like: app[2352] -> app)
s = re.sub('\[.*\]', 'IDTAG', s)
if s == '':
s = 'EMPTY'
except Exception as e:
print("Skip string with exception {}".format(e))
return s
def normalize_string(s):
try:
# Remove punctuation
s = re.sub('[{}]'.format(re.escape(string.punctuation)), ' ', s)
# Remove digits
s = re.sub('\d*', '', s)
# Remove extra spaces
s = ' '.join(s.split())
except Exception as e:
print("Skip string with exception {}".format(e))
return s
def line_splitter(line):
line = line.split(',')
timestamp = line[0]
full_message = ' '.join(line[1:])
full_message = normalize_string(cleaning_string(full_message))
return [timestamp, full_message]
# Read line from csv, split to date|message
# Read CSV to DataFrame and clean its fields
logger.info("Read CSV to DF...")
logs_csv = sc.textFile(logs_fn)
logs_csv = logs_csv.map(lambda line: line_splitter(line)).toDF(schema)
# Keep only lines for our date interval
logger.info("Filter by dates...")
logs_csv = logs_csv.filter((logs_csv.timestamp>START_DATE) & (logs_csv.timestamp<END_DATE))
logs_csv = logs_csv.withColumn("timestamp", logs_csv.timestamp.cast("timestamp"))
# Helpers to join messages into window and convert sparse to dense
join_ = F.udf(lambda x: "| ".join(x), StringType())
asDense = F.udf(lambda v: v.toArray().tolist())
# Agg by time window
logger.info("Group log messages by time window...")
logs_csv = logs_csv.groupBy(F.window("timestamp", "1 second"))\
.agg(join_(F.collect_list("message")).alias("messages"))
# Turn message to hashTF
tokenizer = Tokenizer(inputCol="messages", outputCol="message_tokens")
hashingTF = HashingTF(inputCol="message_tokens", outputCol="tokens_counts", numFeatures=1000)
pipeline_tf = Pipeline(stages=[tokenizer, hashingTF])
logger.info("Fit-Transform ML Pipeline...")
model_tf = pipeline_tf.fit(logs_csv)
logs_csv = model_tf.transform(logs_csv)
logger.info("Spase vectors to Dense list...")
logs_csv = logs_csv.sort("window.start").select(["window.start", "tokens_counts"])\
.withColumn("tokens_counts", asDense(logs_csv.tokens_counts))
# Save to disk
# Save Pipeline and Model
logger.info("Save models...")
pipeline_tf.save(pipeline_fn)
model_tf.save(model_fn)
# Save to GCS
logger.info("Save results to GCS...")
logs_csv.write.parquet(vectors_fn)