мониторинг пропускной способности потоковой передачи искры

Есть ли способ контролировать входную и выходную пропускную способность кластера Spark, чтобы убедиться, что кластер не переполнен входящими данными?

В моем случае я настроил кластер Spark на AWS EC2, поэтому думаю использовать AWS CloudWatch для мониторинга NetworkIn и NetworkOut для каждый узел в кластере.

Но моя идея кажется не точной, и сеть не имеет в виду входящие данные только для Spark, возможно, будут рассчитаны и некоторые другие данные.

Есть ли инструмент или способ отслеживать конкретно статус потоковой передачи данных кластера Spark? Или в Spark уже есть встроенный инструмент, который я пропустил?


обновление: выпущен Spark 1.4, мониторинг на порту 4040 значительно улучшен благодаря графическому отображению


person keypoint    schedule 01.05.2015    source источник
comment
Привет @keypoint! Вы узнали, как настроить conf/metrics.properties, чтобы метрики Spark отображались в CloudWatch?   -  person nicola    schedule 07.03.2017
comment
@nicola извините, я не...   -  person keypoint    schedule 07.03.2017


Ответы (2)


Spark имеет настраиваемую подсистему метрик. По умолчанию он публикует зарегистрированные показатели в формате JSON на <driver>:<port>/metrics/json. Можно настроить синхронизацию других показателей, таких как ганглии, файлы csv или JMX.

Вам понадобится внешняя система мониторинга, которая регулярно собирает метрики и помогает вам разобраться в них. (примечание: мы используем Ganglia, но есть и другие варианты с открытым исходным кодом и коммерческие варианты)

Spark Streaming публикует несколько показателей, которые можно использовать для отслеживания производительности вашей работы. Чтобы рассчитать пропускную способность, вы должны объединить:

(lastReceivedBatch_processingEndTime-lastReceivedBatch_processingStartTime)/lastReceivedBatch_records

Все поддерживаемые метрики см. на Источник потоковой передачи

Пример: запуск локального REPL со Spark 1.3.1 и после выполнения тривиального потокового приложения:

import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(10))
val queue = scala.collection.mutable.Queue(1,2,3,45,6,6,7,18,9,10,11)
val q = queue.map(elem => sc.parallelize(Seq(elem)))
val dstream = ssc.queueStream(q)
dstream.print
ssc.start

можно ПОЛУЧИТЬ localhost:4040/metrics/json и вернуть:

{
version: "3.0.0",
gauges: {
local-1430558777965.<driver>.BlockManager.disk.diskSpaceUsed_MB: {
value: 0
},
local-1430558777965.<driver>.BlockManager.memory.maxMem_MB: {
value: 2120
},
local-1430558777965.<driver>.BlockManager.memory.memUsed_MB: {
value: 0
},
local-1430558777965.<driver>.BlockManager.memory.remainingMem_MB: {
value: 2120
},
local-1430558777965.<driver>.DAGScheduler.job.activeJobs: {
value: 0
},
local-1430558777965.<driver>.DAGScheduler.job.allJobs: {
value: 6
},
local-1430558777965.<driver>.DAGScheduler.stage.failedStages: {
value: 0
},
local-1430558777965.<driver>.DAGScheduler.stage.runningStages: {
value: 0
},
local-1430558777965.<driver>.DAGScheduler.stage.waitingStages: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingDelay: {
value: 44
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingEndTime: {
value: 1430559950044
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingStartTime: {
value: 1430559950000
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_schedulingDelay: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_submissionTime: {
value: 1430559950000
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_totalDelay: {
value: 44
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingEndTime: {
value: 1430559950044
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingStartTime: {
value: 1430559950000
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_records: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_submissionTime: {
value: 1430559950000
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.receivers: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.retainedCompletedBatches: {
value: 2
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.runningBatches: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalCompletedBatches: {
value: 2
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalProcessedRecords: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalReceivedRecords: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.unprocessedBatches: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.waitingBatches: {
value: 0
}
},
counters: { },
histograms: { },
meters: { },
timers: { }
}
person maasg    schedule 01.05.2015
comment
Привет, маасг, как ты установил файл conf/metrics.properties? Я разместил отдельный ответ, описывающий то, что я нашел. большое спасибо - person keypoint; 02.05.2015
comment
@keypoint Вы можете настроить расположение файла свойств, указав spark.metrics.conf = <file>.properties, если он не найден в файловой системе по указанному пути, Spark попытается загрузить его из пути к классам. - person maasg; 02.05.2015
comment
@keypoint Я обновил ответ очень небольшим примером, который вы можете попробовать сами. Надеюсь, это поможет. - person maasg; 02.05.2015
comment
это так круто ! Теперь я получаю то же сообщение, что и здесь, большое спасибо - person keypoint; 03.05.2015
comment
Я использую ганглии для мониторинга искры, но XML-репортер метрик Ganglia не работает правильно, что нарушает работу gmond. это моя конфигурация *.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink ``*.sink.ganglia.host=127.0.0.1 *.sink.ganglia.port=8649 ``*.sink.ganglia.period=10 *.sink.ganglia.unit=seconds *.sink.ganglia.ttl=1 *.sink.ganglia.mode=unicast - person Junaid; 29.07.2015

Я рекомендую использовать https://spark.apache.org/docs/latest/monitoring.html#metrics с Prometheus (https://prometheus.io/).

Метрики, сгенерированные метриками Spark, могут быть получены с помощью Prometheus, а также предлагает пользовательский интерфейс. Prometheus — бесплатный инструмент.

person Shiva Garg    schedule 18.06.2020