Spark2.1.0——内置Web框架详解

  任何系统都需要提供监控功能,否则在运行期间发生一些异常时,我们将会束手无策。也许有人说,可以增加日志来解决这个问题。日志只能解决你的程序逻辑在运行期的监控,进而发现Bug,以及提供对业务有帮助的调试信息。当你的JVM进程奔溃或者程序响应速度很慢时,这些日志将毫无用处。好在JVM提供了jstat、jstack、jinfo、jmap、jhat等工具帮助我们分析,更有VisualVM的可视化界面以更加直观的方式对JVM运行期的状况进行监控。此外,像Tomcat、Hadoop等服务都提供了基于Web的监控页面,用浏览器能访问具有样式及布局,并提供丰富监控数据的页面无疑是一种简单、高效的方式。

  Spark自然也提供了Web页面来浏览监控数据,而且Master、Worker、Driver根据自身功能提供了不同内容的Web监控页面。无论是Master、Worker,还是Driver,它们都使用了统一的Web框架WebUI。Master、Worker及Driver分别使用MasterWebUI、WorkerWebUI及SparkUI提供的Web界面服务,后三者都继承自WebUI,并增加了个性化的功能。此外,在Yarn或Mesos模式下还有WebUI的另一个扩展实现HistoryServer。HistoryServer将会展现已经运行完成的应用程序信息。本章以SparkUI为例,并深入分析WebUI的框架体系。

SparkUI概述

  在大型分布式系统中,采用事件监听机制是最常见的。为什么要使用事件监听机制?假如Spark UI采用Scala的函数调用方式,那么随着整个集群规模的增加,对函数的调用会越来越多,最终会受到Driver所在JVM的线程数量限制而影响监控数据的更新,甚至出现监控数据无法及时显示给用户的情况。由于函数调用多数情况下是同步调用,这就导致线程被阻塞,在分布式环境中,还可能因为网络问题,导致线程被长时间占用。将函数调用更换为发送事件,事件的处理是异步的,当前线程可以继续执行后续逻辑进而被快速释放。线程池中的线程还可以被重用,这样整个系统的并发度会大大增加。发送的事件会存入缓存,由定时调度器取出后,分配给监听此事件的监听器对监控数据进行更新。Spark UI就是这样的服务,它的构成如图1所示。

Spark2.1.0——内置Web框架详解-LMLPHP

图1       SparkUI的组成

图1展示了SparkUI中的各个组件,这里对这些组件作简单介绍:

  • SparkListenerEvent事件的来源:包括DAGScheduler、SparkContext、DriverEndpoint、BlockManagerMasterEndpoint以及LocalSchedulerBackend等,这些组件将会产生各种SparkListenerEvent,并发送到listenerBus的事件队列中。DriverEndpoint是Driver在Standalone或local-cluster模式下与其他组件进行通信的组件,在《Spark内核设计的艺术》一书的第9.9.2节有详细介绍。BlockManagerMasterEndpoint是Driver对分配给应用的所有Executor及其BlockManager进行统一管理的组件,在《Spark内核设计的艺术》一书的6.8节详细介绍。LocalSchedulerBackend是local模式下的调度后端接口,用于给任务分配资源或对任务的状态进行更新,在《Spark内核设计的艺术》一书的7.8.2节详细介绍。
  • 事件总线listenerBus。根据3.3节对事件总线的介绍,我们知道listenerBus通过定时器将SparkListenerEvent事件匹配到具体的SparkListener,进而改变各个SparkListener中的统计监控数据。
  • Spark UI的界面。各个SparkListener内的统计监控数据将会被各种标签页和具体页面展示到Web界面。标签页有StagesTab、JobsTab、ExecutorsTab、EnvironmentTab以及StorageTab。每个标签页中包含若干个页面,例如StagesTab标签页中包含了AllStagesPage、StagePage及PoolPage三个页面。
  • 控制台的展示。细心的读者会发现图1中还有SparkStatusTracker(Spark状态跟踪器)和ConsoleProgressBar(控制台进度条)两个组件。SparkStatusTracker负责对Job和Stage的监控,其实际也是使用了JobProgressListener中的监控数据,并额外进行了一些加工。ConsoleProgressBar负责将SparkStatusTracker提供的数据打印到控制台上。从最终展现的角度来看,SparkStatusTracker和ConsoleProgressBar不应该属于SparkUI的组成部分,但是由于其实现与JobProgressListener密切相关,所以将它们也放在了SparkUI的内容中。

