RDDs弹性分布式数据集

spark就是实现了RDDs编程模型的集群计算平台。有很多RDDs的介绍,这里就不仔细说了,这儿主要看源码。

abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {

SparkEnv几个重要组件

BlockManager

主要成员

  private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
  // Actual storage of where blocks are kept
  private var externalBlockStoreInitialized = false
  private[spark] val memoryStore = new MemoryStore(this, memoryManager)
  private[spark] val diskStore = new DiskStore(this, diskBlockManager)
  private[spark] lazy val externalBlockStore: ExternalBlockStore = {
    externalBlockStoreInitialized = true
    new ExternalBlockStore(this, executorId)
  }

主要方法

get(blockId: BlockId) 通过BlockId找Block

  /**
* Get a block from the block manager (either local or remote).
*/
def get(blockId: BlockId): Option[BlockResult] = {
val local = getLocal(blockId)
if (local.isDefined) {
logInfo(s"Found block $blockId locally")
return local
}
val remote = getRemote(blockId)
if (remote.isDefined) {
logInfo(s"Found block $blockId remotely")
return remote
}
None
}

getLocal(blockId: BlockId): Option[BlockResult] 通过本地Block Manager找Block

  /**
* Get block from local block manager.
*/
def getLocal(blockId: BlockId): Option[BlockResult] = {
logDebug(s"Getting local block $blockId")
doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
}

doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] 在本地获取Block代码实现

  private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
// Double check to make sure the block is still there. There is a small chance that the
// block has been removed by removeBlock (which also synchronizes on the blockInfo object).
// Note that this only checks metadata tracking. If user intentionally deleted the block
// on disk or from off heap storage without using removeBlock, this conditional check will
// still pass but eventually we will get an exception because we can't find the block.
if (blockInfo.get(blockId).isEmpty) {
logWarning(s"Block $blockId had been removed")
return None
} // If another thread is writing the block, wait for it to become ready.
if (!info.waitForReady()) {
// If we get here, the block write failed.
logWarning(s"Block $blockId was marked as failure.")
return None
} val level = info.level
logDebug(s"Level for block $blockId is $level") // Look for the block in memory
if (level.useMemory) {
logDebug(s"Getting block $blockId from memory")
val result = if (asBlockResult) {
memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
} else {
memoryStore.getBytes(blockId)
}
result match {
case Some(values) =>
return result
case None =>
logDebug(s"Block $blockId not found in memory")
}
} // Look for the block in external block store
if (level.useOffHeap) {
logDebug(s"Getting block $blockId from ExternalBlockStore")
if (externalBlockStore.contains(blockId)) {
val result = if (asBlockResult) {
externalBlockStore.getValues(blockId)
.map(new BlockResult(_, DataReadMethod.Memory, info.size))
} else {
externalBlockStore.getBytes(blockId)
}
result match {
case Some(values) =>
return result
case None =>
logDebug(s"Block $blockId not found in ExternalBlockStore")
}
}
} // Look for block on disk, potentially storing it back in memory if required
if (level.useDisk) {
logDebug(s"Getting block $blockId from disk")
val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
case Some(b) => b
case None =>
throw new BlockException(
blockId, s"Block $blockId not found on disk, though it should be")
}
assert(0 == bytes.position()) if (!level.useMemory) {
// If the block shouldn't be stored in memory, we can just return it
if (asBlockResult) {
return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
info.size))
} else {
return Some(bytes)
}
} else {
// Otherwise, we also have to store something in the memory store
if (!level.deserialized || !asBlockResult) {
/* We'll store the bytes in memory if the block's storage level includes
* "memory serialized", or if it should be cached as objects in memory
* but we only requested its serialized bytes. */
memoryStore.putBytes(blockId, bytes.limit, () => {
// https://issues.apache.org/jira/browse/SPARK-6076
// If the file size is bigger than the free memory, OOM will happen. So if we cannot
// put it into MemoryStore, copyForMemory should not be created. That's why this
// action is put into a `() => ByteBuffer` and created lazily.
val copyForMemory = ByteBuffer.allocate(bytes.limit)
copyForMemory.put(bytes)
})
bytes.rewind()
}
if (!asBlockResult) {
return Some(bytes)
} else {
val values = dataDeserialize(blockId, bytes)
if (level.deserialized) {
// Cache the values before returning them
val putResult = memoryStore.putIterator(
blockId, values, level, returnValues = true, allowPersistToDisk = false)
// The put may or may not have succeeded, depending on whether there was enough
// space to unroll the block. Either way, the put here should return an iterator.
putResult.data match {
case Left(it) =>
return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
case _ =>
// This only happens if we dropped the values back to disk (which is never)
throw new SparkException("Memory store did not return an iterator!")
}
} else {
return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
}
}
}
}
}
} else {
logDebug(s"Block $blockId not registered locally")
}
None
}

相关类

Dependency

宽依赖和窄依赖两种。Denpendency类中主要保存父RDD,根据partition id获得所依赖的父RDD partitions列表。ShuffleDependency还保存了shuffle需要的必要组件,shuffle output partitioner、serializer、keyOrdering、aggregator等

abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
} /**
* :: DeveloperApi ::
* Base class for dependencies where each partition of the child RDD depends on a small number
* of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
*/
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/**
* Get the parent partitions for a child partition.
* @param partitionId a partition of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int] override def rdd: RDD[T] = _rdd
} /**
* :: DeveloperApi ::
* Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
* the RDD is transient since we don't need it on the executor side.
*
* @param _rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
* the default serializer, as specified by `spark.serializer` config option, will
* be used.
* @param keyOrdering key ordering for RDD's shuffles
* @param aggregator map/reduce-side aggregator for RDD's shuffle
* @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
*/
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] { override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]] private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
// Note: It's possible that the combiner class tag is null, if the combineByKey
// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
private[spark] val combinerClassName: Option[String] =
Option(reflect.classTag[C]).map(_.runtimeClass.getName) val shuffleId: Int = _rdd.context.newShuffleId() val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.size, this) _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
} /**
* :: DeveloperApi ::
* Represents a one-to-one dependency between partitions of the parent and child RDDs.
*/
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
} /**
* :: DeveloperApi ::
* Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
* @param rdd the parent RDD
* @param inStart the start of the range in the parent RDD
* @param outStart the start of the range in the child RDD
* @param length the length of the range
*/
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}

RDDBlockId

一个RDD partition用一个RDDBlock存储。

/**
* :: DeveloperApi ::
* Identifies a particular Block of data, usually associated with a single file.
* A Block can be uniquely identified by its filename, but each type of Block has a different
* set of keys which produce its unique name.
*
* If your BlockId should be serializable, be sure to add it to the BlockId.apply() method.
*/
@DeveloperApi
sealed abstract class BlockId {
/** A globally unique identifier for this Block. Can be used for ser/de. */
def name: String // convenience methods
def asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
def isRDD: Boolean = isInstanceOf[RDDBlockId]
def isShuffle: Boolean = isInstanceOf[ShuffleBlockId]
def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId] override def toString: String = name
override def hashCode: Int = name.hashCode
override def equals(other: Any): Boolean = other match {
case o: BlockId => getClass == o.getClass && name.equals(o.name)
case _ => false
}
} @DeveloperApi
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
override def name: String = "rdd_" + rddId + "_" + splitIndex
}

BlockInfo

private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
// To save space, 'pending' and 'failed' are encoded as special sizes:
@volatile var size: Long = BlockInfo.BLOCK_PENDING
private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
private def failed: Boolean = size == BlockInfo.BLOCK_FAILED
private def initThread: Thread = BlockInfo.blockInfoInitThreads.get(this) setInitThread() private def setInitThread() {
/* Set current thread as init thread - waitForReady will not block this thread
* (in case there is non trivial initialization which ends up calling waitForReady
* as part of initialization itself) */
BlockInfo.blockInfoInitThreads.put(this, Thread.currentThread())
} /**
* Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
* Return true if the block is available, false otherwise.
*/
def waitForReady(): Boolean = {
if (pending && initThread != Thread.currentThread()) {
synchronized {
while (pending) {
this.wait()
}
}
}
!failed
} /** Mark this BlockInfo as ready (i.e. block is finished writing) */
def markReady(sizeInBytes: Long) {
require(sizeInBytes >= 0, s"sizeInBytes was negative: $sizeInBytes")
assert(pending)
size = sizeInBytes
BlockInfo.blockInfoInitThreads.remove(this)
synchronized {
this.notifyAll()
}
} /** Mark this BlockInfo as ready but failed */
def markFailure() {
assert(pending)
size = BlockInfo.BLOCK_FAILED
BlockInfo.blockInfoInitThreads.remove(this)
synchronized {
this.notifyAll()
}
}
} private object BlockInfo {
/* initThread is logically a BlockInfo field, but we store it here because
* it's only needed while this block is in the 'pending' state and we want
* to minimize BlockInfo's memory footprint. */
private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread] private val BLOCK_PENDING: Long = -1L
private val BLOCK_FAILED: Long = -2L
}

主要成员

/** sparkContext */
def sparkContext: SparkContext = sc
/** 唯一标识的RDD id */
val id: Int = sc.newRddId()
/** name */
@transient var name: String = null
/** 依赖列表 */
protected def getDependencies: Seq[Dependency[_]] = deps
/** 分区列表 */
protected def getPartitions: Array[Partition]
/** 子类可选重写,优先存储的位置 */
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/** 分区器 */
@transient val partitioner: Option[Partitioner] = None
/** 指定怎么计算的到RDD的分区 */
def compute(split: Partition, context: TaskContext): Iterator[T]

主要方法

persist实现

  private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
// TODO: Handle changes of StorageLevel
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
throw new UnsupportedOperationException(
"Cannot change storage level of an RDD after it was already assigned a level")
}
// If this is the first time this RDD is marked for persisting, register it
// with the SparkContext for cleanups and accounting. Do this only once.
if (storageLevel == StorageLevel.NONE) {
sc.cleaner.foreach(_.registerRDDForCleanup(this))
sc.persistRDD(this)
}
storageLevel = newLevel
this
} private[spark] def persistRDD(rdd: RDD[_]) {
persistentRdds(rdd.id) = rdd
}

获取数据iterator方法

  /**
* Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
* This should ''not'' be called by users directly, but is available for implementors of custom
* subclasses of RDD.
*/
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}

getOrCompute

  /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
rdd: RDD[T],
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = { val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(blockResult.readMethod)
existingMetrics.incBytesRead(blockResult.bytes) val iter = blockResult.data.asInstanceOf[Iterator[T]]
new InterruptibleIterator[T](context, iter) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
case None =>
// Acquire a lock for loading this partition
// If another thread already holds the lock, wait for it to finish return its results
val storedValues = acquireLockForPartition[T](key)
if (storedValues.isDefined) {
return new InterruptibleIterator[T](context, storedValues.get)
} // Otherwise, we have to load the partition ourselves
try {
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(partition, context) // If the task is running locally, do not persist the result
if (context.isRunningLocally) {
return computedValues
} // Otherwise, cache the values and keep track of any updates in block statuses
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
val metrics = context.taskMetrics
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
new InterruptibleIterator(context, cachedValues) } finally {
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
}
}
05-11 17:03