Я играю со Спарком. Это готовый дистрибутив по умолчанию (0.7.0) с веб-сайта, с конфигурацией по умолчанию, режимом кластера, одним рабочим (мой локальный хост). Я прочитал документы по установке, и все кажется прекрасным.
У меня есть файл CSV (разные размеры, 1000-1 миллион строк). Если я запускаю свое приложение с небольшим входным файлом (например, 1000 строк), все в порядке, программа выполняется за секунды и выдает ожидаемый результат. Но когда я предоставляю файл большего размера (100 000 строк или 1 миллион), выполнение завершается ошибкой. Пробовал копаться в логах, но не сильно помогло (он повторяет весь процесс около 9-10 раз и после этого завершается с ошибкой. Также есть какая-то ошибка, связанная с ошибкой получения из какого-то нулевого источника).
Результат Iterable, возвращаемый первым JavaRDD, вызывает у меня подозрение. Если я верну жестко запрограммированный одноэлементный список (например, res.add("something"); return res;), все будет в порядке, даже с миллионом строк. Но если я добавлю все свои ключи, которые захочу (28 строк длиной 6-20 символов), процесс завершится ошибкой только с большим вводом. Проблема в том, что мне нужны все эти ключи, это фактическая бизнес-логика.
Я использую Linux amd64, четырехъядерный процессор, 8 ГБ оперативной памяти. Последний Oracle Java7 JDK. Конфигурация спарка:
SPARK_WORKER_MEMORY=4g
SPARK_MEM=3g
SPARK_CLASSPATH=$SPARK_CLASSPATH:/my/super/application.jar
Я должен упомянуть, что когда я запускаю программу, она говорит:
13/05/30 11:41:52 WARN spark.Utils: Your hostname, *** resolves to a loopback address: 127.0.1.1; using 192.168.1.157 instead (on interface eth1)
13/05/30 11:41:52 WARN spark.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Вот моя программа. Он основан на примере JavaWordCount с минимальными изменениями.
public final class JavaWordCount
{
public static void main(final String[] args) throws Exception
{
final JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
System.getenv("SPARK_HOME"), new String[] {"....jar" });
final JavaRDD<String> words = ctx.textFile(args[1], 1).flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(final String s)
{
// parsing "s" as the line, computation, building res (it's a List<String>)
return res;
}
});
final JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(final String s)
{
return new Tuple2<String, Integer>(s, 1);
}
});
final JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(final Integer i1, final Integer i2)
{
return i1 + i2;
}
});
counts.collect();
for (Tuple2<?, ?> tuple : counts.collect()) {
System.out.println(tuple._1 + ": " + tuple._2);
}
}
}