首页 > 网络 > 云计算 >

Spark源码知识讲解之cahce原理分析

2017-11-11

Spark源码知识讲解之cahce原理分析。Task运行的时候是要去获取Parent 的RDD对应的Partition的数据的,即它会调用RDD的iterator方法把对应的Partition的数据集给遍历出来,然后写入存储,这个存储可能是磁盘或者内存,取决于StorageLevel是什么。

Task运行的时候是要去获取Parent 的RDD对应的Partition的数据的,即它会调用RDD的iterator方法把对应的Partition的数据集给遍历出来,然后写入存储,这个存储可能是磁盘或者内存,取决于StorageLevel是什么。

如果当前RDD的StorageLevel不为空,则表示已经存持久化了,我们可以直接在内存中获取,而不是去计算Parent RDD。如果没有StorageLevel,则表示没有缓存过,内存中没有,则我们需要运行的数据就需要从Parent RDD计算出来。注意,这里所谓的缓存并不是使用什么cache 组件,而直接是从本地读取,本地没有则从远端,获取的结果直接放入内存存储,方便后续读取,这才是真正cache的地方。

一 RDD的iterator方法

final defiterator(split:Partition, context: TaskContext): Iterator[T] = {

// 如果StorageLevel不为空,表示该RDD已经持久化过了,可能是在内存,也有可能是在磁盘,

// 如果是磁盘获取的,需要把block缓存在内存中

if (storageLevel!= StorageLevel.NONE) {

getOrCompute(split,context)

} else {

// 进行rdd partition的计算或者根据checkpoint读取数据

computeOrReadCheckpoint(split,context)

}

}

二 RDD的getOrCompute

从内存或者磁盘获取,如果磁盘获取需要将block缓存到内存

private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
 // 根据rdd id创建RDDBlockId
 val blockId = RDDBlockId(id, partition.index)
 // 是否从缓存的block读取
 var readCachedBlock = true
 SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
 readCachedBlock = false // 如果调用了这个函数,说明没有获取到block,自然不能从cache中读取
 // 需要调用该函数重新计算或者从checkpoint读取
 computeOrReadCheckpoint(partition, context)
 }) match {
 // 获取到了结果直接返回
 case Left(blockResult) =>
 // 如果从cache读取block
 if (readCachedBlock) {
 val existingMetrics = context.taskMetrics().inputMetrics
 existingMetrics.incBytesRead(blockResult.bytes)
 new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
 override def next(): T = {
 existingMetrics.incRecordsRead(1)
 delegate.next()
 }
 }
 } else {
 new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
 }
 case Right(iter) =>
 new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
 }
}

三 BlockManager的getOrElseUpdate方法

如果指定的block存在,则直接获取,否则调用makeIterator方法去计算block,然后持久化最后返回值

def getOrElseUpdate[T](
 blockId: BlockId,
 level: StorageLevel,
 classTag: ClassTag[T],
 makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
 // 尝试从本地获取数据,如果获取不到则从远端获取
 get[T](blockId)(classTag) match {
 case Some(block) =>
 return Left(block)
 case _ =>
 // Need to compute the block.
 }
 // 如果本地化和远端都没有获取到数据,则调用makeIterator计算,最后将结果写入block
 doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
 // 表示写入成功
 case None =>
 // 从本地获取数据块
 val blockResult = getLocalValues(blockId).getOrElse {
 releaseLock(blockId)
 throw new SparkException(s"get() failed for block $blockId even though we held a lock")
 }
 releaseLock(blockId)
 Left(blockResult)
 // 如果写入失败
 case Some(iter) =>
 // 如果put操作失败,表示可能是因为数据太大,无法写入内存,又无法被磁盘drop,因此我们需要返回这个iterator给
 // 调用者以至于他们能够做出决定这个值是什么,怎么办
 Right(iter)
 }
}

四 BlockManager的get方法

先从本地获取数据,如果没有则从远端获取

def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
 // 从本地获取block
 val local = getLocalValues(blockId)
 // 如果本地获取到了则返回
 if (local.isDefined) {
 logInfo(s"Found block $blockId locally")
 return local
 }
 // 如果本地没有获取到则从远端获取
 val remote = getRemoteValues[T](blockId)
 // 如果远端获取到了则返回,没有返回None
 if (remote.isDefined) {
 logInfo(s"Found block $blockId remotely")
 return remote
 }
 None
}

五 BlockManager的getLocalValues方法

从本地获取block,如果存在返回BlockResult,不存在返回None;如果storage level是磁盘,则还需将得到的block缓存到内存存储,方便下次读取

