SparkEnv在两个地方会被创建, 由于SparkEnv中包含了很多重要的模块, 比如BlockManager, 所以SparkEnv很重要
Driver端, 在SparkContext初始化的时候, SparkEnv会被创建
// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.createFromSystemProperties(
"<driver>", // 表示是driver, 下面的executor则是executorid
System.getProperty("spark.driver.host"),
System.getProperty("spark.driver.port").toInt,
true,
isLocal)
SparkEnv.set(env)
Executor端, 在executor初始化时被创建
// Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
SparkEnv.set(env)
SparkEnv Class
用于hold所有Spark运行时的环境对象, serializer, Akka actor system, block manager, and map output tracker等
/**
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
* including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
* Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
* objects needs to have the right SparkEnv set. You can get the current environment with
* SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
*/
class SparkEnv (
val executorId: String,
val actorSystem: ActorSystem,
val serializerManager: SerializerManager,
val serializer: Serializer,
val closureSerializer: Serializer,
val cacheManager: CacheManager,
val mapOutputTracker: MapOutputTracker,
val shuffleFetcher: ShuffleFetcher,
val broadcastManager: BroadcastManager,
val blockManager: BlockManager,
val connectionManager: ConnectionManager,
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
val metricsSystem: MetricsSystem) {
}
SparkEnv Object
scala使用伴生object当作类接口
除了基本的get和set
就是在createFromSystemProperties中创建了一堆很关键的对象
object SparkEnv extends Logging {
private val env = new ThreadLocal[SparkEnv] // ThreadLocal,所以每个线程各访问各的
@volatile private var lastSetSparkEnv : SparkEnv = _ // 缓存最新更新的SparkEnv,并且volatile,便于其他线程获得 def set(e: SparkEnv) {
lastSetSparkEnv = e
env.set(e)
} /**
* Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv
* previously set in any thread.
*/
def get: SparkEnv = {
Option(env.get()).getOrElse(lastSetSparkEnv) // 没有local时, 可以用lastSetSparkEnv
} /**
* Returns the ThreadLocal SparkEnv.
*/
def getThreadLocal : SparkEnv = {
env.get() // 只取到local的
} def createFromSystemProperties(
executorId: String,
hostname: String,
port: Int,
isDriver: Boolean,
isLocal: Boolean): SparkEnv = { val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port) val classLoader = Thread.currentThread.getContextClassLoader // Create an instance of the class named by the given Java system property, or by
// defaultClassName if the property is not set, and return it as a T
def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
val name = System.getProperty(propertyName, defaultClassName)
Class.forName(name, true, classLoader).newInstance().asInstanceOf[T]
} val serializerManager = new SerializerManager val serializer = serializerManager.setDefault(
System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer")) val closureSerializer = serializerManager.get(
System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")) val connectionManager = blockManager.connectionManager val broadcastManager = new BroadcastManager(isDriver) val cacheManager = new CacheManager(blockManager)
// BlockManager
val blockManagerMaster = new BlockManagerMaster(registerOrLookup( // registerOrLookup表示只有在master上创建Actor对象, slave上只是创建ref
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal)))
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
// MapOutputTracker
val mapOutputTracker = new MapOutputTracker()
mapOutputTracker.trackerActor = registerOrLookup( // 同样只有在master创建actor对象
"MapOutputTracker",
new MapOutputTrackerActor(mapOutputTracker))
// ShuffleFetcher
val shuffleFetcher = instantiateClass[ShuffleFetcher](
"spark.shuffle.fetcher", "org.apache.spark.BlockStoreShuffleFetcher") val httpFileServer = new HttpFileServer()
httpFileServer.initialize()
System.setProperty("spark.fileserver.uri", httpFileServer.serverUri) val metricsSystem = if (isDriver) {
MetricsSystem.createMetricsSystem("driver")
} else {
MetricsSystem.createMetricsSystem("executor")
}
metricsSystem.start() new SparkEnv(
executorId,
actorSystem,
serializerManager,
serializer,
closureSerializer,
cacheManager,
mapOutputTracker,
shuffleFetcher,
broadcastManager,
blockManager,
connectionManager,
httpFileServer,
sparkFilesDir,
metricsSystem)
}
}