MemeoryStore

上一节,我们对BlockManager的主要写入方法做了一个整理,知道了BlockMananger的主要写入逻辑,以及对于块信息的管理。但是,由于spark的整个存储模块是在是很庞大,而且很多细节的逻辑错综复杂,如果对于每个细节都刨根问底,一来精力有限,二来感觉也没有太大的必要,当然如果时间允许肯定是越详细越好,在这里,我的分析的主要目的是理清存储模块的重点逻辑,希望能够提纲契领地把各个模块的脉络领出来,建立起对spark-core中各模块的整体认知,这样我们在遇到一些问题的时候就能够很快地知道应该从何处下手,从哪个具体的模块去找问题。
好了废话不多说,本节接着上一节。上一篇,我们分析了BlockManager的几个主要的存储方法,发现BlockManager主要依靠内部的两个组件MemoryStore和DiskStore来进行实际的数据写入和块的管理。
本节,我们就来看一下MemoryStore这个组件。

不过,我还是延续我一贯的风格,从外部对一个类的方法调用为切入点分析这个类的作用和逻辑。
所以,我们先来看一下上一节对于MemoryStore的主要的方法调用的总结:

memoryStore.putIteratorAsValues
memoryStore.putIteratorAsBytes
memoryStore.putBytes

memoryStore.putIteratorAsValues

这个方法主要是用于存储级别是非序列化的情况,即直接以java对象的形式将数据存放在jvm堆内存上。我们都知道,在jvm堆内存上存放大量的对象并不是什么好事,gc压力大,挤占内存,可能引起频繁的gc,但是也有明显的好处,就是省去了序列化和反序列化耗时,而且直接从堆内存取数据显然比任何其他方式(磁盘和直接内存)都要快很多,所以对于内存充足且要缓存的数据量本省不是很大的情况,这种方式也不失为一种不错的选择。

private[storage] def putIteratorAsValues[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {

// 用于存储java对象的容器
val valuesHolder = new DeserializedValuesHolder[T](classTag)

putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match {
    // 存储成功
  case Right(storedSize) => Right(storedSize)
    // 存储失败的情况
  case Left(unrollMemoryUsedByThisBlock) =>
    // ValuesHolder内部的数组和vector会相互转换
    // 数据写入完成后会将vector中的数据转移到数组中
    val unrolledIterator = if (valuesHolder.vector != null) {
      valuesHolder.vector.iterator
    } else {
      valuesHolder.arrayValues.toIterator
    }

    // 返回写入一半的迭代器、
    // 外部调用者一半会选择关闭这个迭代器以释放被使用的内存
    Left(new PartiallyUnrolledIterator(
      this,
      MemoryMode.ON_HEAP,
      unrollMemoryUsedByThisBlock,
      unrolled = unrolledIterator,
      rest = values))
}
}

这个方法的逻辑很简单,作用也比较单一,主要是对实际存储方法putIterator的返回结果做处理,如果失败的话,就封装一个PartiallyUnrolledIterator返回给外部调用这个,调用这个一般需要将这个写入一半的迭代器关闭。

MemoryStore.putIterator

这个方法看似很长,其实逻辑相对简单,主要做的事就是把数据一条一条往ValuesHolder中写,并周期性地检查内存,如果内存不够就通过内存管理器MemoryManager申请内存,每次申请当前内存量的1.5倍。
最后,将ValuesHolder中的数据转移到一个数组中(其实数据在SizeTrackingVector中也是以数组的形式存储,只不过SizeTrackingVector对象内部处理数组还有一些其他的簿记量,更为关键的是我们需要将存储的数据以同一的接口进行包装,以利于MemoryStore进行同一管理)。最后还有关键的一步,就是释放展开内存,重新申请存储内存。
此外,这个过程中有使用到memoryManager,具体的方法调用是:

memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)

------------------------------分割线------------------------------

