Вопрос. Является ли это допустимым способом проверки времени, необходимого для создания RDD?
Я делаю две вещи здесь. Базовый подход заключается в том, что у нас есть M экземпляров того, что мы называем DropEvaluation, и N DropResults. Нам нужно сравнить каждый N DropResult с каждым из M DropEvaluations. Каждое N должно быть просмотрено каждым M, чтобы в итоге получить M результатов.
Если я не использую .count() после сборки RDD, драйвер переходит к следующей строке кода и говорит, что почти нет времени на сборку RDD, которая занимает 30 минут.
Я просто хочу убедиться, что ничего не упустил, например, .count() занимает много времени? Я предполагаю, что для определения времени .count() мне придется изменить исходный код Spark?
M = 1000 or 2000. N = 10^7.
По сути, это декартова проблема — аккумулятор был выбран потому, что нам нужно записать каждое М на месте. Мне также было бы некрасиво построить полный декартовский СДР.
Мы создаем список из M аккумуляторов (не может сделать аккумулятор списка в Java, верно?). Затем мы перебираем каждый из N в RDD с помощью foreach.
Уточнение вопроса: общее затраченное время измеряется правильно. Я спрашиваю, заставляет ли .count() в RDD Spark ждать завершения RDD, прежде чем он сможет запустить подсчет. Имеет ли значение время .count()?
Вот наш код:
// assume standin exists and does it's thing correctly
// this controls the final size of RDD, as we are not parallelizing something with an existing length
List<Integer> rangeN = IntStream.rangeClosed(simsLeft - blockSize + 1, simsLeft).boxed().collect(Collectors.toList());
// setup bogus array of size N for parallelize dataSetN to lead to dropResultsN
JavaRDD<Integer> dataSetN = context.parallelize(rangeN);
// setup timing to create N
long NCreationStartTime = System.nanoTime();
// this maps each integer element of RDD dataSetN to a "geneDropped" chromosome simulation, we need N of these:
JavaRDD<TholdDropResult> dropResultsN = dataSetN.map(s -> standin.call(s)).persist(StorageLevel.MEMORY_ONLY());
// **** this line makes the driver wait until the RDD is done, right?
long dummyLength = dropResultsN.count();
long NCreationNanoSeconds = System.nanoTime() - NCreationStartTime;
double NCreationSeconds = (double)NCreationNanoSeconds / 1000000000.0;
double NCreationMinutes = NCreationSeconds / 60.0;
logger.error("{} test sims remaining", simsLeft);
// now get the time for just the dropComparison (part of accumulable's add)
long startDropCompareTime = System.nanoTime();
// here we iterate through each accumulator in the list and compare all N elements of dropResultsN RDD to each M in turn, our .add() is a custom AccumulableParam
for (Accumulable<TholdDropTuple, TholdDropResult> dropEvalAccum : accumList) {
dropResultsN.foreach(new VoidFunction<TholdDropResult>() {
@Override
public void call(TholdDropResult dropResultFromN) throws Exception {
dropEvalAccum.add(dropResultFromN);
}
});
}
// all the dropComparisons for all N to all M for this blocksize are done, check the time...
long dropCompareNanoSeconds = System.nanoTime() - startDropCompareTime;
double dropCompareSeconds = (double)dropCompareNanoSeconds / 1000000000.0;
double dropCompareMinutes = dropCompareSeconds / 60.0;
// write lines to indicate timing section
// log and write to file the time for the N-creation
...
} // end for that goes through dropAccumList