Время Apache Spark для каждой операции на JavaRDD

Вопрос. Является ли это допустимым способом проверки времени, необходимого для создания RDD?

Я делаю две вещи здесь. Базовый подход заключается в том, что у нас есть M экземпляров того, что мы называем DropEvaluation, и N DropResults. Нам нужно сравнить каждый N DropResult с каждым из M DropEvaluations. Каждое N должно быть просмотрено каждым M, чтобы в итоге получить M результатов.

Если я не использую .count() после сборки RDD, драйвер переходит к следующей строке кода и говорит, что почти нет времени на сборку RDD, которая занимает 30 минут.

Я просто хочу убедиться, что ничего не упустил, например, .count() занимает много времени? Я предполагаю, что для определения времени .count() мне придется изменить исходный код Spark?

M = 1000 or 2000. N = 10^7.

По сути, это декартова проблема — аккумулятор был выбран потому, что нам нужно записать каждое М на месте. Мне также было бы некрасиво построить полный декартовский СДР.

Мы создаем список из M аккумуляторов (не может сделать аккумулятор списка в Java, верно?). Затем мы перебираем каждый из N в RDD с помощью foreach.

Уточнение вопроса: общее затраченное время измеряется правильно. Я спрашиваю, заставляет ли .count() в RDD Spark ждать завершения RDD, прежде чем он сможет запустить подсчет. Имеет ли значение время .count()?

Вот наш код:

// assume standin exists and does it's thing correctly

// this controls the final size of RDD, as we are not parallelizing something with an existing length
List<Integer> rangeN = IntStream.rangeClosed(simsLeft - blockSize + 1, simsLeft).boxed().collect(Collectors.toList());

// setup bogus array of size N for parallelize dataSetN to lead to dropResultsN       
JavaRDD<Integer> dataSetN = context.parallelize(rangeN);

// setup timing to create N
long NCreationStartTime = System.nanoTime();

// this maps each integer element of RDD dataSetN to a "geneDropped" chromosome simulation, we need N of these:
JavaRDD<TholdDropResult> dropResultsN = dataSetN.map(s -> standin.call(s)).persist(StorageLevel.MEMORY_ONLY());

// **** this line makes the driver wait until the RDD is done, right?
long dummyLength = dropResultsN.count();


long NCreationNanoSeconds = System.nanoTime() - NCreationStartTime;
double NCreationSeconds = (double)NCreationNanoSeconds / 1000000000.0;
double NCreationMinutes = NCreationSeconds / 60.0;

logger.error("{} test sims remaining", simsLeft);

// now get the time for just the dropComparison (part of accumulable's add)
long startDropCompareTime = System.nanoTime();

// here we iterate through each accumulator in the list and compare all N elements of dropResultsN RDD to each M in turn, our .add() is a custom AccumulableParam
for (Accumulable<TholdDropTuple, TholdDropResult> dropEvalAccum : accumList) {
    dropResultsN.foreach(new VoidFunction<TholdDropResult>() {
                    @Override
                    public void call(TholdDropResult dropResultFromN) throws Exception {
                            dropEvalAccum.add(dropResultFromN);
                    }
                });
            }

    // all the dropComparisons for all N to all M for this blocksize are done, check the time...
   long dropCompareNanoSeconds = System.nanoTime() - startDropCompareTime;
   double dropCompareSeconds = (double)dropCompareNanoSeconds / 1000000000.0;
    double dropCompareMinutes = dropCompareSeconds / 60.0;

    // write lines to indicate timing section
    // log and write to file the time for the N-creation

    ...

} // end for that goes through dropAccumList

person JimLohse    schedule 10.07.2016    source источник
comment
Я хотел бы отметить в ответ на ответ Dikei ниже, который я поставил +1, он не отвечает на основной вопрос, является ли это допустимым способом точного определения времени создания RDD? Занимает ли сам подсчет значительное время, что увеличивает время создания RDD? Любые ссылки на хорошие примеры синхронизации вещей Spark?   -  person JimLohse    schedule 12.07.2016


Ответы (1)


Программа Spark ленива, она не запустится, пока вы не вызовете все действия, такие как count в RDD. Вы можете найти список общих действий в документе Spark.

// **** this line makes the driver wait until the RDD is done, right?
long dummyLength = dropResultsN.count();

Да, в этом случае count заставит вычислить dropResultsN, так что это займет много времени. Если вы сделаете второй count, он вернется очень быстро, поскольку RDD уже вычислен и кэширован.

person Kien Truong    schedule 11.07.2016
comment
Если бы RDD никогда не вычислялся, я бы согласился с вами. Поскольку я вызываю dropResultsN.foreach, это приводит к его вычислению. Foreach находится в списке, который вы связали. Так полуошибка, я думаю? Также я думал, что кэшировал это так: JavaRDD<TholdDropResult> dropResultsN = dataSetN.map(s -> standin.call(s)).persist(StorageLevel.MEMORY_ONLY()); ... или я сохраняю только dataSetN? вау, вы, наверное, просто взорвали мой мозг :) - person JimLohse; 12.07.2016
comment
Моя ошибка, я не увидел звонка persist. Вызов persist с использованием MEMORY_ONLY аналогичен вызову cache. - person Kien Truong; 12.07.2016
comment
См. комментарий выше к моему вопросу, вы пришли к мысли, что первый .count() занимает много времени, если у вас есть другие наблюдения о допустимых подходах к синхронизации Spark или некоторые ссылки/примеры, я мог бы принять этот ответ, иначе собираюсь подождать несколько дней и посмотреть, кто откликнется. Спасибо. - person JimLohse; 12.07.2016
comment
Нет, из этого другого вопроса (в моем собственном ответе) я понял, что настаивал на правильном. Все равно лучше следовать примеру, приведенному в моем ответе из книги О'Рейли Learning Spark: rddToPersist = existingRDD.map.etc.etc; rddToPersist.persist(); - person JimLohse; 15.07.2016