Кластер Spark дает сбой при больших входных данных, хорошо работает при малых

Я играю со Спарком. Это готовый дистрибутив по умолчанию (0.7.0) с веб-сайта, с конфигурацией по умолчанию, режимом кластера, одним рабочим (мой локальный хост). Я прочитал документы по установке, и все кажется прекрасным.

У меня есть файл CSV (разные размеры, 1000-1 миллион строк). Если я запускаю свое приложение с небольшим входным файлом (например, 1000 строк), все в порядке, программа выполняется за секунды и выдает ожидаемый результат. Но когда я предоставляю файл большего размера (100 000 строк или 1 миллион), выполнение завершается ошибкой. Пробовал копаться в логах, но не сильно помогло (он повторяет весь процесс около 9-10 раз и после этого завершается с ошибкой. Также есть какая-то ошибка, связанная с ошибкой получения из какого-то нулевого источника).

Результат Iterable, возвращаемый первым JavaRDD, вызывает у меня подозрение. Если я верну жестко запрограммированный одноэлементный список (например, res.add("something"); return res;), все будет в порядке, даже с миллионом строк. Но если я добавлю все свои ключи, которые захочу (28 строк длиной 6-20 символов), процесс завершится ошибкой только с большим вводом. Проблема в том, что мне нужны все эти ключи, это фактическая бизнес-логика.

Я использую Linux amd64, четырехъядерный процессор, 8 ГБ оперативной памяти. Последний Oracle Java7 JDK. Конфигурация спарка:

SPARK_WORKER_MEMORY=4g
SPARK_MEM=3g
SPARK_CLASSPATH=$SPARK_CLASSPATH:/my/super/application.jar

Я должен упомянуть, что когда я запускаю программу, она говорит:

13/05/30 11:41:52 WARN spark.Utils: Your hostname, *** resolves to a loopback address: 127.0.1.1; using 192.168.1.157 instead (on interface eth1)
13/05/30 11:41:52 WARN spark.Utils: Set SPARK_LOCAL_IP if you need to bind to another address

Вот моя программа. Он основан на примере JavaWordCount с минимальными изменениями.

public final class JavaWordCount
{
    public static void main(final String[] args) throws Exception
    {
        final JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
            System.getenv("SPARK_HOME"), new String[] {"....jar" });

        final JavaRDD<String> words = ctx.textFile(args[1], 1).flatMap(new FlatMapFunction<String, String>() {

            @Override
            public Iterable<String> call(final String s)
            {
                // parsing "s" as the line, computation, building res (it's a List<String>)
                return res;
            }
        });

        final JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(final String s)
            {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        final JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {

            @Override
            public Integer call(final Integer i1, final Integer i2)
            {
                return i1 + i2;
            }
        });

        counts.collect();

        for (Tuple2<?, ?> tuple : counts.collect()) {
            System.out.println(tuple._1 + ": " + tuple._2);
        }
    }
}

person gyorgyabraham    schedule 30.05.2013    source источник
comment
Перед изменением свойств системы Spark, с каким исключением/ошибкой ваша работа не удалась?   -  person Josh Rosen    schedule 05.06.2013
comment
В группе пользователей spark я получил ответ, что .collect() вызовет сбор всех без исключения (временных) RDD. Это была настоящая проблема. Тема с решением здесь: stackoverflow.com/questions/16832429/   -  person gyorgyabraham    schedule 06.06.2013
comment
Я целую вечность гуглил, пытаясь найти решение моей проблемы, ответ на этот вопрос решает мою проблему, поэтому, пожалуйста, отредактируйте свой вопрос, включив в него org.apache.spark.SparkException: Ошибка связи с MapOutputTracker в вашем вопросе, чтобы сделать поиск в Google легче для других в будущем.   -  person samthebest    schedule 14.05.2014


Ответы (2)


Мне удалось это исправить, установив для свойства spark.mesos.coarse значение true. Подробнее здесь.

Обновление: я играл со Spark в течение нескольких часов. Эти настройки немного помогли мне, но кажется, что почти невозможно обработать ~ 10 миллионов строк текста на одной машине.

System.setProperty("spark.serializer", "spark.KryoSerializer"); // kryo is much faster
System.setProperty("spark.kryoserializer.buffer.mb", "256"); // I serialize bigger objects
System.setProperty("spark.mesos.coarse", "true"); // link provided
System.setProperty("spark.akka.frameSize", "500"); // workers should be able to send bigger messages
System.setProperty("spark.akka.askTimeout", "30"); // high CPU/IO load

Примечание. Увеличение размера кадра кажется особенно полезным для предотвращения: org.apache.spark.SparkException: Error communicating with MapOutputTracker

person gyorgyabraham    schedule 30.05.2013
comment
spark.akka.frameSize также решил мою проблему org.apache.spark.SparkException: Error communicating with MapOutputTracker. - person samthebest; 14.05.2014
comment
Работает ли System.setProperty() в искровой оболочке? Я не могу получить набор frameSize - person Brian Dolan; 30.11.2014

в более новой версии искры следует использовать:

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

согласно http://spark.apache.org/docs/latest/tuning.html#data-serialization

person David Wu    schedule 15.08.2014