内存管理

从Spark 1.6版本开始,Spark采用Unified Memory Management这样一种新的内存管理模型。

Spark中的内存使用分为两部分:执行(execution)与存储(storage)。执行内存主要用于shuffles、joins、sorts和aggregations,存储内存则用于缓存或者跨节点的内部数据传输。

在Spark 1.6之前,这两部分内存的分配是静态的,以配置的方式进行设置,对应的管理类为StaticMemoryManager。这种管理方式的缺陷不言自明,因为它不能根据不同的数据处理场景调整内存的比例,在内存使用和性能方面都存在局限性。

Unified Memory Management in Spark 1.6一文列举了这种内存管理方式的限制:

  • There are no sensible defaults that apply to all workloads
  • Tuning memory fractions requires user expertise of internals
  • Applications that do not cache use only a small fraction of available memory

旧有(1.6版本之前)的内存管理

概念上,内存空间被分成了三块独立的区域,每块区域的内存容量是按照JVM堆大小的固定比例进行分配的:

  • Execution:在执行shuffle、join、sort和aggregation时,用于缓存中间数据。通过spark.shuffle.memoryFraction进行配置,默认为0.2。
  • Storage:主要用于缓存数据块以提高性能,同时也用于连续不断地广播或发送大的任务结果。通过`spark.storage.memoryFraction进行配置,默认为0.6。
  • Other:这部分内存用于存储运行Spark系统本身需要加载的代码与元数据,默认为0.2。

无论是哪个区域的内存,只要内存的使用量达到了上限,则内存中存储的数据就会被放入到硬盘中,从而清理出足够的内存空间。这样一来,由于与执行或存储相关的数据在内存中不存在,就会影响到整个系统的性能,导致I/O增长,或者重复计算。

多数情况下,Execution和Storage的内存容量未必会同时达到上限,但由于它们的容量是按照百分比配置的,若比例设置不合理,很可能导致出现使用率不平衡的情况。就好比贫富不均,自然会带来财富(内存空间)的浪费。

对于JVM的内存管理,我们还要考虑Out Of Memory(OOM)的情形,例如突然出现不可预知的超大的数据项,就可能导致内存不够。为避免这种情况,我们就不能将内存都分配给Spark的这三块内存空间,就好似设计电梯,必须要保证实际的承重要远大于规定的安全承重值。毕竟这种配置的方式很难保证准确估算,以满足各种复杂的数据分析场景。于是Spark提供了一个safe fraction,以便于为内存使用提供一个安全的缓存空间。Execution与Storage内存空间的safe fraction分别通过如下三个配置项配置:

  • spark.shuffle.safeFraction(默认值为0.8)
  • spark.storage.safeFraction(默认值为0.9)
  • spark.storage.unrollFraction(默认值为0.2)

以默认设置而论,用于执行的内存空间只占整个JVM堆容量的0.2*0.8=16%,正常情况下,内存的利用率极低,为安全故,却又必须预留出更多的内存避免内存溢出。

Execution的内存管理

Execution内存进一步为多个运行在JVM中的任务分配内存。与整个内存分配的方式不同,这块内存的再分配是动态分配的。在同一个JVM下,倘若当前仅有一个任务正在执行,则它可以使用当前可用的所有Execution内存。

Spark提供了如下Manager对这块内存进行管理:

  • ShuffleMemoryManager:它扮演了一个中央决策者的角色,负责决定分配多少内存给哪些任务。一个JVM对应一个ShuffleMemoryManager
  • TaskMemoryManager:记录和管理每个任务的内存分配,它实现为一个page table,用以跟踪堆(heap)中的块,侦测当异常抛出时可能导致的内存泄露。在其内部,调用了ExecutorMemoryManager去执行实际的内存分配与内存释放。一个任务对应一个TaskMemoryManager
  • ExecutorMemoryManager:用于处理on-heap和off-heap的分配,实现为弱引用的池允许被释放的page可以被跨任务重用。一个JVM对应一个ExecutorMemeoryManager

内存管理的执行流程大约如下:

当一个任务需要分配一块大容量的内存用以存储数据时,首先会请求ShuffleMemoryManager,告知:我想要X个字节的内存空间。如果请求可以被满足,则任务就会要求TaskMemoryManager分配X字节的空间。一旦TaskMemoryManager更新了它内部的page table,就会要求ExecutorMemoryManager去执行内存空间的实际分配。

这里有一个内存分配的策略。假定当前的active task数据为N,那么每个任务可以从ShuffleMemoryManager处获得多达1/N的执行内存。分配内存的请求并不能完全得到保证,例如内存不足,这时任务就会将它自身的内存数据释放。根据操作的不同,任务可能重新发出请求,又或者尝试申请小一点的内存块。


To avoid excessive spilling, a task does not spill unless it has acquired up to 1/(2N) of the total memory. If there is not enough free memory to acquire even up to 1/(2N), the request will block until other tasks spill and free their shares. Otherwise, new incoming tasks may spill constantly while existing jumbo tasks continue to occupy much of the memory without spilling.

释放(spill)任务的内存数据需要谨慎,因为它可能会影响系统的分析性能。为了避免出现过度的内存数据清理操作,Spark规定:除非任务已经获得了整个内存空间的1/(2N)空间,否则不会执行清理操作。倘若没有足够的空闲内存空间,当前任务的请求会被阻塞,直到其他任务清理或释放了它们的内存数据。

例如,一个executor启动了唯一一个任务A(即此时的N值为1),那么该任务可以获得当前所有可用的内存空间。当任务B也启动后,N值变为2。由于没有足够的空闲内存,任务B会被阻塞。此时,A开始释放内存空间,当任务B获得了1/(2N)=1/4的内存空间后,所有任务都可以执行空间的释放(spill)了。

注意,如果不是任务A无法从管理器那里获得内存,它是不会执行spill操作的。在上述例子中,从一开始任务A就获得所有可用的内存空间,因而在它不执行spill操作的时刻,其他所有新任务都无法得到想要的内存空间(处于饥饿状态),因为已有任务(即任务A)使用的内存空间已经超出了它们能够平分的空间。

spill的操作是由配置项spark.shuffle.spill控制的,默认值为true,用于指定Shuffle过程中如果内存中的数据超过阈值,是否需要将部分数据临时写入外部存储。如果设置为false,这个过程就会一直使用内存,可能导致OOM。

如果Spill的频率太高,可以适当地增加spark.shuffle.memoryFraction来增加Shuffle过程的可用内存数,进而减少Spill的频率。当然,为了避免OOM,可能就需要减少RDD cache所用的内存(即Storage Memory)。

Storage的存储管理

Storage内存由更加通用的BlockManager管理。如前所说,Storage内存的主要功能是用于缓存RDD Partitions,但也用于将容量大的任务结果传播和发送给driver

Spark提供了Storage Level来指定块的存放位置:Memory、Disk或者Off-Heap。Storage Level同时还可以指定存储时是否按照序列化的格式。当Storage Level被设置为MEMORY_AND_DISK_SER时,内存中的数据以字节数组(byte array)形式存储,当这些数据被存储到硬盘中时,不再需要进行序列化。若设置为该Level,则evict数据会更加高效。

Cache中的数据不会一直存在,所以会在合适的时候被Evict(可以理解抹去数据)。Spark主要采用的Evict策略为LRU,且该策略仅针对内存中的数据块。不过,倘若一个RDD块已经在Cache中存在,那么Spark永远不会为了缓存该RDD块的额外的块而将这个已经存在的RDD块抹掉。这是显而易见的,倘若抹掉了这个RDD块,又何必去缓存它额外的块呢?

如果BlockManager接收到的数据以迭代器(Iterator)形式组成,且这个Block最终需要保存到内存中,则BlockManager会将迭代器展开(Unrolling),这就意味着需要耗费比迭代器更多的内存,甚至可能该迭代器代表的数组需要的容量会超过内存空间,故而BlockManager只能逐步地展开迭代器以避免OOM,在展开时,需要定期地去检查内存空间是否足够。

Unrollong使用的内存倘若不够,会从Storage Memory中借用。这个借用者其实是很霸道的,倘若当前没有Block,他可以借走所有的Storage Memory空间。如果Storage Memory已有block使用,他还会鹊巢鸠占,强制将内存中的block抹去,唯一约束他的是spark.storage.unrollFraction配置项(默认为0.2),也就是说他抹去的内存按照这个配置的比例计算,也不是肆无忌惮的。

1.6版本的内存管理

新的配置项

到了1.6版本,Execution Memory和Storage Memory之间支持跨界使用。当执行内存不够时,可以借用存储内存,反之亦然。

1.6版本的实现方案支持借来的存储内存随时都可以释放,但借来的执行内存却不能如此。

新的版本引入了新的配置项:

  • spark.memory.fraction(默认值为0.75):用于设置存储内存和执行内存占用堆内存的比例。若值越低,则发生spill和evict的频率就越高。注意,设置比例时要考虑Spark自身需要的内存量。
  • spark.memory.storageFraction(默认值为0.5):显然,这是存储内存所占spark.memory.fraction设置比例内存的大小。当整体的存储容量超过该比例对应的容量时,缓存的数据会被evict。
  • spark.memory.useLegacyMode(默认值为false):若设置为true,则使用1.6版本前的内存管理机制。此时,如下五项配置均生效:
    • spark.storage.memoryFraction
    • spark.storage.safetyFraction
    • spark.storage.unrollFraction
    • spark.shuffle.memoryFraction
    • spark.shuffle.safetyFraction

如何应对内存压力

当内存不足时,我们需要Evict内存中已有的数据,但是这需要考虑Evict数据的成本。对于存储内存,eviction的成本取决于设置的Storage Level。如果设置为MEMORY_ONLY就意味着一旦内存中的数据被evict,当再次需要的时候就需要重新计算,以此而推,设置为​EMORY_AND_DISK_SER自然成本最低,因为内容可以从硬盘中直接获取,无需重新计算,也不需要重新序列化内容,因而唯一的损耗是I/O。

执行内存的Evict就完全不同了,由于被Evict的数据都被spill到磁盘中了,故而执行内存存储的数据无需重新计算,且执行数据是以压缩格式存储的,故而降低了序列化的成本。

但是,这并不意味着Evict执行内存的成本就一定低于存储内存。根据执行内存的本质,在数据分析过程中,常常需要将spilled的执行内存读回,这就需要维护一个引用。如果不考虑重计算的成本,Evict执行内存的成本甚至要远远高于存储内存。

正是因为这个原因,实现存储内存的Eviction相对更容易,只需要使用现有的Eviction机制清除掉对应的数据块即可。

MemoryManager

1.6版本的内存管理主要由类MemoryManager承担。这是一个抽象类,提供的主要方法包括:

def acquireExecutionMemory(numBytes:Long):Long

def acquireStorageMemory(blockId:BlockId,numBytes:Long):Long

def releaseExecutionMemory(numBytes:Long):Unit

def releaseStorageMemory(numBytes:Long):Unit

继承这个抽象类的子类包括StaticMemoryManagerUnifiedMemoryManager。前者就是1.6版本之前的内存管理器,后者则实现了最新的内存管理机制。

阅读UnifiedMemoryManager类中的主要方法acquireExecutionMemoryacquireStorageMemory,其执行流程还是非常清楚的。acquireExecutionMemory方法的实现如下:

  /**
   * Try to acquire up to `numBytes` of execution memory for the current task and return the
   * number of bytes obtained, or 0 if none can be allocated.
   *
   * This call may block until there is enough free memory in some situations, to make sure each
   * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
   * active tasks) before it is forced to spill. This can happen if the number of tasks increase
   * but an older task had a lot of memory already.
   */
  override private[memory] def acquireExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Long = synchronized {
    assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
    assert(numBytes >= 0)
    memoryMode match {
      case MemoryMode.ON_HEAP =>

        /**
         * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
         *
         * When acquiring memory for a task, the execution pool may need to make multiple
         * attempts. Each attempt must be able to evict storage in case another task jumps in
         * and caches a large block between the attempts. This is called once per attempt.
         */
        def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
          if (extraMemoryNeeded > 0) {
            // There is not enough free memory in the execution pool, so try to reclaim memory from
            // storage. We can reclaim any free memory from the storage pool. If the storage pool
            // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
            // the memory that storage has borrowed from execution.
            val memoryReclaimableFromStorage =
              math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
            if (memoryReclaimableFromStorage > 0) {
              // Only reclaim as much space as is necessary and available:
              val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
                math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
              onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
            }
          }
        }

        /**
         * The size the execution pool would have after evicting storage memory.
         *
         * The execution memory pool divides this quantity among the active tasks evenly to cap
         * the execution memory allocation for each task. It is important to keep this greater
         * than the execution pool size, which doesn't take into account potential memory that
         * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
         *
         * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
         * in execution memory allocation across tasks, Otherwise, a task may occupy more than
         * its fair share of execution memory, mistakenly thinking that other tasks can acquire
         * the portion of storage memory that cannot be evicted.
         */
        def computeMaxExecutionPoolSize(): Long = {
          maxMemory - math.min(storageMemoryUsed, storageRegionSize)
        }

        onHeapExecutionMemoryPool.acquireMemory(
          numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)

      case MemoryMode.OFF_HEAP =>
        // For now, we only support on-heap caching of data, so we do not need to interact with
        // the storage pool when allocating off-heap memory. This will change in the future, though.
        offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
    }
  }

以上代码中,最重要的实现放在函数maybeGrowExecutionPool中。这个方法会判断是否需要增加执行内存,倘若事先设置的执行内存空间没有足够可用的内存,就会尝试从存储内存中借用。倘若存储内存的空间已经大于storageRegionSize设置的值,就需要根据借用的内存大小把存储内存中的存储块evict。

调整内存大小以及Evict存储块,是在maybeGrowExecutionPool函数内部中调用StorageMemoryPool的函数shrinkPoolToFreeSpace来完成的:

 def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
    // First, shrink the pool by reclaiming free memory:
    val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
    decrementPoolSize(spaceFreedByReleasingUnusedMemory)
    val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
    if (remainingSpaceToFree > 0) {
      // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
      val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
      // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
      // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
      decrementPoolSize(spaceFreedByEviction)
      spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
    } else {
      spaceFreedByReleasingUnusedMemory
    }
  }

results matching ""

    No results matching ""