首页 > 网络 > 云计算 >

Spark源码知识讲解之BlockStore

2017-11-11

Spark源码知识讲解之BlockStore。BlockStore是存储block的抽象类,子类包括DiskStore、MemoryStore以及ExternalBlockStore等。一 DiskStore 磁盘存储:存储数据块(block)到磁盘,我们可以在DiskStore中配置多个存放block的目录,DiskBlockManager会根据这些配置创建不同的文件夹来存放block……

BlockStore是存储block的抽象类,子类包括DiskStore、MemoryStore以及ExternalBlockStore等

一 DiskStore 磁盘存储

存储数据块(block)到磁盘。我们可以在DiskStore中配置多个存放block的目录,DiskBlockManager会根据这些配置创建不同的文件夹来存放block

1.1 DiskStore 重要方法

# getSize 获取指定blockId对应的block文件大小

/**
 * Returns the size in bytes of the file backing `blockId`.
 *
 * Delegates to `diskManager.getFile(blockId.name).length`. The returned object is presumably a
 * `java.io.File`, whose `length` is 0L for a missing file; confirm against DiskBlockManager.
 */
def getSize(blockId: BlockId): Long = diskManager.getFile(blockId.name).length

# put 放数据

/**
 * Creates the on-disk file for `blockId` and lets `writeFunc` fill it through a fresh
 * `FileOutputStream`.
 *
 * The stream is closed in every case. If `writeFunc` throws, the partially written block is
 * deleted again via `remove(blockId)` and the exception propagates.
 *
 * @param blockId   id of the block to write; must not already exist in this disk store
 * @param writeFunc callback that writes the block contents to the supplied stream
 * @throws IllegalStateException if the block is already present in the disk store
 */
def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
  if (contains(blockId)) {
    throw new IllegalStateException(s"Block $blockId is already present in the disk store")
  }
  logDebug(s"Attempting to put block $blockId")
  val startedAt = System.currentTimeMillis
  val blockFile = diskManager.getFile(blockId)
  val out = new FileOutputStream(blockFile)
  // Stays true unless writeFunc returns normally.
  var failed = true
  try {
    writeFunc(out)
    failed = false
  } finally {
    try {
      // `failed` is passed as Closeables.close's second (swallow) flag. With Guava's
      // Closeables that suppresses close errors after a failed write; confirm which
      // Closeables is imported.
      Closeables.close(out, failed)
    } finally {
      if (failed) {
        remove(blockId)
      }
    }
  }
  val elapsedMs = System.currentTimeMillis - startedAt
  logDebug("Block %s stored as %s file on disk in %d ms".format(
    blockFile.getName,
    Utils.bytesToString(blockFile.length()),
    elapsedMs))
}

# putBytes 根据指定的byte数据,将其写入block文件

/**
 * Writes the contents of `bytes` into the disk file for `blockId`.
 *
 * Goes through `put`, so the existence check and cleanup-on-failure apply. The data is
 * written via the output stream's `FileChannel`, which is closed afterwards using
 * `Utils.tryWithSafeFinally`.
 */
def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = put(blockId) { out =>
  val ch = out.getChannel
  Utils.tryWithSafeFinally(bytes.writeFully(ch))(ch.close())
}

二 MemoryStore

2.1 核心属性

BlockInfoManager blockInfoManager:跟踪单个数据块的元数据

SerializerManager serializerManager:序列化管理器

MemoryManager memoryManager:内存管理器

BlockEvictionHandler blockEvictionHandler:回收block的处理器

LinkedHashMap[BlockId, MemoryEntry[_]] entries:存放在内存的block数据

HashMap[Long, Long] onHeapUnrollMemoryMap:从taskAttemptId映射到该任务为展开block所预留的堆内(on-heap)内存大小

HashMap[Long, Long] offHeapUnrollMemoryMap:从taskAttemptId映射到该任务为展开block所预留的堆外(off-heap)内存大小

