依据Spark 1.4版

序列化和反序列化

前边提到,TorrentBroadcast的关键就在于特殊的序列化和反序列化设置。1.1版的TorrentBroadcast实现了自己的readObject和writeObject方法,但是1.4.1版的TorrentBroadcast没有实现自己的readObject方法,那么它是如何进行序列化和反序列化的呢?

// obj就是被广播的对象
private val numBlocks: Int = writeBlocks(obj) override protected def getValue() = {
_value
} @transient private lazy val _value: T = readBroadcastBlock()

可以认为TorrentBroadcast对象经过了三个主要阶段的处理:构造器,序列化,反序列化

构造器

在构造TorrentBroadcast对象时,numBlocks会被初始化,此时writeBlocks会被执行。writeBlocks会执行把obj序列化,分块,存储进BlockManager等操作。

而_value域是lazy的,因此在TorrentBroadcast对象初始化时,_value不会初始化,readBroadcastBlock也不会执行。

序列化

当在driver端对RDD调用一个action时,会生成Task对象,Task对象引用到的对象会被序列化,然后对每一个task,反序列化一个Task对象。

TorrentBroadcast需要保证被广播的对象不会随Task一起序列化。需要注意以下两点:

private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
extends Broadcast[T](id) with Logging with Serializable {
  ……
}
@transient private lazy val _value: T = readBroadcastBlock()

Scala的构造函数里的参数并不一定会成为对象的字段,像obj这种只是用来构造对象、没有被用于实现方法的构造器参数,不会成为TorrentBroadcast的字段,因此不会被序列化。

而_value尽管引用了被广播的数据,但它是@transient的,因此也不会被序列化。

反序列化

反序列化的关键在于,_value不会被反序列化。因此,如果某个executor没有task使用TorrentBroadcast的value方法,被广播的数据就不会被在这个executor端获取。

实现这种功能的关键在于Scala的lazy val。

首先,考虑这个问题:lazy val可能被多个线程同时访问,这会触发lazy val的初始化,但是需要保证这个初始化的过程就线程安全的,即lazy val只被初始化一次,且初始化的结果对所有线程可见。实现这种行为,最简单的做为是使用this做同步,但是这样的效率会很低,而Scala实现lazy val使用了一种效率更高的方法,但不管怎么做,lazy val比普通的val的访问效率会降低。

举一个Double-checked locking idiom, sweet in Scala!中的例子:

lazy val myLazyField = create();

会被编译成:

   public volatile int bitmap$0;
private Object myLazyField; public String myLazyField() {
if((bitmap$0 & 1) == 0)
{
synchronized(this)
{
if((bitmap$0 & 1) == 0)
{
myLazyField = ...
bitmap$0 = bitmap$0 | 1;
}
}
}
return myLazyField;
}

即通过一个volatile变量来判断这个lazy val是否已经初始化,通过双重检查加锁来做初始化。

现在有了新的问题:

1. 默认的序列化过程是否会触发lazy val被初始化呢?

2. 如果在TorrentBroadcast对象被序列化之前,lazy val被访问,触发了初始化过程,那么被广播的数据相关于作为TorrentBroadcast的一个field,也会被序列化。

问题1的答案是不会触发。问题2的答案_value需要被注明是transient,就像TorrentBroadcast里所做的一样。

所以,在函数中如果经常使用Broadcast.value方法返回的对象时,比如在循环中使用它,最后先在循环外创建一个对这个对象的引用,以减少一些开销。

但是,lazy val的这种线程安全机制对于TorrentBroadcast是浪费的。因为Broadcast变量是随Task一起序列化的,每个线程有自己的Task对象,也就是线程间不共享Broadcast对象。实际上,为了保证同一个JVM上运行的不同task得到同样的被广播的对象,readBroadcastBlock方法是使用TorrentBroadcast这个class做了同步,

下面来看一下把被广播的对象分块存储的过程

将广播的对象分块存储

这一步是在TorrentBroadcast对象初始化时候做的。

val numBlocks: Int = writeBlocks(obj)

触发。下面看一下writeBlocks方法

writeBlocks

  private def writeBlocks(value: T): Int = {
// Store a copy of the broadcast variable in the driver so that tasks run on the driver
// do not create a duplicate copy of the broadcast variable's value.
SparkEnv.get.blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
tellMaster = false)
val blocks =
TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
//blocks的类型是Array[ByteBuffer]
blocks.zipWithIndex.foreach { case (block, i) =>
SparkEnv.get.blockManager.putBytes(
BroadcastBlockId(id, "piece" + i),//以BroadcastBlockId为BlockId存储
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)
}
blocks.length
}

正如代码中的注释所说的,writeBlocks会首先把被广播的对象用putSingle方法放在driver的BlockManager里,这是为了当在driver运行task时,不会额外创建一个被广播的对象的副本。若没有这一步,在driver端运行task时,会和executor端一样,通过Broadcast对象的value方法新建一个被广播的对象,这就使得driver端有两份这个对象。但实际上driver端运行task的情况并不常见。所以这里最好根据conf判断下是否有必要这么做。

接下来,使用伴生对象的blockifyObject方法把对象分块,得到的结果是一个ByteBuffer的数组。然后把这些块存进BlockManager, 这里有两点需要注意:

