首页 > 网络 > 云计算 >

Spark源码讲解之BlockManager

2017-11-11

Spark源码讲解之BlockManager。BlockManager是对外提供的统一访问block的接口,在Master和Slave上都有一个实例,他提供读写数据的方法,并且根据不同StorageLevel调用不同的BlockStore来读写数据。

BlockManager是对外提供的统一访问block的接口,在Master和Slave上都有一个实例,他提供读写数据的方法,并且根据不同StorageLevel调用不同的BlockStore来读写数据。

在应用程序启动的时候,SparkContext会创建Driver端的SpakEnv,在该SparkEnv中实例化BlockManager和BlockManagerMaster,在其内部创建消息通信的终端BlockManagerMasterEndpoint.

在Executor启动的时候,也会创建其SparkEnv,在该SparkEnv中实例化BlockManager和BlockTransferService. 在BlockManager初始化的过程一方面会加入BlockManagerSlaveEndpoint的消息终端,并把该BlockManagerSlaveEndpoint的该终端引用注册到Driver中,这样Driver和Executor就可以相互持有通信终端的引用

// 创建BlockManagerMaster
val blockManagerMaster= new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
// 创建BlockManagerMasterEndpoint
new BlockManagerMasterEndpoint(rpcEnv,isLocal, conf,listenerBus)),
conf, isDriver)

// NB: blockManager is not valid until initialize() is called later.
// 创建BlockManager

val blockManager= new BlockManager(executorId,rpcEnv, blockManagerMaster,
serializer, conf,mapOutputTracker, shuffleManager, blockTransferService, securityManager,
numUsableCores)

一 核心属性

String executorId: executorId或者是driverId

RpcEnv rpcEnv: rpc通信环境

BlockManagerMaster master: 主要负责整个应用程序在运行期间block元数据的管理和维护,和指令的发送

Serializer defaultSerializer: 默认的序列化机制

Long maxMemory:分配的最大可用内存

Int numUsableCores: 可以使用cpu核数

MapOutputTracker mapOutputTracker:map端shuffle过程的输出状态

ShuffleManager shuffleManager: Shuffle管理器

BlockTransferService blockTransferService: 用于远程间传输数据,用于获取上床block

DiskBlockManager diskBlockManager:主要用于管理磁盘上block以及对应文件和目录

TimeStampedHashMap[BlockId, BlockInfo] blockInfo: 构建一个维护的映射

Boolean externalBlockStoreInitialized: 是否使用外部存储

MemoryStore memoryStore:内存存储对象

DiskStore diskStore:磁盘存储对象

ExternalBlockStore externalBlockStore:外部寸处对象

BlockManagerId blockManagerId:当前BlockManager对应的id

ShuffleClient shuffleClient:读取其他executor上的shuffle 文件的客户端,可能是外部服务也有可能是标准的数据块传输服务,如果启用了如果启用外部shuffle 服务,则创建ExternalShuffleClient否则创建 BlockTransferService

Boolean compressBroadcast:是否对广播数据进行压缩

Boolean compressShuffle:是否压缩map输出文件,一般建议打开,但是如果cpu资源消耗太大,则不建议设置为true

Boolean compressRdds:是否要压缩序列化的RDD分区

Boolean compressShuffleSpill:是否对map端溢写的临时文件进行压缩

BlockManagerSlaveEndpoint slaveEndpoint: 持有的BlockManagerSlaveEndpoint通信终端

二 重要方法

2.1initialize 初始化方法,用指定的application id 实例化BlockManager

# 初始化BlockTransferService,构建rpc server等,BlockTransferService主要是用于跨节点传输数据

# 初始化ShuffleClient,读取其他executor上的shuffle文件的客户端

# 构建blockManagerId

# 构建shuffleServerId

# 向BlockManagerMaster注册BlockManager

# 如果外部Shuffle服务启动并且为Executor节点,则注册外部Shuffle服务

