首页 > 网络 > 云计算 >

Spark源码知识讲解之MemoryManager

2017-11-11

Spark源码知识讲解之MemoryManager。它会强制管理存储(storage)和执行(execution)之间的内存使用。 记录用了多少 storage memory 和 execution memory

它会强制管理存储(storage)和执行(execution)之间的内存使用

# 记录用了多少 storage memory 和 execution memory

# 申请 storage、execution 和 unroll memory

# 释放 storage 和 execution memory

execution memory: 是指 shuffles,joins,sorts 和 aggregation 的计算操作

storage memory:是指persist或者cache缓存数据到内存等

unroll memory: 则是指展开block本身就需要耗费内存,好比打开文件,打开文件我们是需要耗费内存的

MemoryManager根据spark.memory.useLegacyMode这个配置项决定你是否使用遗留的MemoryManager策略即StaticMemoryManager。默认是不使用StaticMemoryManager,而是UnifiedMemoryManager。

一 MemoryManager

1.1 核心属性

Int numCores: 核数

Long onHeapStorageMemory:堆内storage 内存大小

Long onHeapExecutionMemory: 堆内execution内存大小

StorageMemoryPool onHeapStorageMemoryPool:创建堆内storage内存池

StorageMemoryPool offHeapStorageMemoryPool:创建堆外storage内存池

ExecutionMemoryPool onHeapExecutionMemoryPool:创建堆内execution内存池

ExecutionMemoryPool offHeapExecutionMemoryPool:创建堆外execution内存池

Long maxOffHeapMemory: 最大的对外内存大小,可以由spark.memory.offHeap.size配置,如果要配置必须启用了才可以生效spark.memory.offHeap.enabled

Long maxOnHeapStorageMemory: 最大的堆内storage内存大小

Long maxOffHeapStorageMemory 最大的堆外storage内存大小

1.2 重要方法

# 释放numBytes字节的执行内存

defreleaseExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Unit = synchronized {
memoryMode match{
case MemoryMode.ON_HEAP=> onHeapExecutionMemoryPool.releaseMemory(numBytes,taskAttemptId)
case MemoryMode.OFF_HEAP=> offHeapExecutionMemoryPool.releaseMemory(numBytes,taskAttemptId)
}
}

# 释放指定task的所有execution内存

private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
 onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
 offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
}

# 释放N字节存储内存

def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
 memoryMode match {
 case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
 case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
 }
}

# 释放所有存储内存

final def releaseAllStorageMemory(): Unit = synchronized {
 onHeapStorageMemoryPool.releaseAllMemory()
 offHeapStorageMemoryPool.releaseAllMemory()
}

二 StaticMemoryManager

Executor的内存界限分明,分别由3部分组成:execution,storage和system。对各部分内存静态划分好后便不可变化

# executor:execution内存大小通过设置spark.shuffle.memoryFraction参数来控制大小,默认为0.2。

为了避免shuffle,join,排序和聚合这些操作直接将数据写入磁盘,所设置的buffer大小,减少了磁盘读写的次数。

#storage: storage内存大小通过设置spark.storage.memoryFraction参数来控制大小,默认为0.6。

用于存储用户显示调用的persist,cache,broadcast等命令存储的数据空间。

#system:程序运行需要的空间,存储一些spark内部的元数据信息,用户的数据结构,避免一些不寻常的大记录带来的OOM。

这种划分方式,在某些时候可能会带来一定的资源浪费,比如我对cache或者persist没啥要求,那么storage的内存就剩余了

由于很多属性都继承了父类MemoryManager,在这里不做赘述。

# maxUnrollMemory

最大的block展开内存空间,默认是占用最大存储内存的20%

private val maxUnrollMemory: Long = {
 (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}

# 申请分配storage内存,注意StaticMemoryManager不支持堆外内存

override def acquireStorageMemory(
 blockId: BlockId,
 numBytes: Long,
 memoryMode: MemoryMode): Boolean = synchronized {
 require(memoryMode != MemoryMode.OFF_HEAP,
 "StaticMemoryManager does not support off-heap storage memory")
 // 要申请的空间大小超过最大的storage内存,肯定失败
 if (numBytes > maxOnHeapStorageMemory) {
 // Fail fast if the block simply won't fit
 logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
 s"memory limit ($maxOnHeapStorageMemory bytes)")
 false
 } else {
 // 调用StorageMemoryPool分配numBytes字节内存
 onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
 }
}