private def putIterator[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T],
  memoryMode: MemoryMode,
  valuesHolder: ValuesHolder[T]): Either[Long, Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

// Number of elements unrolled so far
var elementsUnrolled = 0
// Whether there is still enough memory for us to continue unrolling this block
var keepUnrolling = true
// Initial per-task memory to request for unrolling blocks (bytes).
// 用于数据在内存展开的初始的内存使用量
val initialMemoryThreshold = unrollMemoryThreshold
// How often to check whether we need to request more memory
// 检查内存的频率,每写这么多条数据就会检查一次是否需要申请额外的内存
val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
// Memory currently reserved by this task for this particular unrolling operation
// 内存阈值,开始时等于初始阈值
var memoryThreshold = initialMemoryThreshold
// Memory to request as a multiple of current vector size
// 内存增长因子,每次申请的内存是当前内存的这个倍数
val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
// Keep track of unroll memory used by this particular block / putIterator() operation
// 当前的块使用的内存大小
var unrollMemoryUsedByThisBlock = 0L

// Request enough memory to begin unrolling
// 首先进行初始的内存申请,向MemoryManager申请内存
keepUnrolling =
  reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)

if (!keepUnrolling) {
  logWarning(s"Failed to reserve initial memory threshold of " +
    s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
} else {
  // 如果成功申请到内存,则累加记录
  unrollMemoryUsedByThisBlock += initialMemoryThreshold
}

// Unroll this block safely, checking whether we have exceeded our threshold periodically
// 循环将每条数据写入容器中valuesHolder
while (values.hasNext && keepUnrolling) {
  valuesHolder.storeValue(values.next())
  // 如果写入数据的条数达到一个周期,那么就检查一下是否需要申请额外的内存
  if (elementsUnrolled % memoryCheckPeriod == 0) {
    // 通过valuesHolder获取已经写入的数据的评估大小
    // 注意,这里的数据大小只是估计值,并不是十分准确
    // 具体如何进行估算的可以看valuesHolder内部实现
    val currentSize = valuesHolder.estimatedSize()
    // If our vector's size has exceeded the threshold, request more memory
    // 如果已写入的数据大小超过了当前阈值
    if (currentSize >= memoryThreshold) {
      // 这里每次申请的内存量都是不一样的
      // 每次申请的内存是当前已使用内存的1.5倍(默认)
      val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
      keepUnrolling =
        reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
      if (keepUnrolling) {
        // 记录累积申请的内存量
        unrollMemoryUsedByThisBlock += amountToRequest
      }
      // New threshold is currentSize * memoryGrowthFactor
      // 目前已经向内存管理器申请的内存量
      memoryThreshold += amountToRequest
    }
  }
  // 记录插入的数据条数
  elementsUnrolled += 1
}

// Make sure that we have enough memory to store the block. By this point, it is possible that
// the block's actual memory usage has exceeded the unroll memory by a small amount, so we
// perform one final call to attempt to allocate additional memory if necessary.
// 如果keepUnrolling为true,说明顺利地将所有数据插入,
// 并未遇到申请内存失败的情况
if (keepUnrolling) {
  // 将内部的数据转移到一个数组中
  val entryBuilder = valuesHolder.getBuilder()
  // 数据在内存中的精确大小
  val size = entryBuilder.preciseSize
  // 实际的大小可能大于申请的内存量
  // 因此根据实际大小还要再申请额外的内存
  if (size > unrollMemoryUsedByThisBlock) {
    val amountToRequest = size - unrollMemoryUsedByThisBlock
    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
    if (keepUnrolling) {
      unrollMemoryUsedByThisBlock += amountToRequest
    }
  }

  if (keepUnrolling) {
    // 获取MemoryEntry对象,该对象是对插入数据的包装
    val entry = entryBuilder.build()
    // Synchronize so that transfer is atomic
    memoryManager.synchronized {
      // 这一步主要是释放申请的展开内存
      // 然后申请存储内存
      // 这里需要弄清楚展开内存的概念
      // 展开状态指的是对象在内存中处于一种比较松散的状态,这样的状态方便做一些管理如统计大小等
      // 而随后将对象转移到数组中,处于一种比较紧实的状态,数组相对来说占用的额外内存是比较小的
      // 一个数组只是一个对象,只有一个对象头,可以用来管理大量的对象
      releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
      // 申请存储内存
      val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
      assert(success, "transferring unroll memory to storage memory failed")
    }

    // 放入map中管理起来
    entries.synchronized {
      entries.put(blockId, entry)
    }

    logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId,
      Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
    Right(entry.size)
  } else {
    // We ran out of space while unrolling the values for this block
    logUnrollFailureMessage(blockId, entryBuilder.preciseSize)
    // 如果失败,返回已经申请的展开内存
    Left(unrollMemoryUsedByThisBlock)
  }
} else {
  // We ran out of space while unrolling the values for this block
  logUnrollFailureMessage(blockId, valuesHolder.estimatedSize())
  Left(unrollMemoryUsedByThisBlock)
}
}