WebUI框架体系

  Spark UI构建在WebUI的框架体系之上,因此应当首先了解WebUI。WebUI定义了一种Web界面展现的框架,并提供返回Json格式数据的Web服务。WebUI用于展示一组标签页,WebUITab定义了标签页的规范。每个标签页中包含着一组页面,WebUIPage定义了页面的规范。我们将首先了解WebUIPage和WebUITab,最后从整体来看WebUI。

WebUIPage的定义

  任何的Web界面往往由多个页面组成,每个页面都将提供不同的内容展示。WebUIPage是WebUI框架体系的页节点,定义了所有页面应当遵循的规范。抽象类WebUIPage的定义见代码清单1。

代码清单1  WebUIPage的定义

private[spark] abstract class WebUIPage(var prefix: String) {
def render(request: HttpServletRequest): Seq[Node]
def renderJson(request: HttpServletRequest): JValue = JNothing
} 

WebUIPage定义了两个方法。

  • render:渲染页面;
  • renderJson:生成JSON。

WebUIPage在WebUI框架体系中的上一级节点(也可以称为父亲)可以是WebUI或者WebUITab,其成员属性prefix将与上级节点的路径一起构成当前WebUIPage的访问路径。

WebUITab的定义

有时候Web界面需要将多个页面作为一组内容放置在一起,这时候标签页是常见的展现形式。标签页WebUITab定义了所有标签页的规范,并用于展现一组WebUIPage。抽象类WebUITab的定义见代码清单2。

代码清单2  WebUITab的定义

private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
val pages = ArrayBuffer[WebUIPage]()
val name = prefix.capitalize def attachPage(page: WebUIPage) {
page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
pages += page
} def headerTabs: Seq[WebUITab] = parent.getTabs def basePath: String = parent.getBasePath
}

根据代码清单2,可以看到WebUITab有四个成员属性:

  • parent:上一级节点,即父亲。WebUITab的父亲只能是WebUI。
  • prefix:当前WebUITab的前缀。prefix将与上级节点的路径一起构成当前WebUITab的访问路径。
  • pages:当前WebUITab所包含的WebUIPage的缓冲数组。
  • name:当前WebUITab的名称。name实际是对prefix的首字母转换成大写字母后取得。

此外,WebUITab还有三个成员方法,下面介绍它们的作用:

  • attachPage:首先将当前WebUITab的前缀与WebUIPage的前缀拼接,作为WebUIPage的访问路径。然后向pages中添加WebUIPage。
  • headerTabs:获取父亲WebUI中的所有WebUITab。此方法实际通过调用父亲WebUI的getTabs方法实现,getTabs方法请参阅下一小节——WebUI的定义。
  • basePath:获取父亲WebUI的基本路径。此方法实际通过调用父亲WebUI的getBasePath方法实现,getBasePath方法请参阅下一小节——WebUI的定义。。