Long unrollMemoryThreshold:开始展开block之前,为任务初始预留的展开内存阈值

2.2 重要的类和方法

# Long maxMemory :最大的内存大小

/** Upper bound on storage memory: the on-heap maximum plus the off-heap maximum. */
private def maxMemory: Long =
  memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory

# memoryUsed 已经使用的内存,包括正在展开的block所使用的内存

/**
 * Storage memory currently in use, as reported by `memoryManager`.
 * `blocksMemoryUsed` subtracts the current unroll memory from this figure.
 */
private def memoryUsed: Long = {
  memoryManager.storageMemoryUsed
}

# blocksMemoryUsed 已经写完的block所占用的内存,不包括正在展开的block

/**
 * Memory held by fully stored blocks: total storage memory in use minus the memory currently
 * reserved for unrolling. Both figures are read under `memoryManager`'s lock.
 */
private def blocksMemoryUsed: Long = memoryManager.synchronized {
  val totalUsed = memoryUsed
  val unrolling = currentUnrollMemory
  totalUsed - unrolling
}

# putBytes 往内存添加数据

往内存添加数据,如果内存足够,则创建ByteBuffer,然后放进MemoryStore,否则不会创建ByteBuffer

/**
 * Stores an already-serialized block in memory if storage memory can be acquired for it.
 *
 * `_bytes` is only invoked after `size` bytes of storage memory have been granted, so the
 * buffer is never materialized when there is no room for it.
 *
 * @param blockId    id of the block; must not already be in this MemoryStore
 * @param size       number of bytes to acquire; must equal the produced buffer's size
 * @param memoryMode on-heap or off-heap storage memory
 * @param _bytes     lazily produces the serialized block contents
 * @return true if the block was stored, false if the storage memory could not be acquired
 */
def putBytes[T: ClassTag](
    blockId: BlockId,
    size: Long,
    memoryMode: MemoryMode,
    _bytes: () => ChunkedByteBuffer): Boolean = {
  require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
  val granted = memoryManager.acquireStorageMemory(blockId, size, memoryMode)
  if (!granted) {
    false
  } else {
    val buffer = _bytes()
    assert(buffer.size == size)
    val tag = implicitly[ClassTag[T]]
    val serializedEntry = new SerializedMemoryEntry[T](buffer, memoryMode, tag)
    entries.synchronized {
      entries.put(blockId, serializedEntry)
    }
    logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
      blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
    true
  }
}

# putIteratorAsValues 尝试将一个迭代器中的值展开(unroll)到内存,并作为反序列化的block存入MemoryStore

/**
 * Tries to unroll `values` into an in-memory array and store it as a deserialized, on-heap
 * entry for `blockId`.
 *
 * Before unrolling, `unrollMemoryThreshold` bytes of unroll memory are reserved for the
 * current task. The estimated size of the unrolled data is checked every `memoryCheckPeriod`
 * elements. Whenever it reaches the current threshold, more unroll memory is requested.
 *
 * @param blockId  id of the block to store; must not already be in this MemoryStore
 * @param values   the values making up the block
 * @param classTag class tag for `T`, used for the size-tracking buffer and the memory entry
 * @return `Right(size)` with the estimated size of the stored block on success. Otherwise
 *         `Left(PartiallyUnrolledIterator)` yielding the values unrolled so far followed by
 *         the not-yet-consumed rest of `values`. The unroll memory reserved for this block is
 *         handed to that iterator; presumably the iterator releases it, so confirm in
 *         PartiallyUnrolledIterator.
 * @throws IllegalArgumentException (via `require`) if `blockId` is already present
 */