def getLocalValues(blockId: BlockId): Option[BlockResult] = {
 logDebug(s"Getting local block $blockId")
 // 调用block info manager,锁定该block,然后读取block,返回该block 元数据block info
 blockInfoManager.lockForReading(blockId) match {
 // 没有读取到则返回None
 case None =>
 logDebug(s"Block $blockId was not found")
 None
 // 读取到block元数据
 case Some(info) =>
 // 获取存储级别storage level
 val level = info.level
 logDebug(s"Level for block $blockId is $level")
 // 如果使用内存,且内存memory store包含这个block id
 if (level.useMemory && memoryStore.contains(blockId)) {
 // 判断是不是storage level是不是反序列化的,如果死反序列化的,则调用MemoryStore的getValues方法
 // 否则调用MemoryStore的getBytes然后反序列输入流返回数据作为迭代器
 val iter: Iterator[Any] = if (level.deserialized) {
 memoryStore.getValues(blockId).get
 } else {
 serializerManager.dataDeserializeStream(
 blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
 }
 val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
 // 构建一个BlockResult对象返回,这个对象包括数据,读取方式以及字节大小
 Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
 }
 // 如果使用磁盘存储,且disk store包含这个block则从磁盘获取,并且把结果放入内存
 else if (level.useDisk && diskStore.contains(blockId)) {
 // 调用DiskStore的getBytes方法,如果需要反序列化,则进行反序列
 val iterToReturn: Iterator[Any] = {
 val diskBytes = diskStore.getBytes(blockId)
 if (level.deserialized) {
 val diskValues = serializerManager.dataDeserializeStream(
 blockId,
 diskBytes.toInputStream(dispose = true))(info.classTag)
 // 尝试将从磁盘读的溢写的值加载到内存,方便后续快速读取
 maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
 } else {
 // 如果不需要反序列化,首先将读取到的流加载到内存,方便后续快速读取
 val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
 .map {_.toInputStream(dispose = false)}
 .getOrElse { diskBytes.toInputStream(dispose = true) }
 // 然后再返回反序列化之后的数据
 serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
 }
 }
 // 构建BlockResult返回
 val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
 Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
 } else {// 处理本地读取block失败,报告driver这是一个无效的block,将会删除这个block
 handleLocalReadFailure(blockId)
 }
 }
}

五 BlockManager的getRemoteValues方法

从block所存放的其他block manager(其他节点)获取block

private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
 val ct = implicitly[ClassTag[T]]
 // 将远程fetch的结果进行反序列化,然后构建BlockResult返回
 getRemoteBytes(blockId).map { data =>
 val values =
 serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
 new BlockResult(values, DataReadMethod.Network, data.size)
 }
}
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
 logDebug(s"Getting remote block $blockId")
 require(blockId != null, "BlockId is null")
 var runningFailureCount = 0
 var totalFailureCount = 0

 // 首先根据blockId获取当前block存在在哪些block manager上
 val locations = getLocations(blockId)
 // 最大允许的获取block的失败次数为该block对应的block manager数量
 val maxFetchFailures = locations.size
 var locationIterator = locations.iterator
 // 开始遍历block manager
 while (locationIterator.hasNext) {
 val loc = locationIterator.next()
 logDebug(s"Getting remote block $blockId from $loc")
 // 通过调用BlockTransferSerivce的fetchBlockSync方法从远端获取block
 val data = try {
 blockTransferService.fetchBlockSync(
 loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
 } catch {
 case NonFatal(e) =>
 runningFailureCount += 1
 totalFailureCount += 1
 // 如果总的失败数量大于了阀值则返回None
 if (totalFailureCount >= maxFetchFailures) {
 logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " +
 s"Most recent failure cause:", e)
 return None
 }

 logWarning(s"Failed to fetch remote block $blockId " +
 s"from $loc (failed attempt $runningFailureCount)", e)

 if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
 locationIterator = getLocations(blockId).iterator
 logDebug(s"Refreshed locations from the driver " +
 s"after ${runningFailureCount} fetch failures.")
 runningFailureCount = 0
 }

 // This location failed, so we retry fetch from a different one by returning null here
 null
 }
 // 返回ChunkedByteBuffer
 if (data != null) {
 return Some(new ChunkedByteBuffer(data))
 }
 logDebug(s"The value of block $blockId is null")
 }
 logDebug(s"Block $blockId not found")
 None
}

六RDD的computeOrReadCheckpoint

如果block没有被持久化,即storage level为None,我们就需要进行计算或者从Checkpoint读取数据;如果已经checkpoint了,则调用ietrator去读取block数据,否则调用Parent的RDD的compute方法

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
 // 当前rdd是否已经checkpoint和物化了,如果已经checkpoint,则调用第一个parent rdd的iterator方法获取
 // 如果没有则开始计算
 if (isCheckpointedAndMaterialized) {
 firstParent[T].iterator(split, context)
 } else {
 // 则调用rdd的compute方法开始计算,返回一个Iterator对象
 compute(split, context)
 }
}
相关文章
最新文章
热点推荐