首页 > 网络 > 云计算 >

Spark源码讲解之BlockManagerMaster

2017-11-11

Spark源码讲解之BlockManagerMaster。BlockManagerMaster主要负责整个应用程序运行期间block元数据的管理和维护,以及向从节点发送指令、执行命令。一 核心属性:driverEndpoint: RpcEndpointRef,即与BlockManagerMasterEndpoint通信的终端引用。

BlockManagerMaster主要负责整个应用程序运行期间block元数据的管理和维护,以及向从节点发送指令、执行命令。

一 核心属性

driverEndpoint: RpcEndpointRef —— 与BlockManagerMasterEndpoint通信的终端引用

isDriver: Boolean —— 当前是否运行在Driver端

二 重要方法

2.1 通过driver通信终端删除一个executor

/**
 * Asks the master endpoint to forget an executor.
 *
 * Sends a `RemoveExecutor` message through the `tell` helper (defined
 * elsewhere in this class) and logs once the call returns.
 *
 * @param execId id of the executor to remove
 */
def removeExecutor(execId: String): Unit = {
  tell(RemoveExecutor(execId))
  logInfo(s"Removed $execId successfully in removeExecutor")
}

2.2 向BlockManagerMasterEndpoint发送RegisterBlockManager消息

/**
 * Registers a block manager with the master endpoint.
 *
 * Builds a `RegisterBlockManager` message and hands it to the `tell` helper
 * (defined elsewhere in this class), logging before and after.
 *
 * @param blockManagerId id of the block manager being registered
 * @param maxMemSize     memory size reported for this block manager
 * @param slaveEndpoint  endpoint reference the master can use to reach this block manager
 */
def registerBlockManager(
    blockManagerId: BlockManagerId,
    maxMemSize: Long,
    slaveEndpoint: RpcEndpointRef): Unit = {
  logInfo("Trying to register BlockManager")
  val registration = RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint)
  tell(registration)
  logInfo("Registered BlockManager")
}

2.3 更新数据块信息

/**
 * Reports the current storage state of one block to the master endpoint.
 *
 * Sends `UpdateBlockInfo` via `driverEndpoint.askWithRetry` and returns the
 * Boolean reply unchanged.
 *
 * @param blockManagerId         block manager holding the block
 * @param blockId                block being reported
 * @param storageLevel           storage level of the block
 * @param memSize                size reported for memory storage
 * @param diskSize               size reported for disk storage
 * @param externalBlockStoreSize size reported for the external block store
 * @return the Boolean answer from the master endpoint
 */
def updateBlockInfo(
    blockManagerId: BlockManagerId,
    blockId: BlockId,
    storageLevel: StorageLevel,
    memSize: Long,
    diskSize: Long,
    externalBlockStoreSize: Long): Boolean = {
  val update = UpdateBlockInfo(
    blockManagerId, blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)
  val reply = driverEndpoint.askWithRetry[Boolean](update)
  logDebug(s"Updated info of block $blockId")
  reply
}

2.4 获取某个block的所有BlockManagerId信息

/**
 * Looks up every block manager that the master endpoint lists for one block.
 *
 * @param blockId block to locate
 * @return the block manager ids returned for a `GetLocations` query
 */
def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  val query = GetLocations(blockId)
  driverEndpoint.askWithRetry[Seq[BlockManagerId]](query)
}

2.5 获取多个block的位置信息

/**
 * Looks up block manager locations for several blocks in one round trip.
 *
 * @param blockIds blocks to locate
 * @return one sequence of block manager ids per requested block, as answered
 *         for a `GetLocationsMultipleBlockIds` query
 */
def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
  val batchQuery = GetLocationsMultipleBlockIds(blockIds)
  driverEndpoint.askWithRetry[IndexedSeq[Seq[BlockManagerId]]](batchQuery)
}

2.6 检查是否存在该数据块

/**
 * Tells whether the master endpoint knows at least one location for a block.
 *
 * @param blockId block to check
 * @return true when `getLocations(blockId)` is non-empty
 */
def contains(blockId: BlockId): Boolean =
  getLocations(blockId).nonEmpty

2.7 获取除当前BlockManagerId之外的其他BlockManagerId(即peers)

/**
 * Asks the master endpoint for the peers of a block manager.
 *
 * @param blockManagerId block manager whose peers are requested
 * @return the block manager ids returned for a `GetPeers` query
 */
def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
  val peersQuery = GetPeers(blockManagerId)
  driverEndpoint.askWithRetry[Seq[BlockManagerId]](peersQuery)
}

2.8 向BlockManagerMasterEndpoint发送RemoveBlock消息删除数据块

/**
 * Asks the master endpoint to remove one block.
 *
 * The call blocks on `askWithRetry`; its Boolean reply is not exposed to the
 * caller, and the return type stays `Unit`.
 *
 * @param blockId block to remove
 */