private[storage] def putIteratorAsValues[T](
 blockId: BlockId,
 values: Iterator[T],
 classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {

 require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

 // Number of elements unrolled so far.
 var elementsUnrolled = 0
 // Whether there is still enough unroll memory to keep unrolling this block.
 var keepUnrolling = true
 // Initial per-task memory to request for unrolling blocks (bytes).
 // This is the amount reserved for this task before the first element is unrolled.
 val initialMemoryThreshold = unrollMemoryThreshold
 // How often (in elements) to check whether more memory needs to be requested.
 val memoryCheckPeriod = 16
 // Current threshold: once the estimated size reaches it, more memory is requested.
 var memoryThreshold = initialMemoryThreshold
 // Growth factor applied to the current estimated size when requesting more memory.
 val memoryGrowthFactor = 1.5
 // Unroll memory successfully reserved so far for this particular block.
 var unrollMemoryUsedByThisBlock = 0L
 // Append-only buffer for the unrolled values that can estimate its own size.
 var vector = new SizeTrackingVector[T]()(classTag)

 // Reserve the initial unroll memory for this task, since unrolling itself consumes memory.
 keepUnrolling =
 reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)
 // Could not reserve even the initial amount: warn. The loop below will not run.
 if (!keepUnrolling) {
 logWarning(s"Failed to reserve initial memory threshold of " +
 s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
 } else {
 // Record the initial reservation as belonging to this block.
 unrollMemoryUsedByThisBlock += initialMemoryThreshold
 }

 // Unroll the iterator, periodically checking whether the threshold has been reached.
 while (values.hasNext && keepUnrolling) {
 // Buffer the next value.
 vector += values.next()
 // Check once every memoryCheckPeriod elements, starting with the first element.
 if (elementsUnrolled % memoryCheckPeriod == 0) {
 // Estimate the current size of the buffered values.
 val currentSize = vector.estimateSize()
 // Threshold reached: request (currentSize * memoryGrowthFactor - memoryThreshold) more
 // bytes, so the threshold grows to currentSize * memoryGrowthFactor.
 if (currentSize >= memoryThreshold) {
 val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
 // Try to reserve the additional unroll memory for this task.
 keepUnrolling =
 reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
 // Only count the amount toward this block if the reservation succeeded.
 if (keepUnrolling) {
 unrollMemoryUsedByThisBlock += amountToRequest
 }
 // The threshold is raised even on failure. This is harmless because the loop then
 // stops and memoryThreshold is not read afterwards.
 memoryThreshold += amountToRequest
 }
 }
 // Count the element just buffered.
 elementsUnrolled += 1
 }
 // The whole iterator was unrolled within the reserved unroll memory.
 if (keepUnrolling) {
 // Materialize the buffered values as an array and drop the buffer reference.
 val arrayValues = vector.toArray
 vector = null
 // Build a deserialized entry whose size is estimated from the array.
 val entry =
 new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
 val size = entry.size
 // Converts `amount` bytes of this task's unroll reservation into storage memory for
 // blockId. The release and the acquire happen under memoryManager's lock.
 def transferUnrollToStorage(amount: Long): Unit = {
 // Synchronize so that transfer is atomic
 memoryManager.synchronized {
 releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
 val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
 assert(success, "transferring unroll memory to storage memory failed")
 }
 }
 // Acquire storage memory if necessary to store this block in memory.
 val enoughStorageMemory = {
 // The unroll memory reserved for this block does not exceed the block's size (<=).
 if (unrollMemoryUsedByThisBlock <= size) {
 // Acquire the missing (size - reserved) bytes as extra storage memory first. Only
 // if that succeeds, convert the whole unroll reservation into storage memory.
 val acquiredExtra =
 memoryManager.acquireStorageMemory(
 blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
 if (acquiredExtra) {
 transferUnrollToStorage(unrollMemoryUsedByThisBlock)
 }
 acquiredExtra
 } else { // unrollMemoryUsedByThisBlock > size
 // More unroll memory was reserved than the block needs: release the excess, then
 // convert exactly `size` bytes into storage memory.
 val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
 releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
 transferUnrollToStorage(size)
 true
 }
 }
 // Enough storage memory: publish the entry in the in-memory map.
 if (enoughStorageMemory) {
 entries.synchronized {
 entries.put(blockId, entry)
 }
 logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
 blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
 Right(size)
 } else {
 // Not enough storage memory. The block's unroll reservation must still be held by this
 // task; hand it, with all unrolled values, to a PartiallyUnrolledIterator.
 assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
 "released too much unroll memory")
 Left(new PartiallyUnrolledIterator(
 this,
 MemoryMode.ON_HEAP,
 unrollMemoryUsedByThisBlock,
 unrolled = arrayValues.toIterator,
 rest = Iterator.empty))
 }
 } else {
 // Ran out of unroll memory before reaching the end of the iterator. Return the values
 // unrolled so far followed by the remaining, not-yet-consumed input.
 logUnrollFailureMessage(blockId, vector.estimateSize())
 Left(new PartiallyUnrolledIterator(
 this,
 MemoryMode.ON_HEAP,
 unrollMemoryUsedByThisBlock,
 unrolled = vector.iterator,
 rest = values))
 }
}

# remove 从内存中删除某一个blockId对应的数据

/**
 * Removes the in-memory entry for `blockId` and gives its storage memory back to
 * `memoryManager`. Serialized entries also have their buffer disposed.
 *
 * @return true if an entry was present and removed, false otherwise
 */
def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
  val removed = entries.synchronized { entries.remove(blockId) }
  Option(removed) match {
    case None =>
      false
    case Some(entry) =>
      // Only serialized entries own a buffer that must be disposed explicitly.
      entry match {
        case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
        case _ =>
      }
      memoryManager.releaseStorageMemory(entry.size, entry.memoryMode)
      logDebug(s"Block $blockId of size ${entry.size} dropped " +
        s"from memory (free ${maxMemory - blocksMemoryUsed})")
      true
  }
}