WebUI的定义

  WebUI是Spark实现的用于提供Web界面展现的框架,凡是需要页面展现的地方都可以继承它来完成。WebUI定义了WebUI框架体系的规范。为便于理解,首先明确WebUI中各个成员属性的含义:

  • securityManager:SparkEnv中创建的安全管理器SecurityManager,5.2节对SecurityManager有详细介绍。
  • sslOptions:使用SecurityManager获取spark.ssl.ui属性指定的WebUI的SSL(Secure Sockets Layer 安全套接层)选项。
  • port:WebUI对外服务的端口。可以使用spark.ui.port属性进行配置。
  • conf:即SparkConf。
  • basePath:WebUI的基本路径。basePath默认为空字符串。
  • name:WebUI的名称。Spark UI的name为SparkUI。
  • tabs:WebUITab的缓冲数组。
  • handlers:ServletContextHandler的缓冲数组。ServletContextHandler是Jetty提供的API,负责对ServletContext进行处理。ServletContextHandler的使用及Jetty的更多内容可以参阅附录C。
  • pageToHandlers:WebUIPage与ServletContextHandler缓冲数组之间的映射关系。由于WebUIPage的两个方法render和renderJson分别需要由一个对应的ServletContextHandler处理。所以一个WebUIPage对应两个ServletContextHandler。
  • serverInfo:用于缓存ServerInfo,即WebUI的Jetty服务器信息。
  • publicHostName:当前WebUI的Jetty服务的主机名。优先采用系统环境变量SPARK_PUBLIC_DNS指定的主机名,否则采用spark.driver.host属性指定的host,在没有前两个配置的时候,将默认使用工具类Utils的localHostName方法(详见附录A)返回的主机名。
  • className:过滤了$符号的当前类的简单名称。className 是通过Utils的getFormattedClassName方法得到的。getFormattedClassName方法的实现请看附录A。

了解了WebUI的成员属性,现在就可以理解其提供的各个方法了。WebUI提供的方法有:

  • getBasePath:获取basePath。
  • getTabs:获取tabs中的所有WebUITab,并以Scala的序列返回。
  • getHandlers:获取handlers中的所有ServletContextHandler,并以Scala的序列返回。
  • getSecurityManager:获取securityManager。
  • attachHandler:给handlers缓存数组中添加ServletContextHandler,并且将此ServletContextHandler通过ServerInfo的addHandler方法添加到Jetty服务器中。attachHandler的实现见代码清单3。ServerInfo的addHandler方法的请参阅附录C。

代码清单3  attachHandler的实现

  def attachHandler(handler: ServletContextHandler) {
handlers += handler
serverInfo.foreach(_.addHandler(handler))
}
  • detachHandler:从handlers缓存数组中移除ServletContextHandler,并且将此ServletContextHandler通过ServerInfo的removeHandler方法从Jetty服务器中移除。detachHandler的实现见代码清单4。ServerInfo的removeHandler方法的请参阅附录C。

代码清单4  detachHandler的实现

  def detachHandler(handler: ServletContextHandler) {
handlers -= handler
serverInfo.foreach(_.removeHandler(handler))
}
  • attachPage:首先调用工具类JettyUtils[1]的createServletHandler方法给WebUIPage创建与render和renderJson两个方法分别关联的ServletContextHandler,然后通过attachHandler方法添加到handlers缓存数组与Jetty服务器中,最后把WebUIPage与这两个ServletContextHandler的映射关系更新到pageToHandlers中。attachPage的实现见代码清单5。

代码清单5  attachPage的实现

  def attachPage(page: WebUIPage) {
val pagePath = "/" + page.prefix
val renderHandler = createServletHandler(pagePath,
(request: HttpServletRequest) => page.render(request), securityManager, conf, basePath)
val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
(request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath)
attachHandler(renderHandler)
attachHandler(renderJsonHandler)
val handlers = pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
handlers += renderHandler
}
  • detachPage:作用与attachPage相反。detachPage的实现见代码清单6。

代码清单6  detachPage的实现

  def detachPage(page: WebUIPage) {
pageToHandlers.remove(page).foreach(_.foreach(detachHandler))
}
  • attachTab:首先向tabs中添加WebUITab,然后给WebUITab中的每个WebUIPage施加attachPage方法。attachTab的实现见代码清单7。

