赞
踩
注:不同spark版本源码可能不一样,本机spark版本是3.0.1
SparkListener
Spark中的事件监听机制,本质上其实就是观察者模式的实现,查看源码我们可以经常看到listener这种命名的类或对象,顾名思义,这就是监听器类或对象。下面就以SparkListener为例来解析事件监听是如何设计的。首先我们看SparkListener
*/***
** :: DeveloperApi ::*
** A* *default* *implementation* *for* `*SparkListenerInterface*` *that has no-op implementations* *for*
** all callbacks.*
***
** Note that* *this* *is an internal* *interface* *which* *might* *change* *in* *different* *Spark* *releases**.*
**/*
@DeveloperApi
abstract class SparkListener extends SparkListenerInterface {
//省略很多方法
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { }
}
为了简化分析我们以应用的启动与结束来分析。这里我们可以看到onApplicationStart和onApplicationEnd两个方法,很明显是在应用执行前和执行完做一些事情。
监听器的结构
如果要要使得类在应用执行前和应用执行后做一些响应,就需要继承SparkListener类。
private class BasicJobCounter extends SparkListener{ /** *app 开始时跳去的方法 * 该方法可以获取 appId,appName ,app开始的时间以及 执行程序的用户 * @param applicationStart */ override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { println("onApplicationStart") println(s"applicationStart.appAttemptId = ${applicationStart.appAttemptId}") println(s"applicationStart.appId = ${applicationStart.appId}") println(s"applicationStart.appName = ${applicationStart.appName}") println(s"applicationStart.driverLogs = ${applicationStart.driverLogs}") println(s"applicationStart.sparkUser = ${applicationStart.sparkUser}") println(s"applicationStart.time = ${applicationStart.time}") println("onApplicationStart") } /** * app结束时调用的方法 * 可以获取app结束的时间点 * @param applicationEnd */ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { println("onApplicationEnd") println(s"applicationEnd.time = ${applicationEnd.time}") println("onApplicationEnd") } }
重载了onApplicationEnd和onApplicationStart方法
监听器如何使用
也许大家都会有一个困惑,BasicJobCounter仅仅继承SparkListener类就能获取事件的监听,甚至后者没有任何消息接受和处理逻辑,来实现事件响应呢?举个例子来给大家说明下
object ListenerTest { def main(args: Array[String]): Unit = { val listener = new BasicJobCounter val conf = new SparkConf().setAppName("listener").setMaster("local[*]") //.set("spark.extraListeners", classOf[BasicJobCounter].getName) .set("spark.executor.heartbeatInterval", "1000ms") val sc: SparkContext = new SparkContext(conf) sc.addSparkListener(listener) sc.setLogLevel("WARN") //2.source/读取数据 //RDD:A Resilient Distributed Dataset (RDD):弹性分布式数据集,简单理解为分布式集合!使用起来和普通集合一样简单! //RDD[就是一行行的数据] val lines: RDD[String] = sc.textFile("data/input/words.txt") //3.transformation/数据操作/转换 //切割:RDD【一个一个的单词】 val words: RDD[String] = lines.flatMap(_.split(" ")) // 记为1:RDD【(单词, 1)】 val wordAndOnes: RDD[(String, Int)] = words.map((_,1)) //分组聚合:groupBy + mapValues(_.map(_._2).reduce(_+_)) ===>在Spark里面分组+聚合一步搞定:reduceByKey val result: RDD[(String, Int)] = wordAndOnes.reduceByKey(_+_) //4.sink/输出 //直接输出 result.foreach(*println*) //收集为本地集合再输出 //println(result.collect().toBuffer) //输出到指定path(可以是文件/夹) //result.repartition(1).saveAsTextFile("data/output/result") //为了便于查看Web-UI可以让程序睡一会 //Thread.sleep(1000 * 6000) //*TODO 5.关闭资源* sc.stop() } }
监听器如何工作
从代码可以看出,在申明一个BasicJobCounter对象之后,需要将该对象加入sc中,sc其实就是一个Spark application的入口。SparkContext中addSparkListener方法的代码如下:
*/***
** :: DeveloperApi ::*
** Register a listener to receive up-calls* *from* *events that happen during execution.*
**/*
@DeveloperApi
def addSparkListener(listener: SparkListenerInterface): Unit = {
listenerBus.addToSharedQueue(listener)
}
这里的listenerBus,是一个监听器总线对象,其声明如下:
// An asynchronous listener bus for Spark events
private[spark] def listenerBus: LiveListenerBus = _listenerBus
现在思路就变得很清晰了,BasicJobCounter对象是注册到LiveListenerBus对象中,然后通过LiveListenerBus对象来实现事件监听,其实这里我们通过取名就可以知道其设计思路,类似于计算机中的总线,设备都通过总线来传递消息,而LiveListenerBus就刚好充当了总线的角色,一个个SparkListener子类对象就是一个个的设备,它们可以接受来自总线的消息并作出相应的处理。
监听器总线如何传递消息
我们可以看一下LiveListenerBus类,该类实现了SparkListenerBus接口,直接看其入口start方法:
*/***
** Start an asynchronous thread to dispatch events to the underlying listeners.*
***
*** ***@param\*** ***sc\*** *Used to stop the SparkContext in* *case* *the async dispatcher fails.*
**/*
private[scheduler] def start(sc: SparkContext): Unit = {
if (started.compareAndSet(false, true)) {
this.sc = sc
dispatchThread.start()
} else {
throw new IllegalStateException(s"$name already started!")
}
}
直接启动了一个dispatchThread:
private val dispatchThread = new Thread(s"spark-listener-group-$name") { setDaemon(true) override def run(): Unit = Utils.tryOrStopSparkContext(sc) { dispatch() } } private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) { var next: SparkListenerEvent = *eventQueue.take() while (next != POISON_PILL) { val ctx = processingTime.time() try { super.postToAll(next) } finally { ctx.stop() } eventCount.decrementAndGet() next = eventQueue.take() } eventCount.decrementAndGet() }
可以看到这个dispatchThread是一个守护线程调用了dispatch方法,其核心逻辑就是不停地在一个事件队列eventQueue里取出事件,如果事件合法且LiverListenerBus没有被关停,就将事件通知给所有注册的listener中,postToAll方法在ListenerBus接口中实现:
final def postToAll(event: E): Unit = {
// JavaConverters can create a JIterableWrapper if we use asScala.
// However, this method will be called frequently. To avoid the wrapper cost, here we use
// Java Iterator directly.
val iter = listeners.iterator
while (iter.hasNext) {
val listener = iter.next()
try {
doPostEvent(listener, event)
} catch {
case NonFatal(e) =>
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
}
}
}
思路很清晰,就是用迭代器遍历listener,逐个将消息发送。而doPostEvent方法是一个抽象方法,其具体实现要由继承自ListerBus的类负责,比如之前举例中的SparkListener,就有相应的SparkListenerBus接口:
private[spark] trait SparkListenerBus extends ListenerBus[SparkListenerInterface, SparkListenerEvent] { protected override def doPostEvent( listener: SparkListenerInterface, event: SparkListenerEvent): Unit = { event match { //部分代码省略 case executorAdded: SparkListenerExecutorAdded => listener.onExecutorAdded(executorAdded) case executorRemoved: SparkListenerExecutorRemoved => listener.onExecutorRemoved(executorRemoved) //部分代码省略 case _ => listener.onOtherEvent(event) } } }
这里对每个事件进行类型匹配,比如doPostEvent需要将onApplicationStart事件告知一个listener,对应地,这个listener就调用一下自己的onApplicationStart方法,对该事件作出自己的反应.
小结
总结之,Spark中监听器的实现核心其实就是一个个需要对事件响应的监听器对象,注册到一个监听器总线,需要发送事件消息的组件将发生的事件消息提交到总线,然后总线将事件消息转发给一个个注册在它上面的监听器,最后监听器对事件进行响应。其实就是一个典型的观察者模式使用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。