def removeBlock(blockId: BlockId): Unit = {
  // Explicit `: Unit =` replaces the deprecated procedure syntax; the reply is intentionally discarded.
  driverEndpoint.askWithRetry[Boolean](RemoveBlock(blockId))
}

2.9 删除指定RDD的所有数据块

/**
 * Asks the master endpoint to remove every block of an RDD.
 *
 * The endpoint answers with a `Future[Seq[Int]]`. Any failure of that future
 * is logged as a warning. When `blocking` is true, the method also waits for
 * the future using `timeout` (defined elsewhere in this class).
 *
 * @param rddId    id of the RDD whose blocks should be removed
 * @param blocking whether to wait for the removal future to complete
 */
def removeRdd(rddId: Int, blocking: Boolean): Unit = {
  val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
  // `failed.foreach` replaces the deprecated `onFailure`. It logs every failure,
  // whereas the former `case e: Exception` silently skipped non-Exception throwables.
  future.failed.foreach { e =>
    logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
  }(ThreadUtils.sameThread)
  if (blocking) {
    timeout.awaitResult(future)
  }
}

2.10 删除指定shuffle的所有数据块

/**
 * Asks the master endpoint to remove every block of a shuffle.
 *
 * The endpoint answers with a `Future[Seq[Boolean]]`. Any failure of that
 * future is logged as a warning. When `blocking` is true, the method also
 * waits for the future using `timeout` (defined elsewhere in this class).
 *
 * @param shuffleId id of the shuffle whose blocks should be removed
 * @param blocking  whether to wait for the removal future to complete
 */
def removeShuffle(shuffleId: Int, blocking: Boolean): Unit = {
  val future = driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
  // `failed.foreach` replaces the deprecated `onFailure`. It logs every failure,
  // whereas the former `case e: Exception` silently skipped non-Exception throwables.
  future.failed.foreach { e =>
    logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
  }(ThreadUtils.sameThread)
  if (blocking) {
    timeout.awaitResult(future)
  }
}
2.11 返回每一个block manager的内存状态
/**
 * Fetches the memory status of each block manager from the master endpoint.
 *
 * @return a map from block manager id to a pair of Longs
 *         (NOTE(review): presumably (max memory, remaining memory) — confirm
 *         against the endpoint's `GetMemoryStatus` handler)
 */
def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
  val statusByManager =
    driverEndpoint.askWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
  statusByManager
}

2.12 返回存储状态

/**
 * Fetches the storage status reported by the master endpoint.
 *
 * @return the `StorageStatus` array returned for a `GetStorageStatus` query
 */
def getStorageStatus: Array[StorageStatus] =
  driverEndpoint.askWithRetry[Array[StorageStatus]](GetStorageStatus)

2.13 获取block状态

/**
 * Collects the status of one block from every block manager the master reports.
 *
 * The master endpoint replies with a map from block manager id to a future
 * optional status. All futures are combined with `Future.sequence` and awaited
 * with `timeout` (defined elsewhere in this class). Block managers whose status
 * resolves to `None` are omitted from the result.
 *
 * @param blockId   block to query
 * @param askSlaves forwarded in the `GetBlockStatus` message (NOTE(review):
 *                  presumably controls whether the master queries slave block
 *                  managers directly — confirm against the endpoint)
 * @return map from block manager id to that manager's status for the block
 * @throws SparkException if the awaited combined result is null
 */
def getBlockStatus(
 blockId: BlockId,
 askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {
 val msg = GetBlockStatus(blockId, askSlaves)
 val response = driverEndpoint.
 askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
 // Split into parallel collections so the awaited statuses can be zipped back onto their ids.
 val (blockManagerIds, futures) = response.unzip
 // NOTE(review): the execution context is also passed explicitly to Future.sequence below,
 // so this implicit may be redundant — confirm before removing.
 implicit val sameThread = ThreadUtils.sameThread
 // Explicit builder instance so Future.sequence yields an Iterable[Option[BlockStatus]].
 val cbf =
 implicitly[
 CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
 Option[BlockStatus],
 Iterable[Option[BlockStatus]]]]
 // Future.sequence keeps input order, which the zip below relies on.
 val blockStatus = timeout.awaitResult(
 Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread))
 // Defensive guard against a null result.
 if (blockStatus == null) {
 throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
 }
 // Pair each id with its status, dropping ids whose status is None.
 blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
 status.map { s => (blockManagerId, s) }
 }.toMap
}

2.14 检查指定executor是否有缓存的block

/**
 * Asks the master endpoint whether an executor holds any cached blocks.
 *
 * @param executorId executor to check
 * @return the Boolean reply to a `HasCachedBlocks` query
 */
def hasCachedBlocks(executorId: String): Boolean = {
  val cacheQuery = HasCachedBlocks(executorId)
  driverEndpoint.askWithRetry[Boolean](cacheQuery)
}
相关文章
最新文章
热点推荐