Spark имеет настраиваемую подсистему метрик. По умолчанию он публикует зарегистрированные показатели в формате JSON на <driver>:<port>/metrics/json
. Можно настроить синхронизацию других показателей, таких как ганглии, файлы csv или JMX.
Вам понадобится внешняя система мониторинга, которая регулярно собирает метрики и помогает вам разобраться в них. (примечание: мы используем Ganglia, но есть и другие варианты с открытым исходным кодом и коммерческие варианты)
Spark Streaming публикует несколько показателей, которые можно использовать для отслеживания производительности вашей работы. Чтобы рассчитать пропускную способность, вы должны объединить:
(lastReceivedBatch_processingEndTime-lastReceivedBatch_processingStartTime)/lastReceivedBatch_records
Все поддерживаемые метрики см. на Источник потоковой передачи
Пример: запуск локального REPL со Spark 1.3.1 и после выполнения тривиального потокового приложения:
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(10))
val queue = scala.collection.mutable.Queue(1,2,3,45,6,6,7,18,9,10,11)
val q = queue.map(elem => sc.parallelize(Seq(elem)))
val dstream = ssc.queueStream(q)
dstream.print
ssc.start
можно ПОЛУЧИТЬ localhost:4040/metrics/json
и вернуть:
{
version: "3.0.0",
gauges: {
local-1430558777965.<driver>.BlockManager.disk.diskSpaceUsed_MB: {
value: 0
},
local-1430558777965.<driver>.BlockManager.memory.maxMem_MB: {
value: 2120
},
local-1430558777965.<driver>.BlockManager.memory.memUsed_MB: {
value: 0
},
local-1430558777965.<driver>.BlockManager.memory.remainingMem_MB: {
value: 2120
},
local-1430558777965.<driver>.DAGScheduler.job.activeJobs: {
value: 0
},
local-1430558777965.<driver>.DAGScheduler.job.allJobs: {
value: 6
},
local-1430558777965.<driver>.DAGScheduler.stage.failedStages: {
value: 0
},
local-1430558777965.<driver>.DAGScheduler.stage.runningStages: {
value: 0
},
local-1430558777965.<driver>.DAGScheduler.stage.waitingStages: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingDelay: {
value: 44
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingEndTime: {
value: 1430559950044
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingStartTime: {
value: 1430559950000
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_schedulingDelay: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_submissionTime: {
value: 1430559950000
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_totalDelay: {
value: 44
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingEndTime: {
value: 1430559950044
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingStartTime: {
value: 1430559950000
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_records: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_submissionTime: {
value: 1430559950000
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.receivers: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.retainedCompletedBatches: {
value: 2
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.runningBatches: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalCompletedBatches: {
value: 2
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalProcessedRecords: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalReceivedRecords: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.unprocessedBatches: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.waitingBatches: {
value: 0
}
},
counters: { },
histograms: { },
meters: { },
timers: { }
}
person
maasg
schedule
01.05.2015