代码清单7  attachTab的实现

  def attachTab(tab: WebUITab) {
tab.pages.foreach(attachPage)
tabs += tab
}
  • detachTab:作用与attachTab相反。detachTab的实现见代码清单8。

代码清单8  detachTab的实现

  def detachTab(tab: WebUITab) {
tab.pages.foreach(detachPage)
tabs -= tab
}
  • addStaticHandler:首先调用工具类JettyUtils的createStaticHandler方法创建静态文件服务的ServletContextHandler,然后施加attachHandler方法。addStaticHandler的实现见代码清单9。JettyUtils的createStaticHandler方法的实现见附录C。

代码清单9     addStaticHandler的实现

  def addStaticHandler(resourceBase: String, path: String): Unit = {
attachHandler(JettyUtils.createStaticHandler(resourceBase, path))
}
  • removeStaticHandler:作用与addStaticHandler相反。removeStaticHandler的实现见代码清单10。

代码清单10         removeStaticHandler的实现

  def removeStaticHandler(path: String): Unit = {
handlers.find(_.getContextPath() == path).foreach(detachHandler)
}
  • initialize:用于初始化WebUI服务中的所有组件。WebUI中此方法未实现,需要子类实现。
  • bind:启动与WebUI绑定的Jetty服务。bind方法的实现见代码清单11。

代码清单11         bind的实现

  def bind() {
assert(!serverInfo.isDefined, s"Attempted to bind $className more than once!")
try {
val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name))
logInfo(s"Bound $className to $host, and started at $webUrl")
} catch {
case e: Exception =>
logError(s"Failed to bind $className", e)
System.exit(1)
}
}
  • webUrl:获取WebUI的Web界面的URL。webUrl的实现如下:
  def webUrl: String = shttp://$publicHostName:$boundPort
  • boundPort:获取WebUI的Jetty服务的端口。boundPort的实现如下:
  def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
  • stop:停止WebUI。实际是停止WebUI底层的Jetty服务。stop方法的实现见代码清单12。

代码清单12         stop方法的实现

  def stop() {
assert(serverInfo.isDefined,
s"Attempted to stop $className before binding to a server!")
serverInfo.get.stop()
}

创建SparkUI

  在SparkContext的初始化过程中,会创建SparkUI。有了对WebUI的总体认识,现在是时候了解SparkContext是如何构造SparkUI的了。SparkUI是WebUI框架的使用范例,了解了SparkUI的创建过程,读者对MasterWebUI、WorkerWebUI及HistoryServer的创建过程也必然了然于心。创建SparkUI的代码如下:

    _statusTracker = new SparkStatusTracker(this)

    _progressBar =
if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
Some(new ConsoleProgressBar(this))
} else {
None
} _ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
_env.securityManager, appName, startTime = startTime))
} else {
// For tests, do not enable the UI
None
}
_ui.foreach(_.bind())

这段代码的执行步骤如下。

1)  创建Spark状态跟踪器SparkStatusTracker。

2)  创建ConsoleProgressBar。可以配置spark.ui.showConsoleProgress属性为false取消对ConsoleProgressBar的创建,此属性默认为true。

3)  调用SparkUI的createLiveUI方法创建SparkUI。

4)  给SparkUI绑定端口。SparkUI继承自WebUI,因此调用了代码清单4-12中WebUI的bind方法启动SparkUI底层的Jetty服务。

上述步骤中,第1)、2)、4)步都很简单,所以着重来分析第3)步。SparkUI的createLiveUI的实现如下。

  def createLiveUI(
sc: SparkContext,
conf: SparkConf,
listenerBus: SparkListenerBus,
jobProgressListener: JobProgressListener,
securityManager: SecurityManager,
appName: String,
startTime: Long): SparkUI = {
create(Some(sc), conf, listenerBus, securityManager, appName,
jobProgressListener = Some(jobProgressListener), startTime = startTime)
}