def initialize(appId: String): Unit = {
 // 初始化BlockTransferService,构建rpc server等,BlockTransferService主要是用于跨节点传输数据
 blockTransferService.init(this)
 // 初始化ShuffleClient,读取其他executor上的shuffle文件的客户端
 shuffleClient.init(appId)
 // 构建blockManagerId
 blockManagerId = BlockManagerId(
 executorId, blockTransferService.hostName, blockTransferService.port)
 // 构建shuffleServerId
 shuffleServerId = if (externalShuffleServiceEnabled) {
 BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
 } else {
 blockManagerId
 }
 // 向BlockManagerMaster注册BlockManager,传入了slaveEndpoint,用于和BlockManagerMaster通信
 master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)

 // 如果外部Shuffle服务启动并且为Executor节点,则注册外部Shuffle服务
 if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
 registerWithExternalShuffleServer()
 }
}

2.2reportAllBlocks 报告BlockManager所有的数据块

private def reportAllBlocks(): Unit = {
 logInfo(s"Reporting ${blockInfo.size} blocks to the master.")
 for ((blockId, info) <- blockInfo) {
 // 获取每一个block当前的状态
 val status = getCurrentBlockStatus(blockId, info)
 if (!tryToReportBlockStatus(blockId, info, status)) {
 logError(s"Failed to report $blockId to master; giving up.")
 return
 }
 }
}
private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
 info.synchronized {
 info.level match {
 case null =>
 BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
 case level =>
 val inMem = level.useMemory && memoryStore.contains(blockId)
 val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId)
 val onDisk = level.useDisk && diskStore.contains(blockId)
 val deserialized = if (inMem) level.deserialized else false
 val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1
 val storageLevel =
 StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication)
 val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
 val externalBlockStoreSize =
 if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L
 val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
 BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize)
 }
 }
}

2.3reregister 注册BlockManager,并且报告BlockManager所有数据块的状态

def reregister(): Unit = {
 // TODO: We might need to rate limit re-registering.
 logInfo("BlockManager re-registering with master")
 master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
 reportAllBlocks()
}

2.4getBlockData 获取本地数据块数据

# 如果是shuffle的数据块,则通过ShuffleBlockResolver获取数据块,否则调用doGetLocal从本地获取

# 结果封装成buffer,然后创建NioManagedBuffer,然后返回

override def getBlockData(blockId: BlockId): ManagedBuffer = {
 // 首先判断是不是shuffle的数据块
 if (blockId.isShuffle) {
 // 如果是shuffle的数据块,则通过ShuffleBlockResolver获取数据块
 shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
 } else {
 // 获取本地block数据
 val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
 .asInstanceOf[Option[ByteBuffer]]
 if (blockBytesOpt.isDefined) {
 // 获取buffer,然后创建NioManagedBuffer
 val buffer = blockBytesOpt.get
 new NioManagedBuffer(buffer)
 } else {
 throw new BlockNotFoundException(blockId.toString)
 }
 }
}

# doGetLocal:根据指定的blockId获取本地block数据,如果是存在内存上的直接从内存获取;如果是存储在磁盘上的则从磁盘获取,如果是MEMORY_AND_DISK,先放入内存,再返回数据,下次就可以从内存获取,否则直接返回等

