首页 > 网络 > 云计算 >

Spark源码知识讲解之DiskBlockMangaer分析

2017-11-11

Spark源码知识讲解之DiskBlockMangaer分析。创建和维护逻辑上的block和磁盘上的物理位置之间的映射,默认情况下,一个block映射一个文件,文件名字一般是blockId。但是,也有可能有多个数据块映射到一个文件的一个段,block文件在配置的spark local dir目录下的目录之间根据

创建和维护逻辑上的block和磁盘上的物理位置之间的映射,默认情况下,一个block映射一个文件,文件名字一般是blockId。但是,也有可能有多个数据块映射到一个文件的一个段,block文件在配置的spark.local.dir目录下的目录之间根据hash算法存放。我们可以配置

spark.diskStore.subDirectories配置子文件目录数

一 核心属性

BlockManager blockManager: 主要进行block增删改

Int subDirsPerLocalDir: 子目录数量默认64个

Array[File] localDirs: 本地用于存放block文件的一级目录

Array[File]subDirs: 子文件

二 重要方法

2.1getFile 获取一个文件

defgetFile(filename:String): File = {
// 根据文件名进行hash
val hash= Utils.nonNegativeHash(filename)
// 根据文件名的hashcode,首先看这个文件位于哪一个一级目录
val dirId= hash % localDirs.length
// 再决定子目录位于哪一个一级目录中
val subDirId= (hash / localDirs.length) %subDirsPerLocalDir

// 如果子目录不存在则创建

val subDir= subDirs(dirId).synchronized {
val old = subDirs(dirId)(subDirId)
if (old != null) {
old
} else {
val newDir = new File(localDirs(dirId),"%02x".format(subDirId))
if (!newDir.exists() && !newDir.mkdir()) {
throw new IOException(s"Failed to create localdir in$newDir.")
}
subDirs(dirId)(subDirId) =newDir
newDir
}
}
// 根据目录和文件名创建文件
new File(subDir,filename)
}

// 使用blockId作为文件名创建文件
def getFile(blockId: BlockId): File = getFile(blockId.name)

2.2containsBlock 检查磁盘上是否存在blockId这样的block

实就是看是否磁盘上是否存在文件名为blockId的文件一个文件就对应着一个block

defcontainsBlock(blockId: BlockId): Boolean = {
 getFile(blockId.name).exists()
}

2.3getAllFiles 查询当前磁盘上所有的 文件

def getAllFiles(): Seq[File] = {
 // Get all the files inside the array of array of directories
 subDirs.flatMap { dir =>
 dir.synchronized {
 // Copy the content of dir because it may be modified in other threads
 dir.clone()
 }
 }.filter(_ != null).flatMap { dir =>
 val files = dir.listFiles()
 if (files != null) files else Seq.empty
 }
}

2.4getAllBlocks 查询出存储在磁盘上所有的block,其实就是查询磁盘上所有的block对应的文件而已

def getAllBlocks(): Seq[BlockId] = {
 getAllFiles().map(f => BlockId(f.getName))
}

2.5 createTempLocalBlock 创建临时的本地block

def createTempLocalBlock(): (TempLocalBlockId, File) = {
 var blockId = new TempLocalBlockId(UUID.randomUUID())
 while (getFile(blockId).exists()) {
 blockId = new TempLocalBlockId(UUID.randomUUID())
 }
 (blockId, getFile(blockId))
}

2.5createTempShuffleBlock 创建临时的本地shuffle block

def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
 var blockId = new TempShuffleBlockId(UUID.randomUUID())
 while (getFile(blockId).exists()) {
 blockId = new TempShuffleBlockId(UUID.randomUUID())
 }
 (blockId, getFile(blockId))
}
相关文章
最新文章
热点推荐