可以看到SparkUI的createLiveUI方法中调用了create方法。create的实现如下。

  private def create(
sc: Option[SparkContext],
conf: SparkConf,
listenerBus: SparkListenerBus,
securityManager: SecurityManager,
appName: String,
basePath: String = "",
jobProgressListener: Option[JobProgressListener] = None,
startTime: Long): SparkUI = { val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
val listener = new JobProgressListener(conf)
listenerBus.addListener(listener)
listener
} val environmentListener = new EnvironmentListener
val storageStatusListener = new StorageStatusListener(conf)
val executorsListener = new ExecutorsListener(storageStatusListener, conf)
val storageListener = new StorageListener(storageStatusListener)
val operationGraphListener = new RDDOperationGraphListener(conf) listenerBus.addListener(environmentListener)
listenerBus.addListener(storageStatusListener)
listenerBus.addListener(executorsListener)
listenerBus.addListener(storageListener)
listenerBus.addListener(operationGraphListener) new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
executorsListener, _jobProgressListener, storageListener, operationGraphListener,
appName, basePath, startTime)
}

可以看到create方法里除了JobProgressListener是外部传入的之外,又增加了一些SparkListener,例如用于对JVM参数、Spark属性、Java系统属性、classpath等进行监控的EnvironmentListener;用于维护Executor的存储状态的StorageStatusListener;用于准备将Executor的信息展示在ExecutorsTab的ExecutorsListener;用于准备将Executor相关存储信息展示在BlockManagerUI的StorageListener;用于构建RDD的DAG(有向无关图)的RDDOperationGraphListener等。这5个SparkListener的实现添加到listenerBus的监听器列表中。最后使用SparkUI的构造器创建SparkUI。

SparkUI的初始化

  调用SparkUI的构造器创建SparkUI,实际也是对SparkUI的初始化过程。在介绍初始化之前,先来看看SparkUI中的两个成员属性。

  • killEnabled:标记当前SparkUI能否提供杀死Stage或者Job的链接。
  • appId:当前应用的ID。

SparkUI的构造过程中会执行initialize方法,其实现见代码清单13。

代码清单13         SparkUI的初始化

  def initialize() {
val jobsTab = new JobsTab(this)
attachTab(jobsTab)
val stagesTab = new StagesTab(this)
attachTab(stagesTab)
attachTab(new StorageTab(this))
attachTab(new EnvironmentTab(this))
attachTab(new ExecutorsTab(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
attachHandler(ApiRootResource.getServletHandler(this))
// These should be POST only, but, the YARN AM proxy won't proxy POSTs
attachHandler(createRedirectHandler(
"/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
attachHandler(createRedirectHandler(
"/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
httpMethods = Set("GET", "POST")))
}
initialize()

根据代码清单13,SparkUI的初始化步骤如下。

1)  构建页面布局并给每个WebUITab中的所有WebUIPage创建对应的ServletContextHandler。这一步使用了代码清单4-8中展示的attachTab方法。

2)  调用JettyUtils的createStaticHandler方法创建对静态目录org/apache/spark/ui/static提供文件服务的ServletContextHandler,并使用attachHandler方法追加到SparkUI的服务中。

3)  调用JettyUtils的createRedirectHandler方法创建几个将用户对源路径的请求重定向到目标路径的ServletContextHandler。例如,将用户对根路径"/"的请求重定向到目标路径"/jobs/"的ServletContextHandler。

SparkUI的页面布局与展示

  SparkUI究竟是如何实现页面布局及展示的? 由于所有标签页都继承了SparkUITab,所以我们先来看看SparkUITab的实现:

private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)
extends WebUITab(parent, prefix) {
def appName: String = parent.getAppName
}

根据上述代码,我们知道SparkUITab继承了WebUITab,并在实现中增加了一个用于获取当前应用名称的方法appName。EnvironmentTab是用于展示JVM、Spark属性、系统属性、类路径等相关信息的标签页,由于其实现简单且能说明问题,所以本节挑选EnvironmentTab作为示例解答本节一开始提出的问题。