memoryStore.putIteratorAsBytes

我们再看另一个方法。套路基本和putIteratorAsValues是一样一样的。
最大的区别在于ValuesHolder类型不同。非序列化形式存储使用的是DeserializedMemoryEntry,而序列化形式存储使用的是SerializedMemoryEntry。

private[storage] def putIteratorAsBytes[T](
  blockId: BlockId,
  values: Iterator[T],
  classTag: ClassTag[T],
  memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = {

require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

// Initial per-task memory to request for unrolling blocks (bytes).
val initialMemoryThreshold = unrollMemoryThreshold
// 字节数组的块大小,默认是1m
val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
  logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " +
    s"is too large to be set as chunk size. Chunk size has been capped to " +
    s"${Utils.bytesToString(Int.MaxValue)}")
  Int.MaxValue
} else {
  initialMemoryThreshold.toInt
}

// 字节数组的容器
val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,
  memoryMode, serializerManager)

putIterator(blockId, values, classTag, memoryMode, valuesHolder) match {
  case Right(storedSize) => Right(storedSize)
  case Left(unrollMemoryUsedByThisBlock) =>
    // 部分展开,部分以序列化形式存储的block
    Left(new PartiallySerializedBlock(
      this,
      serializerManager,
      blockId,
      valuesHolder.serializationStream,
      valuesHolder.redirectableStream,
      unrollMemoryUsedByThisBlock,
      memoryMode,
      valuesHolder.bbos,
      values,
      classTag))
}
}

memoryStore.putBytes

我们再来看另一个被外部调用用来插入数据的方法。很简单,不说了。

def putBytes[T: ClassTag](
  blockId: BlockId,
  size: Long,
  memoryMode: MemoryMode,
  _bytes: () => ChunkedByteBuffer): Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// 首先向内存管理器申请内存
// 这里申请的是存储内存,因为要插入的字节数组,
// 所以不需要再展开,也就不需要申请展开内存
if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
  // We acquired enough memory for the block, so go ahead and put it
  val bytes = _bytes()
  assert(bytes.size == size)
  // 这里直接构建了一个SerializedMemoryEntry
  // 并放到map中管理起来
  val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
  entries.synchronized {
    entries.put(blockId, entry)
  }
  logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
    blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
  true
} else {
  false
}
}

小结

通过对上面的三个方法,其实主要是前两个方法的分析,我们发现,除了对内存进行簿记管理之外,以及通过内存管理器申请内存之外,插入数据最主要的工作其实都是有ValuesHolder对象来完成的。
ValuesHolder特质有两个实现类:DeserializedValuesHolder和SerializedValuesHolder。

DeserializedValuesHolder

DeserializedValuesHolder对象内部有两个成员:vector,是一个SizeTrackingVector;arrayValues,是一个存放值的数组,用于在所有数据插入后,将主句转移到一个数组中,方便包装成一个MemoryEntry对象。大部分工作是有SizeTrackingVector完成的。

private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] {
  // Underlying vector for unrolling the block
  var vector = new SizeTrackingVector[T]()(classTag)
  var arrayValues: Array[T] = null

  override def storeValue(value: T): Unit = {
    vector += value
  }

  override def estimatedSize(): Long = {
    vector.estimateSize()
  }

  override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
    // We successfully unrolled the entirety of this block
    arrayValues = vector.toArray
    vector = null

    override val preciseSize: Long = SizeEstimator.estimate(arrayValues)

    override def build(): MemoryEntry[T] =
      DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
  }
}