private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
 // 根据blockId获取blockInfo
 val info = blockInfo.get(blockId).orNull
 // 如果blockInfo不为空
 if (info != null) {
 info.synchronized {
 // 再次检查blockInfo是否为空
 if (blockInfo.get(blockId).isEmpty) {
 logWarning(s"Block $blockId had been removed")
 return None
 }

 // 检查该block是否其他线程正在写,等待该block变为ready状态
 if (!info.waitForReady()) {
 // If we get here, the block write failed.
 logWarning(s"Block $blockId was marked as failure.")
 return None
 }
 // 获取该Block的存储级别
 val level = info.level
 logDebug(s"Level for block $blockId is $level")

 // 如果存储级别是内存,则查找MemoryStore返回数据
 if (level.useMemory) {
 logDebug(s"Getting block $blockId from memory")
 val result = if (asBlockResult) {
 memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
 } else {
 memoryStore.getBytes(blockId)
 }
 result match {
 case Some(values) =>
 return result
 case None =>
 logDebug(s"Block $blockId not found in memory")
 }
 }

 // 如果使用堆外存储,则查找ExternalBlockStore返回数据
 if (level.useOffHeap) {
 logDebug(s"Getting block $blockId from ExternalBlockStore")
 if (externalBlockStore.contains(blockId)) {
 val result = if (asBlockResult) {
 externalBlockStore.getValues(blockId)
 .map(new BlockResult(_, DataReadMethod.Memory, info.size))
 } else {
 externalBlockStore.getBytes(blockId)
 }
 result match {
 case Some(values) =>
 return result
 case None =>
 logDebug(s"Block $blockId not found in ExternalBlockStore")
 }
 }
 }

 // 如果使用磁盘存储,则查找DiskStore返回数据
 if (level.useDisk) {
 logDebug(s"Getting block $blockId from disk")
 val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
 case Some(b) => b
 case None =>
 throw new BlockException(
 blockId, s"Block $blockId not found on disk, though it should be")
 }
 assert(0 == bytes.position())
 // 如果不能使用内存存储,则直接返回结果
 if (!level.useMemory) {
 // If the block shouldn&#39;t be stored in memory, we can just return it
 if (asBlockResult) {
 return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
 info.size))
 } else {
 return Some(bytes)
 }
 } else {
 // 若可以存入内存,将查询出来的数据放入内存,这样下次再查找该block数据直接从内存获取,以提高速度
 if (!level.deserialized || !asBlockResult) {
 memoryStore.putBytes(blockId, bytes.limit, () => {
 val copyForMemory = ByteBuffer.allocate(bytes.limit)
 // 如果内存放不下该block数据,将会发生OOM,因此放入ByteBuffer并且懒加载
 copyForMemory.put(bytes)
 })
 bytes.rewind()
 }
 if (!asBlockResult) {
 return Some(bytes)
 } else {// 如果想返回BlockResult对象
 // 将字节数据数据反序列化
 val values = dataDeserialize(blockId, bytes)
 // 如果存储级别是反序列化
 if (level.deserialized) {
 // 再返回之前放入内存缓存
 val putResult = memoryStore.putIterator(
 blockId, values, level, returnValues = true, allowPersistToDisk = false)
 putResult.data match {
 case Left(it) =>
 return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
 case _ =>
 // This only happens if we dropped the values back to disk (which is never)
 throw new SparkException("Memory store did not return an iterator!")
 }
 } else {
 return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
 }
 }
 }
 }
 }
 } else {
 logDebug(s"Block $blockId not registered locally")
 }
 None
}

2.5getLocal 从本地BlockManager获取数据

def getLocal(blockId: BlockId): Option[BlockResult] = {
 logDebug(s"Getting local block $blockId")
 doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
}

2.6getLocalgetLocalBytes从本地BlockManager获取数据并且序列化结果

