赞
踩
ContextCleaner用于清理任务执行过程中生成的超出应用范围的RDD、Shuffle、Broadcast、Accumulator和检查点数据。Spark运行过程中会产生一堆临时文件和临时数据,比如持久化的RDD数据在磁盘上,没有持久化的在内存中,比如shuffle的临时数据等,如果不清理会产生大量的垃圾数据,最终会导致集群资源耗尽。
ContextCleaner类主要包括五个成员:
referenceBuffer:缓存CleanupTaskWeakReference
对象引用;
refeerenceQueue:缓存从CleanupTaskWeakReference
中加入的只剩余弱引用的引用队列;
listeners:监听对象清理工作的监听器数组;
cleaningThread:清理对象引用的守护线程,名称为Spark Context Cleaner
,用于清理不被使用的对象引用。
periodicGCService:单一守护线程池,名称为context-cleaner-periodic-gc
,用于周期(默认30min)调用System.gc()
方法,来清理JVM内存中不被使用的对象。
/** * RDD、shuffle和broadcast状态的异步清理器 * 保存每个感兴趣的RDD、ShuffleDependency和Broadcast对象的弱引用,当关联对象超出应用范围(出了变量作用域,或者无强引用)时进行处理。 * 实际是由一个单独的后台线程来清理的。 */ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { /** * 保证`CleanupTaskWeakReference`的对象只要不被`reference queue`处理,就不会被垃圾收集掉的缓冲队列。 */ private val referenceBuffer = Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap) private val referenceQueue = new ReferenceQueue[AnyRef] // 仅为测试目的使用,当清理对象时调用注册的所有监听器 private val listeners = new ConcurrentLinkedQueue[CleanerListener]() // 两个线程用不同的方式实现,clean线程一直执行,而gc线程周期调度执行 private val cleaningThread = new Thread() { override def run() { keepCleaning() }} private val periodicGCService: ScheduledExecutorService = ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc") /** 注册一个待清理的对象 */ private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = { referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)) } ... }
ContextCleaner如何知道哪些数据对象是不再被使用的呢?
首先需要向ContextCleaner注册要清理的数据对象。然后ContextC
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。