EnvironmentTab的实现见代码清单14。

代码清单14         EnvironmentTab的实现

private[ui] class EnvironmentTab(parent: SparkUI) extends SparkUITab(parent, "environment") {
val listener = parent.environmentListener
attachPage(new EnvironmentPage(this))
}

根据代码清单14,我们知道EnvironmentTab引用了SparkUI的environmentListener(类型为EnvironmentListener),并且包含EnvironmentPage这个页面。EnvironmentTab通过调用attachPage方法将EnvironmentPage与Jetty服务关联起来。根据代码清单5中attachPage的实现,创建的renderHandler将采用偏函数(request: HttpServletRequest) => page.render(request) 处理请求,因而会调用EnvironmentPage的render方法。EnvironmentPage的render方法将会渲染页面元素。EnvironmentPage的实现见代码清单15。

代码清单15         EnvironmentPage的实现

private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") {
private val listener = parent.listener private def removePass(kv: (String, String)): (String, String) = {
if (kv._1.toLowerCase.contains("password") || kv._1.toLowerCase.contains("secret")) {
(kv._1, "******")
} else kv
} def render(request: HttpServletRequest): Seq[Node] = {
// 调用UIUtils的listingTable方法生成JVM运行时信息、Spark属性信息、系统属性信息、类路径信息的表格
val runtimeInformationTable = UIUtils.listingTable(
propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
val sparkPropertiesTable = UIUtils.listingTable(
propertyHeader, propertyRow, listener.sparkProperties.map(removePass), fixedWidth = true)
val systemPropertiesTable = UIUtils.listingTable(
propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true)
val classpathEntriesTable = UIUtils.listingTable(
classPathHeaders, classPathRow, listener.classpathEntries, fixedWidth = true)
val content =
<span>
<h4>Runtime Information</h4> {runtimeInformationTable}
<h4>Spark Properties</h4> {sparkPropertiesTable}
<h4>System Properties</h4> {systemPropertiesTable}
<h4>Classpath Entries</h4> {classpathEntriesTable}
</span>
// 调用UIUtils的headerSparkPage方法封装好css、js、header及页面布局等
UIUtils.headerSparkPage("Environment", content, parent)
}
// 定义JVM运行时信息、Spark属性信息、系统属性信息的表格头部propertyHeader和类路径信息的表格头部
// classPathHeaders
private def propertyHeader = Seq("Name", "Value")
private def classPathHeaders = Seq("Resource", "Source")
// 定义JVM运行时信息的表格中每行数据的生成方法jvmRow
private def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
}

根据代码清单15,EnvironmentPage的render方法利用从父节点EnvironmentTab中得到的EnvironmentListener中的统计监控数据生成JVM运行时、Spark属性、系统属性以及类路径等状态的摘要信息。以JVM运行时为例,页面渲染的步骤如下:

1)  定义JVM运行时信息、Spark属性信息、系统属性信息的表格头部propertyHeader和类路径信息的表格头部classPathHeaders。

2)  定义JVM运行时信息的表格中每行数据的生成方法jvmRow。

3)  调用UIUtils的listingTable方法生成JVM运行时信息、Spark属性信息、系统属性信息、类路径信息的表格。

4)  调用UIUtils的headerSparkPage方法封装好css、js、header及页面布局等。

UIUtils工具类的实现细节留给感兴趣的读者自行查阅,本文不多赘述。


[1]本节内容用到JettyUtils中的很多方法,读者可以在附录C中找到相应的实现与说明。

关于《Spark内核设计的艺术 架构设计与实现》

经过近一年的准备,基于Spark2.1.0版本的《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:

Spark2.1.0——内置Web框架详解-LMLPHP

纸质版售卖链接如下:

京东:https://item.jd.com/12302500.html

04-14 11:32