# acquireUnrollMemory 用于申请展开block的内存

override def acquireUnrollMemory(
 blockId: BlockId, numBytes: Long,
 memoryMode: MemoryMode): Boolean = synchronized {
 require(memoryMode != MemoryMode.OFF_HEAP,
 "StaticMemoryManager does not support off-heap unroll memory")
 // 当前storage内存用于展开block的所需要内存
 val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
 // 当前storage中获取空闲的内存
 val freeMemory = onHeapStorageMemoryPool.memoryFree
 // 判断还剩余的可用于展开block的内存-还剩余的内存,如果小于或者等0表示不够分配了,没有空闲内存可供分配
 val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
 val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
 onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
}

# acquireExecutionMemory申请执行内存

override def acquireExecutionMemory(
 numBytes: Long,
 taskAttemptId: Long,
 memoryMode: MemoryMode): Long = synchronized {
 memoryMode match {
 case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
 case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
 }
}

# getMaxStorageMemory 返回有效的最大的storage内存空间

private def getMaxStorageMemory(conf: SparkConf): Long = {
 // 系统最大内存内存
 val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
 // 内存占用比例
 val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
 // 安全比例
 val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
 (systemMaxMemory * memoryFraction * safetyFraction).toLong
}

# getMaxExecutionMemory 返回最大的execution内存空间

private def getMaxExecutionMemory(conf: SparkConf): Long = {
 // 系统内存空间
 val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
 // 如果系统内存空间 < 最小内存空间,抛出异常
 if (systemMaxMemory < MIN_MEMORY_BYTES) {
 throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
 s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
 s"option or spark.driver.memory in Spark configuration.")
 }
 // 如果指定了执行内存空间
 if (conf.contains("spark.executor.memory")) {
 // 获取执行执行内存空间
 val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
 // 如果执行内存空间 < 最小内存,抛出异常
 if (executorMemory < MIN_MEMORY_BYTES) {
 throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
 s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
 s"--executor-memory option or spark.executor.memory in Spark configuration.")
 }
 }
 // shuffle内存占用比例
 val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
 // shuffle内存安全比例
 val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
 (systemMaxMemory * memoryFraction * safetyFraction).toLong
}

三UnifiedMemoryManager

由于传统的StaticMemoryManager存在资源浪费问题,所以引入了这个MemoryManager。UnifiedMemoryManager管理机制淡化了execution空间和storage空间的边界,让它们之间可以相互借内存。

它们总共可用的内存由spark.memory.fraction决定,默认0.6.可使用的堆内存比例 * 可使用的内存。在该空间内部,对execution和storage进行了进一步的划分。由spark.memory.storageFraction决定

# 计算最大的存储内存

计算最大的存储内存 = 最大内存 - 最大执行内存
override def maxOnHeapStorageMemory: Long = synchronized {
 maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}

# 计算最大的堆外存储内存

计算最大的堆外存储内存 = 最大堆外内存 - 最大堆外执行内存
override def maxOffHeapStorageMemory: Long = synchronized {
 maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed
}

# acquireExecutionMemory 申请执行内存

