Question
Is there a way to monitor the input and output throughput of a Spark cluster, to make sure the cluster is not flooded and overwhelmed by incoming data?
In my case, I set up the Spark cluster on AWS EC2, so I'm thinking of using AWS CloudWatch to monitor NetworkIn and NetworkOut for each node in the cluster.
But that approach doesn't seem accurate: network traffic is not only Spark's incoming data, so other traffic would be counted as well.
Is there a tool or way to monitor the streaming data status of a Spark cluster specifically? Or is there a built-in tool in Spark that I have missed?
Update: Spark 1.4 has been released; monitoring on port 4040 is significantly enhanced with graphical displays.
Answer
Spark has a configurable metrics subsystem. By default it publishes a JSON version of the registered metrics at <driver>:<port>/metrics/json. Other metrics sinks, such as Ganglia, CSV files, or JMX, can be configured.
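As a rough illustration, sinks are declared in a metrics.properties file that the application picks up via spark.metrics.conf; the period and directory values below are placeholder assumptions, not required settings:

# Hypothetical conf/metrics.properties: enable a CSV sink and the JMX sink for all instances
*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
*.sink.csv.period=10
*.sink.csv.unit=seconds
*.sink.csv.directory=/tmp/spark-metrics
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink

You can pass such a file with --conf spark.metrics.conf=/path/to/metrics.properties, or place it at $SPARK_HOME/conf/metrics.properties.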
You will need some external monitoring system that collects the metrics on a regular basis and helps you make sense of them. (n.b. we use Ganglia, but there are other open-source and commercial options.)
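For reference, a Ganglia sink can be configured the same way; note that it ships in the separate spark-ganglia-lgpl artifact rather than with the default Spark build, and the host and port below are placeholder assumptions:

# Hypothetical Ganglia sink (requires the spark-ganglia-lgpl package)
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
*.sink.ganglia.host=<gmond-host>
*.sink.ganglia.port=8649
*.sink.ganglia.period=10
*.sink.ganglia.unit=seconds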
Spark Streaming publishes several metrics that can be used to monitor the performance of your job. To calculate throughput, you would combine:
(lastReceivedBatch_processingEndTime - lastReceivedBatch_processingStartTime) / lastReceivedBatch_records
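As a minimal sketch of that calculation in Scala (the timestamps are taken from the sample JSON further down; the record count is a made-up non-zero value for illustration):

// Values read from the lastReceivedBatch_* gauges (milliseconds since epoch)
val processingStartTime = 1430559950000L
val processingEndTime   = 1430559950044L
val records             = 1000L  // hypothetical lastReceivedBatch_records value

// Milliseconds spent per record, as in the formula above
val msPerRecord = (processingEndTime - processingStartTime).toDouble / records
// Its inverse, scaled to seconds, gives records processed per second
val recordsPerSecond = records / ((processingEndTime - processingStartTime) / 1000.0)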
For all the supported metrics, have a look at StreamingSource.
Example: starting a local REPL with Spark 1.3.1 and executing a trivial streaming application:
import org.apache.spark.streaming._

// `sc` is the SparkContext provided by the REPL
val ssc = new StreamingContext(sc, Seconds(10))
// Build a queue of one-element RDDs to feed a test stream
val queue = scala.collection.mutable.Queue(1, 2, 3, 45, 6, 6, 7, 18, 9, 10, 11)
val q = queue.map(elem => sc.parallelize(Seq(elem)))
val dstream = ssc.queueStream(q)
dstream.print()
ssc.start()
one can GET localhost:4040/metrics/json and that returns:
{
version: "3.0.0",
gauges: {
local-1430558777965.<driver>.BlockManager.disk.diskSpaceUsed_MB: {
value: 0
},
local-1430558777965.<driver>.BlockManager.memory.maxMem_MB: {
value: 2120
},
local-1430558777965.<driver>.BlockManager.memory.memUsed_MB: {
value: 0
},
local-1430558777965.<driver>.BlockManager.memory.remainingMem_MB: {
value: 2120
},
local-1430558777965.<driver>.DAGScheduler.job.activeJobs: {
value: 0
},
local-1430558777965.<driver>.DAGScheduler.job.allJobs: {
value: 6
},
local-1430558777965.<driver>.DAGScheduler.stage.failedStages: {
value: 0
},
local-1430558777965.<driver>.DAGScheduler.stage.runningStages: {
value: 0
},
local-1430558777965.<driver>.DAGScheduler.stage.waitingStages: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingDelay: {
value: 44
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingEndTime: {
value: 1430559950044
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_processingStartTime: {
value: 1430559950000
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_schedulingDelay: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_submissionTime: {
value: 1430559950000
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastCompletedBatch_totalDelay: {
value: 44
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingEndTime: {
value: 1430559950044
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_processingStartTime: {
value: 1430559950000
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_records: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.lastReceivedBatch_submissionTime: {
value: 1430559950000
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.receivers: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.retainedCompletedBatches: {
value: 2
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.runningBatches: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalCompletedBatches: {
value: 2
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalProcessedRecords: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.totalReceivedRecords: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.unprocessedBatches: {
value: 0
},
local-1430558777965.<driver>.Spark shell.StreamingMetrics.streaming.waitingBatches: {
value: 0
}
},
counters: { },
histograms: { },
meters: { },
timers: { }
}
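As a sketch of how an external collector could consume this endpoint (assuming the driver UI is reachable on localhost:4040), one could poll it periodically and forward the JSON to whatever monitoring backend is in place:

import scala.io.Source

// Poll the driver's metrics servlet every 10 seconds and print the raw JSON;
// a real monitor would parse it and push the values to its own backend.
while (true) {
  val json = Source.fromURL("http://localhost:4040/metrics/json").mkString
  println(json)
  Thread.sleep(10000)
}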