def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = {
 logDebug(s"Getting local block $blockId as bytes")

 if (blockId.isShuffle) {
 val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
 // TODO: This should gracefully handle case where local block is not available. Currently
 // downstream code will throw an exception.
 Option(
 shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
 } else {
 doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
 }
}

2.7doGetRemote 从远端获取数据

private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
 require(blockId != null, "BlockId is null")
 // 从blockId对应的BlockManagerId,然后打乱顺序
 val locations = Random.shuffle(master.getLocations(blockId))
 // 遍历每一个BlockManagerId,调用blockTransferService的fetchBlockSync去拉取数据,返回
 for (loc <- locations) {
 logDebug(s"Getting remote block $blockId from $loc")
 val data = blockTransferService.fetchBlockSync(
 loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()

 if (data != null) {
 if (asBlockResult) {
 return Some(new BlockResult(
 dataDeserialize(blockId, data),
 DataReadMethod.Network,
 data.limit()))
 } else {
 return Some(data)
 }
 }
 logDebug(s"The value of block $blockId is null")
 }
 logDebug(s"Block $blockId not found")
 None
}

2.8doPut根据StorageLevel存放数据

private def doPut(blockId: BlockId, data: BlockValues,
 level: StorageLevel, tellMaster: Boolean = true,
 effectiveStorageLevel: Option[StorageLevel] = None)
 : Seq[(BlockId, BlockStatus)] = {
 // 检查BlockId,StorageLevel是否为空
 require(blockId != null, "BlockId is null")
 require(level != null && level.isValid, "StorageLevel is null or invalid")
 effectiveStorageLevel.foreach { level =>
 require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
 }

 // 新建数组,存放blockId和block状态
 val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

 // 获取BlockInfo
 val putBlockInfo = {
 // 创建BlockInfo对象
 val tinfo = new BlockInfo(level, tellMaster)
 // 将该blockId和创建BlockInfo对象放入内存,并且返回更新key之对应的value之前的value,即
 // 如果存在该key对应的value,则返回,如果没有则直接存入,返回为空,即不能进行更新
 val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
 // 如果已经有旧的BlockInfo,则判断是否正准备写,如果准备些则直接返回更新的(BlockId, BlockStatus)数组
 if (oldBlockOpt.isDefined) {
 if (oldBlockOpt.get.waitForReady()) {
 logWarning(s"Block $blockId already exists on this machine; not re-adding it")
 return updatedBlocks
 }
 oldBlockOpt.get
 } else {
 tinfo // 否则直接返回blockInfo
 }
 }

 val startTimeMs = System.currentTimeMillis
 var valuesAfterPut: Iterator[Any] = null
 var bytesAfterPut: ByteBuffer = null
 // 数据块大小
 var size = 0L
 // 存储的级别
 val putLevel = effectiveStorageLevel.getOrElse(level)
 // 启动一个线程在本地存储之前,异步初始化好要进行的备份的数据,这有助于提高发送数据的速度
 val replicationFuture = data match {
 case b: ByteBufferValues if putLevel.replication > 1 =>
 // 复制出一个新的buffer
 val bufferView = b.buffer.duplicate()
 Future {
 // 拷贝数据到其他节点
 replicate(blockId, bufferView, putLevel)
 }(futureExecutionContext)
 case _ => null
 }
 // 防止其他线程put这个block,所以需要使用同步操作指导marked置为true
 putBlockInfo.synchronized {
 logTrace("Put for block %s took %s to get into synchronized block"
 .format(blockId, Utils.getUsedTimeMs(startTimeMs)))

 var marked = false
 try {
 // returnValues:是否返回put操作的值
 // blockStore:存储方式
 val (returnValues, blockStore: BlockStore) = {
 // 内存存储返回true和MemoryStore
 if (putLevel.useMemory) {
 (true, memoryStore)
 } else if (putLevel.useOffHeap) {
 //对外存储返回false和ExternalBlockStore
 (false, externalBlockStore)
 } else if (putLevel.useDisk) {
 // 磁盘存储,如果复制因子大于1返回true和DiskStore,否则返回false和DiskStore
 (putLevel.replication > 1, diskStore)
 } else {
 assert(putLevel == StorageLevel.NONE)
 throw new BlockException(
 blockId, s"Attempted to put block $blockId without specifying storage level!")
 }
 }

 // 匹配put操作存入的值类型,调用blockStore方法
 val result = data match {
 // 可迭代值类型,调用BlockStore的putIterator方法
 case IteratorValues(iterator) =>
 blockStore.putIterator(blockId, iterator, putLevel, returnValues)
 // 数组类型,调用BlockStore的putArray方法
 case ArrayValues(array) =>
 blockStore.putArray(blockId, array, putLevel, returnValues)
 // ByteBufferValues类型,调用BlockStore的putBytes方法
 case ByteBufferValues(bytes) =>
 bytes.rewind()
 blockStore.putBytes(blockId, bytes, putLevel)
 }
 // 获取结果大小
 size = result.size
 result.data match {
 case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
 case Right (newBytes) => bytesAfterPut = newBytes
 case _ =>
 }

 // 如果使用内存存储,遍历result结果中的droppedBlocks,将溢出到磁盘的block添加到updateBlock集合中
 if (putLevel.useMemory) {
 result.droppedBlocks.foreach { updatedBlocks += _ }
 }
 // 获取当前block状态
 val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
 if (putBlockStatus.storageLevel != StorageLevel.NONE) {
 // 将marked标记置为true
 marked = true
 // 调用blockInfo的markReady标记该block写操作完成
 putBlockInfo.markReady(size)
 // 向Master汇报block状态
 if (tellMaster) {
 reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
 }
 // 更新updatedBlocks
 updatedBlocks += ((blockId, putBlockStatus))
 }
 } finally {
 // If we failed in putting the block to memory/disk, notify other possible readers
 // that it has failed, and then remove it from the block info map.
 if (!marked) {
 // Note that the remove must happen before markFailure otherwise another thread
 // could&#39;ve inserted a new BlockInfo before we remove it.
 blockInfo.remove(blockId)
 putBlockInfo.markFailure()
 logWarning(s"Putting block $blockId failed")
 }
 }
 }
 logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))

 // 如果复制因子大于1,开始异步复制数据
 if (putLevel.replication > 1) {
 data match {
 case ByteBufferValues(bytes) =>
 if (replicationFuture != null) {
 Await.ready(replicationFuture, Duration.Inf)
 }
 case _ =>
 val remoteStartTime = System.currentTimeMillis
 // Serialize the block if not already done
 if (bytesAfterPut == null) {
 if (valuesAfterPut == null) {
 throw new SparkException(
 "Underlying put returned neither an Iterator nor bytes! This shouldn&#39;t happen.")
 }
 bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
 }
 replicate(blockId, bytesAfterPut, putLevel)
 logDebug("Put block %s remotely took %s"
 .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
 }
 }
 // 如果是基于内存映射的,则开始清理ByteBuffer
 BlockManager.dispose(bytesAfterPut)

 if (putLevel.replication > 1) {
 logDebug("Putting block %s with replication took %s"
 .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
 } else {
 logDebug("Putting block %s without replication took %s"
 .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
 }

 updatedBlocks
}

2.9getPeers 获取集群中非当前BlockManagerId和非Driver端的BlockManagerId的所有BlockManagerId

private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
 peerFetchLock.synchronized {
 val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
 val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
 if (cachedPeers == null || forceFetch || timeout) {
 cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
 lastPeerFetchTime = System.currentTimeMillis
 logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
 }
 cachedPeers
 }
}

2.10replicate 复制数据块到其他节点

private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
 // 所允许最大复制失败次数
 val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
 // 需要复制到其他的BlockManagerId数量
 val numPeersToReplicateTo = level.replication - 1
 // 需要被复制的BlockManagerId数组
 val peersForReplication = new ArrayBuffer[BlockManagerId]
 // 需要复制到其他的BlockManagerId数组
 val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
 // 复制失败的BlockManagerId数组
 val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
 // 创建存储级别
 val tLevel = StorageLevel(
 level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
 val startTime = System.currentTimeMillis
 val random = new Random(blockId.hashCode)

 var replicationFailed = false
 var failures = 0
 var done = false

 // 获取Executor非当前BlockManagerId集合
 peersForReplication ++= getPeers(forceFetch = false)
 // 获取一个随机的BlockManagerId
 def getRandomPeer(): Option[BlockManagerId] = {
 // 如果复制失败
 if (replicationFailed) {
 // 清理数组
 peersForReplication.clear()
 // 把获取Executor非当前BlockManagerId集合放入该集合
 peersForReplication ++= getPeers(forceFetch = true)
 // 移除掉需要复制到那个BlockManagerId
 peersForReplication --= peersReplicatedTo
 // 移除掉失败复制到那个BlockManagerId
 peersForReplication --= peersFailedToReplicateTo
 }
 // 如果需要被复制的BlockManagerId集合部位空,则则随机取出一个BlockManagerId
 if (!peersForReplication.isEmpty) {
 Some(peersForReplication(random.nextInt(peersForReplication.size)))
 } else {
 None
 }
 }
 // 如果复制还没完成,则不断地循环
 while (!done) {
 // 获取随机的一个BlockManagerId
 getRandomPeer() match {
 case Some(peer) =>
 try {
 val onePeerStartTime = System.currentTimeMillis
 data.rewind()
 logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
 // 调用BlockTransferService的uploadBlockSync方法,同步上床block
 blockTransferService.uploadBlockSync(
 peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel)
 logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms"
 .format(System.currentTimeMillis - onePeerStartTime))
 // 更新复制到的BlockManagerId的集合
 peersReplicatedTo += peer
 // 被复制到的BlockManagerId的集合移除掉这个BlockManagerId,避免下一次复制到一个BlockManager
 peersForReplication -= peer
 replicationFailed = false
 // 如果已经复制的数量等于需要需要复制的数量,则表示复制完成
 if (peersReplicatedTo.size == numPeersToReplicateTo) {
 done = true
 }
 } catch {
 case e: Exception =>
 logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
 failures += 1
 replicationFailed = true
 peersFailedToReplicateTo += peer
 if (failures > maxReplicationFailures) { // too many failures in replcating to peers
 done = true
 }
 }
 case None => // no peer left to replicate to
 done = true
 }
 }
 val timeTakeMs = (System.currentTimeMillis - startTime)
 logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
 s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
 if (peersReplicatedTo.size < numPeersToReplicateTo) {
 logWarning(s"Block $blockId replicated to only " +
 s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
 }
}

2.11dropFromMemory 从内存中放弃某一个block,可能内存已经满了,放在磁盘比较合适等

def dropFromMemory(
 blockId: BlockId,
 data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {

 logInfo(s"Dropping block $blockId from memory")
 // 获取BlockInfo
 val info = blockInfo.get(blockId).orNull

 // If the block has not already been dropped
 if (info != null) {
 info.synchronized {
 if (!info.waitForReady()) {
 logWarning(s"Block $blockId was marked as failure. Nothing to drop")
 return None
 } else if (blockInfo.get(blockId).isEmpty) {
 logWarning(s"Block $blockId was already dropped.")
 return None
 }
 var blockIsUpdated = false
 val level = info.level

 // 存储级别是磁盘且磁盘不包括这个block,则存入磁盘
 if (level.useDisk && !diskStore.contains(blockId)) {
 logInfo(s"Writing block $blockId to disk")
 data() match {
 case Left(elements) =>
 diskStore.putArray(blockId, elements, level, returnValues = false)
 case Right(bytes) =>
 diskStore.putBytes(blockId, bytes, level)
 }
 blockIsUpdated = true
 }

 // 如果内存包含这个blockId,则获取这个blocl大小
 val droppedMemorySize =
 if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
 // MemoryStore移除这个block
 val blockIsRemoved = memoryStore.remove(blockId)
 if (blockIsRemoved) {
 blockIsUpdated = true
 } else {
 logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
 }
 // 重新获取当前block状态
 val status = getCurrentBlockStatus(blockId, info)
 if (info.tellMaster) {
 // 向master报告数据块状态
 reportBlockStatus(blockId, info, status, droppedMemorySize)
 }
 // 如果存储级别不是磁盘,则移除这个blockId
 if (!level.useDisk) {
 // The block is completely gone from this node; forget it so we can put() it again later.
 blockInfo.remove(blockId)
 }
 if (blockIsUpdated) {
 return Some(status)
 }
 }
 }
 None
}

2.12removeRdd 删除所有数据当前RDD的数据块

def removeRdd(rddId: Int): Int = {
 // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
 logInfo(s"Removing RDD $rddId")
 val blocksToRemove = blockInfo.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
 blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
 blocksToRemove.size
}

2.13removeBlock 从内存和磁盘中移除block

def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
 logDebug(s"Removing block $blockId")
 // 根据blockId获取blockInfo
 val info = blockInfo.get(blockId).orNull
 if (info != null) {
 info.synchronized {
 // 内存移除这个block
 val removedFromMemory = memoryStore.remove(blockId)
 // 磁盘移除这个block
 val removedFromDisk = diskStore.remove(blockId)
 // ExternalBlockStore移除这个block
 val removedFromExternalBlockStore =
 if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
 if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
 logWarning(s"Block $blockId could not be removed as it was not found in either " +
 "the disk, memory, or external block store")
 }
 // 开始移除这个blockId
 blockInfo.remove(blockId)
 // 如果需要,则向master报告数据块状态
 if (tellMaster && info.tellMaster) {
 val status = getCurrentBlockStatus(blockId, info)
 reportBlockStatus(blockId, info, status)
 }
 }
 } else {
 // The block has already been removed; do nothing.
 logWarning(s"Asked to remove block $blockId, which does not exist")
 }
}
相关文章
最新文章
热点推荐