override private[memory] def acquireExecutionMemory(
 numBytes: Long, taskAttemptId: Long,
 memoryMode: MemoryMode): Long = synchronized {
 assertInvariants()
 assert(numBytes >= 0)
 // 跟据不同内存模式,构建不同的组件和初始值
 val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
 case MemoryMode.ON_HEAP => (
 onHeapExecutionMemoryPool,
 onHeapStorageMemoryPool,
 onHeapStorageRegionSize,
 maxHeapMemory)
 case MemoryMode.OFF_HEAP => (
 offHeapExecutionMemoryPool,
 offHeapStorageMemoryPool,
 offHeapStorageMemory,
 maxOffHeapMemory)
 }

 // 通过回收缓存的block,会增加执行内存,从而存储内存量就占用内存量减少了
 // 当为task申请内存的实时呢,执行内存需要多次尝试,每一次尝试可能都会回收一些存储内存
 def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
 // 如果需要申请的内存大于0
 if (extraMemoryNeeded > 0) {
 // 计算execution 可以借到的storage的内存,应该是storage的空闲内存空间和可借出的内存较大者
 val memoryReclaimableFromStorage = math.max(
 storagePool.memoryFree,// storage的空闲内存空间
 storagePool.poolSize - storageRegionSize) // 可借出的内存
 // 如果可以借到内存
 if (memoryReclaimableFromStorage > 0) {
 // 减小pool大小,释放一些内存空间
 val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
 math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
 storagePool.decrementPoolSize(spaceToReclaim)
 executionPool.incrementPoolSize(spaceToReclaim)
 }
 }
 }

 // 计算存储内存占用的内存和存储
 def computeMaxExecutionPoolSize(): Long = {
 maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
 }

 executionPool.acquireMemory(
 numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
}
// 申请分配存储内存
override def acquireStorageMemory(
 blockId: BlockId,
 numBytes: Long,
 memoryMode: MemoryMode): Boolean = synchronized {
 assertInvariants()
 assert(numBytes >= 0)
 // 跟据不同内存模式,构建不同的组件和初始值v
 val (executionPool, storagePool, maxMemory) = memoryMode match {
 case MemoryMode.ON_HEAP => (
 onHeapExecutionMemoryPool,
 onHeapStorageMemoryPool,
 maxOnHeapStorageMemory)
 case MemoryMode.OFF_HEAP => (
 offHeapExecutionMemoryPool,
 offHeapStorageMemoryPool,
 maxOffHeapMemory)
 }
 // 如果要申请的内存空间大于最大内存空间,直接返回false
 if (numBytes > maxMemory) {
 logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
 s"memory limit ($maxMemory bytes)")
 return false
 }
 // 如果要申请的内存空间比当前storage剩余空间多,不够用则去向execution借
 if (numBytes > storagePool.memoryFree) {
 // 表示没有足够内存,需要从执行缓存借一些数据,增加storage内存,缩小execution内存
 val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
 executionPool.decrementPoolSize(memoryBorrowedFromExecution)
 storagePool.incrementPoolSize(memoryBorrowedFromExecution)
 }
 storagePool.acquireMemory(blockId, numBytes)
}

# acquireStorageMemory 申请分配存储内存

override def acquireStorageMemory(
 blockId: BlockId,
 numBytes: Long,
 memoryMode: MemoryMode): Boolean = synchronized {
 assertInvariants()
 assert(numBytes >= 0)
 // 跟据不同内存模式,构建不同的组件和初始值v
 val (executionPool, storagePool, maxMemory) = memoryMode match {
 case MemoryMode.ON_HEAP => (
 onHeapExecutionMemoryPool,
 onHeapStorageMemoryPool,
 maxOnHeapStorageMemory)
 case MemoryMode.OFF_HEAP => (
 offHeapExecutionMemoryPool,
 offHeapStorageMemoryPool,
 maxOffHeapMemory)
 }
 // 如果要申请的内存空间大于最大内存空间,直接返回false
 if (numBytes > maxMemory) {
 logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
 s"memory limit ($maxMemory bytes)")
 return false
 }
 // 如果要申请的内存空间比当前storage剩余空间多,不够用则去向execution借
 if (numBytes > storagePool.memoryFree) {
 // 表示没有足够内存,需要从执行缓存借一些数据,增加storage内存,缩小execution内存
 val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
 executionPool.decrementPoolSize(memoryBorrowedFromExecution)
 storagePool.incrementPoolSize(memoryBorrowedFromExecution)
 }
 storagePool.acquireMemory(blockId, numBytes)
}

#getMaxMemory 返回最大的内存

private def getMaxMemory(conf: SparkConf): Long = {
 // 获取系统内存
 val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
 // 获取预留的内存
 val reservedMemory = conf.getLong("spark.testing.reservedMemory",
 if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
 // 最小的系统内存
 val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
 // 如果系统内存 < 最小的系统内存,抛出异常
 if (systemMemory < minSystemMemory) {
 throw new IllegalArgumentException(s"System memory $systemMemory must " +
 s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
 s"option or spark.driver.memory in Spark configuration.")
 }
 // 如果指定了executor内存
 if (conf.contains("spark.executor.memory")) {
 // 获取executor内存
 val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
 // 如果executor内存 < 最小的系统内存抛出异常
 if (executorMemory < minSystemMemory) {
 throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
 s"$minSystemMemory. Please increase executor memory using the " +
 s"--executor-memory option or spark.executor.memory in Spark configuration.")
 }
 }
 // 系统内存 - 预留的系统内存 = 可使用的内存
 val usableMemory = systemMemory - reservedMemory
 // 可使用的JVM堆内存比例,默认60%
 val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
 // 返回可使用的堆内存比例 * 可使用的内存
 (usableMemory * memoryFraction).toLong
}
相关文章
最新文章
热点推荐