SizeTracker

上面提到的SizeTrackingVector继承了这个特质,除了这个特质,还集成了PrimitiveVector类,但是PrimitiveVector类基本上就是对一个数组的简单包装。
SizeTrackingVector最重要的功能:追踪对象的大小,就是在SizeTracker特之中实现的。

我大致说一下这个特质是如何实现对象大小跟踪和估算的,代码实现也并不复杂,感兴趣的可以看一看,限于篇幅这里就不贴了。

  • 每插入一定数量的数据(姑且称之为周期),就会对当前的对象进行一次取样,而这个取样的周期会越来越长,以1.1倍的速率增长;
  • 取样就是计算对象大小,并与前一次取样作比较,而且只会保留最近两次的取样数据;
  • 每次取样其实就是获取两个数据,当前对象大小,当前插入的数据条数;
  • 这样与上一次取样一比较,就能够计算出每条数据的大小了;
  • 最后,在返回整个对象大小时,是拿最近一次取样时记录下的对象大小,以及根据最近的情况估算的每条数据的大小乘以自从上次取样以来新插入的数据量,二者相加作为对象大小的估算值,

可见这么做并不是什么精确,但是由于是抽样,而且抽样周期越往后面越长,所以对于数据插入的效率影响很小,而且这种不精确性其实在后续的内存检查过程中是有考虑到的。在所有数据插入完的收尾工作中,会对对象大小做一次精确计算。此外,熟悉spark内存管理的同学应该知道,其实spark一般会配置一个安全因子(一般是0.9),也就是说只是用配置的内存大小的90%,就是为了尽可能地减少这种不精确的内存估算造成OOM的可能性。

SerializedValuesHolder

private class SerializedValuesHolder[T](
    blockId: BlockId,
    chunkSize: Int,
    classTag: ClassTag[T],
    memoryMode: MemoryMode,
    serializerManager: SerializerManager) extends ValuesHolder[T] {
  val allocator = memoryMode match {
    case MemoryMode.ON_HEAP => ByteBuffer.allocate _
      // 调用unsafe的本地方法申请直接内存
      // 这个方法之所以没有调用ByteBuffer.allocateDirect方法
      // 是因为这个方法分配的直接内存大小收到参数MaxDirectMemorySize限制
      // 所以这里绕过ByteBuffer.allocateDirect方法,通过反射和unsafe类创建直接内存对象
    case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
  }

  val redirectableStream = new RedirectableOutputStream
  val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
  redirectableStream.setOutputStream(bbos)
  val serializationStream: SerializationStream = {
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
    // 包装压缩流和序列化流
    ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
  }

  // 写入方法,写入的对象经过序列化,压缩,
  // 然后经过ChunkedByteBufferOutputStream被分割成一个个的字节数组块
  override def storeValue(value: T): Unit = {
    serializationStream.writeObject(value)(classTag)
  }

  override def estimatedSize(): Long = {
    bbos.size
  }

  override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
    // We successfully unrolled the entirety of this block
    serializationStream.close()

    override def preciseSize(): Long = bbos.size

    override def build(): MemoryEntry[T] =
      SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
  }
}

大概看一下,主要的逻辑很简单,这里面有几个注意点:

  • 对于直接内存分配,spark并没有使用jdk的高级api,而是反射配合unsafe类分配直接内存,这样可以绕过jvm参数MaxDirectMemorySize的限制,这也体现了spark的作者尽可能的降低用户使用难度
  • 另外,我们看到序列化流其实经过了层层包装(典型的装饰器模式),序列化和压缩以及分块是比较重要的几个点,感兴趣的话可以深究,序列化和压缩如果深入了解都是很大的课题,所以这里也仅仅是蜻蜓点水,不深究了。

总结

MemoryStore.scala这个文件中乍看代码有八百多行,但是其实很大部分代码是一些辅助类,比较核心的写入逻辑也就是前面提到的几个方法,再加上核心的两个类DeserializedValuesHolder和SerializedValuesHolder实现了以对象或字节数组的形式存储数据。

06-12 20:56