# evictBlocksToFreeSpace 试图回收(驱逐)内存中的block以释放内存空间

/**
 * Tries to evict blocks from memory so that at least `space` bytes of `memoryMode` storage
 * memory are freed.
 *
 * Candidates are taken in `entries` iteration order. A block is selected only if all of
 * these hold:
 *  - it uses `memoryMode`;
 *  - it does not belong to the same RDD as `blockId` (when that is an RDD block);
 *  - its write lock can be acquired without blocking.
 * Blocks are dropped only if the selected candidates add up to at least `space` bytes.
 * Otherwise nothing is dropped and every acquired lock is released.
 *
 * @param blockId    the block that needs the space, if any
 * @param space      number of bytes to free; must be positive
 * @param memoryMode the memory mode whose blocks may be evicted
 * @return the total size of the dropped blocks (>= `space`), or 0 if not enough could be freed
 */
private[spark] def evictBlocksToFreeSpace(
    blockId: Option[BlockId],
    space: Long,
    memoryMode: MemoryMode): Long = {
  assert(space > 0)

  memoryManager.synchronized {
    // Total size of the blocks selected for eviction so far.
    var freedMemory = 0L
    val rddToAdd = blockId.flatMap(getRddId)
    val selectedBlocks = new ArrayBuffer[BlockId]

    // A block is evictable if it lives in the requested memory mode and does not belong to
    // the same RDD as the block being added.
    def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
      entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
    }

    // Select evictable blocks until they add up to `space`. A non-blocking write lock is taken
    // on each selected block; a block whose lock is unavailable is skipped.
    entries.synchronized {
      val iterator = entries.entrySet().iterator()
      while (freedMemory < space && iterator.hasNext) {
        val pair = iterator.next()
        val candidateId = pair.getKey
        val entry = pair.getValue
        if (blockIsEvictable(candidateId, entry) &&
            blockInfoManager.lockForWriting(candidateId, blocking = false).isDefined) {
          selectedBlocks += candidateId
          freedMemory += entry.size
        }
      }
    }

    // Hands the block's data to blockEvictionHandler.dropFromMemory. Afterwards it either
    // unlocks the block (still stored elsewhere) or removes its block info (stored nowhere).
    def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
      val data = entry match {
        case DeserializedMemoryEntry(values, _, _) => Left(values)
        case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
      }
      val newEffectiveStorageLevel =
        blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
      if (newEffectiveStorageLevel.isValid) {
        // The block is still present in at least one store, so release the lock
        // but don't delete the block info
        blockInfoManager.unlock(blockId)
      } else {
        // The block isn't present in any store, so delete the block info so that the
        // block can be stored again
        blockInfoManager.removeBlock(blockId)
      }
    }

    if (freedMemory >= space) {
      // Index of the last selected block whose processing completed. Every selected block
      // after it is still write-locked by this task.
      var lastSuccessfulBlock = -1
      try {
        logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
          s"(${Utils.bytesToString(freedMemory)})")
        for (idx <- selectedBlocks.indices) {
          val id = selectedBlocks(idx)
          val entry = entries.synchronized { entries.get(id) }
          // This should never be null as only one task should be dropping
          // blocks and removing entries. However the check is still here for
          // future safety.
          if (entry != null) {
            dropBlock(id, entry)
          }
          lastSuccessfulBlock = idx
        }
        logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
          s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
        freedMemory
      } finally {
        // If dropping failed part-way, the remaining selected blocks would otherwise stay
        // write-locked by this task. `finally` is used instead of `catch` so that any
        // exception, including InterruptedException, still propagates unchanged.
        if (lastSuccessfulBlock != selectedBlocks.size - 1) {
          for (idx <- (lastSuccessfulBlock + 1) until selectedBlocks.size) {
            blockInfoManager.unlock(selectedBlocks(idx))
          }
        }
      }
    } else {
      blockId.foreach { id =>
        logInfo(s"Will not store $id")
      }
      // Not enough memory can be freed: give back every lock taken during selection.
      selectedBlocks.foreach { id =>
        blockInfoManager.unlock(id)
      }
      0L
    }
  }
}