1. 把块存进BlockManager时,使用的id是BroadcastBlockId(id, "piece" + i)。也就是说跟据Broadcast对象的id,以及总共的块的数量就可以还原出所有的块存储时所使用的id。这也就是为什么TorrentBroadcast要有numBlocks这个field的原因。而id字段是Broadcast这个虚类里的val, 所以根据TorrentBroadcast对象的字段,即可以它所划分的所有block的id。在从这些块还原被broadcast的对象时,也的确是这么做的。

2. 把划分出的块存储进BlockManager时,tellMaster字段的值为true,这就使得master可以知道哪个BlockManager存储了这个块,因此executor端的BlockManager最初的时候才能从driver端的BlockManager获取这个块。相反的是,writeBlocks第一句putSingle时,tellMaster是false,因为并不准备让其它BlockManager获取putSingle进去的对象。

blockifyObject

blockifyObject作的工作就是将被广播的对象序列化,如果启用了压缩就进行压缩,然后将得到的字节流写入到一系列字节数组中。

它的返回值类型为:Array[ByteBuffer], 之所有是ByteBuffer, 是为了BlockManager使用方便,因为BlockManager的putBytes方法接受ByteBuffer作为参数。

  def blockifyObject[T: ClassTag](
obj: T,
blockSize: Int,
serializer: Serializer,
compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = {
val bos = new ByteArrayChunkOutputStream(blockSize)
val out: OutputStream = compressionCodec.map(c => c.compressedOutputStream(bos)).getOrElse(bos)
val ser = serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject[T](obj).close()
bos.toArrays.map(ByteBuffer.wrap)
}

它实现的关键在于ByteArrayChunkOutputStream, 这个类实现了Java的OutputStream接口。它的主体部分如下:

private[spark] class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {

  private val chunks = new ArrayBuffer[Array[Byte]]
private var lastChunkIndex = -1
private var position = chunkSize
override def write(b: Int): Unit = {
allocateNewChunkIfNeeded()
chunks(lastChunkIndex)(position) = b.toByte
position += 1
}  override def write(bytes: Array[Byte], off: Int, len: Int): Unit = { ... }
 def toArrays: Array[Array[Byte]] = { ... }
 ...

}

即,它在内部使用一些长度等于chunkSize的数组来存储被写入的字节。

组装还原被广播的对象

在executor端(如果有task在driver执行的话,也可以是在driver端)需要把被切块后的对象组装起来,还原成被广播的对象。这是通过对lazy val _value访问触发的。

 @transient private lazy val _value: T = readBroadcastBlock()

readBroadcast会首先在本地的BlockManager寻找之前存入的被广播的对象,因此如果同一个executor中已经有task访问过_value,那么它就能直接取到已被放入本地BlockManager中的对象,

如果本地还没有, 那么就会调用readBlocks获取组成这个对象的块,然后用unblockifyObject还原这个对象,接着把它放入BlockManager,以使得同一个executor的其它task不必重复组装还原。

 private def readBroadcastBlock(): T = Utils.tryOrIOException {
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)
//从本地的blockManager里读这个被broadcast的对象,根据broadcastId
SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
case Some(x) => //本地有
x.asInstanceOf[T] case None => //本地无
logInfo("Started reading broadcast variable " + id)
val startTimeMs = System.currentTimeMillis()
val blocks = readBlocks()//如果本地没有broadcastId对应的broadcast的block,就读
logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs)) val obj = TorrentBroadcast.unBlockifyObject[T](
blocks, SparkEnv.get.serializer, compressionCodec)
// Store the merged copy in BlockManager so other tasks on this executor don't
// need to re-fetch it.
SparkEnv.get.blockManager.putSingle( //读了之后再放进BlockManager
broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
obj
}
}
}

这里有一个细节是,组装还原之后的对象被用putSingle放入BlockManager, 存储级别为MEMORY_AND_DISK,这就意味着,在MemoryStore无法容纳被广播的对象时,同一个executor的两个task可能会获取两个不同的对象(需要研究下BlockManager相关的代码才能确定)。如果这种情况发生,而被广播的对象是线程安全的,那么就是对内存的浪费。如果这种情况不发生,一个executor的所有task共享一个被广播的对象,那么可能会产生线程安全的问题。但是无论如何,使用被广播的对象时,需要以只读的方式,对它的修改可能会产生问题。

TorrentBroadcast是通过readBlocks获取构成序列化后的对象的块。

  /** Fetch torrent blocks from the driver and/or other executors. */
private def readBlocks(): Array[ByteBuffer] = {
//获取到的block被存在本地的BlockManager中并且上报给driver,这样其它的executor就可以从这个executor获取这些block了
val blocks = new Array[ByteBuffer](numBlocks)
val bm = SparkEnv.get.blockManager //需要shuffle,避免所有executor以同样的顺序下载block,使得driver依然是瓶颈
for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
val pieceId = BroadcastBlockId(id, "piece" + pid)//组装BroadcastBlockId
logDebug(s"Reading piece $pieceId of $broadcastId")
// 先试着从本地获取,因为之前的尝试可能已经获取了一些block
def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
//如果从remote获取了block,就把它存在本地的BlockManager
SparkEnv.get.blockManager.putBytes(
pieceId,
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)
block
}
val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
blocks(pid) = block
}
blocks
}

readBlocks还是很简单易懂的,只是这里使用putBytes时,使用的存储级别是MEMORY_AND_DISK_SER,有些奇怪,不知道为啥对于这些bytes还需要序列化。

总结

TorrentBroadcast的实现有一些巧妙的细节,但是整体的代码还是很简洁,也比较容易理解。之所以有如此少的代码,是因为BlockManager已经提供了足够的基础设施。

04-15 17:08