# reserveUnrollMemoryForThisTask 为任务预定内存用于展开指定的block,因为展开block也需要消费内存

/**
 * Tries to reserve `memory` bytes of unroll memory for the current task in order to unroll
 * `blockId`.
 *
 * On success, the amount is added to the current task attempt's running total in the on-heap
 * or off-heap unroll map, chosen by `memoryMode`. Runs under `memoryManager`'s lock.
 *
 * @return whether `memoryManager` granted the unroll memory
 */
def reserveUnrollMemoryForThisTask(
    blockId: BlockId,
    memory: Long,
    memoryMode: MemoryMode): Boolean = memoryManager.synchronized {
  val granted = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
  if (granted) {
    val taskId = currentTaskAttemptId()
    val perTaskUnroll = memoryMode match {
      case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
      case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
    }
    val alreadyReserved = perTaskUnroll.getOrElse(taskId, 0L)
    perTaskUnroll.update(taskId, alreadyReserved + memory)
  }
  granted
}
/**
 * Releases up to `memory` bytes of the current task's unroll memory for `memoryMode`. By
 * default (`Long.MaxValue`), everything the task holds is released.
 *
 * The task's entry is dropped from the unroll map once its reserved amount reaches zero.
 * Does nothing if the task holds no unroll memory in that mode.
 */
def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit = {
  val taskId = currentTaskAttemptId()
  memoryManager.synchronized {
    val perTaskUnroll = memoryMode match {
      case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
      case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
    }
    perTaskUnroll.get(taskId).foreach { reserved =>
      val toRelease = math.min(memory, reserved)
      if (toRelease > 0) {
        // Update the bookkeeping before handing the memory back to the manager.
        perTaskUnroll(taskId) = reserved - toRelease
        memoryManager.releaseUnrollMemory(toRelease, memoryMode)
      }
      if (perTaskUnroll(taskId) == 0) {
        perTaskUnroll.remove(taskId)
      }
    }
  }
}
相关文章
最新文章
热点推荐