wordpress自适应建站wordpress 指定

diannao/2026/1/22 23:02:23/文章来源:
wordpress自适应建站,wordpress 指定,摄影毕业设计选题作品,wordpress edit.phpSpark事件总线机制 采用Spark2.11源码#xff0c;以下类或方法被DeveloperApi注解额部分#xff0c;可能出现不同版本不同实现的情况。 Spark中的事件总线用于接受事件并提交到对应的监听器中。事件总线在Spark应用启动时#xff0c;会在SparkContext中激活spark运行的事件总…Spark事件总线机制 采用Spark2.11源码以下类或方法被DeveloperApi注解额部分可能出现不同版本不同实现的情况。 Spark中的事件总线用于接受事件并提交到对应的监听器中。事件总线在Spark应用启动时会在SparkContext中激活spark运行的事件总线LiveListenerBus。 LiveListenerBus相关的部分类图如下 由于Spark使用scala语言编写的所以在类图上的接口代表的是Traits类的接口功能。 #mermaid-svg-ZtOLirsbpChUgZpv {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-ZtOLirsbpChUgZpv .error-icon{fill:#552222;}#mermaid-svg-ZtOLirsbpChUgZpv .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-ZtOLirsbpChUgZpv .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-ZtOLirsbpChUgZpv .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-ZtOLirsbpChUgZpv .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-ZtOLirsbpChUgZpv .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-ZtOLirsbpChUgZpv .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-ZtOLirsbpChUgZpv .marker{fill:#333333;stroke:#333333;}#mermaid-svg-ZtOLirsbpChUgZpv .marker.cross{stroke:#333333;}#mermaid-svg-ZtOLirsbpChUgZpv svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-ZtOLirsbpChUgZpv g.classGroup text{fill:#9370DB;fill:#131300;stroke:none;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:10px;}#mermaid-svg-ZtOLirsbpChUgZpv g.classGroup text .title{font-weight:bolder;}#mermaid-svg-ZtOLirsbpChUgZpv .nodeLabel,#mermaid-svg-ZtOLirsbpChUgZpv .edgeLabel{color:#131300;}#mermaid-svg-ZtOLirsbpChUgZpv .edgeLabel .label rect{fill:#ECECFF;}#mermaid-svg-ZtOLirsbpChUgZpv .label text{fill:#131300;}#mermaid-svg-ZtOLirsbpChUgZpv .edgeLabel .label span{background:#ECECFF;}#mermaid-svg-ZtOLirsbpChUgZpv .classTitle{font-weight:bolder;}#mermaid-svg-ZtOLirsbpChUgZpv .node rect,#mermaid-svg-ZtOLirsbpChUgZpv .node circle,#mermaid-svg-ZtOLirsbpChUgZpv .node ellipse,#mermaid-svg-ZtOLirsbpChUgZpv .node polygon,#mermaid-svg-ZtOLirsbpChUgZpv .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-ZtOLirsbpChUgZpv .divider{stroke:#9370DB;stroke:1;}#mermaid-svg-ZtOLirsbpChUgZpv g.clickable{cursor:pointer;}#mermaid-svg-ZtOLirsbpChUgZpv g.classGroup rect{fill:#ECECFF;stroke:#9370DB;}#mermaid-svg-ZtOLirsbpChUgZpv g.classGroup line{stroke:#9370DB;stroke-width:1;}#mermaid-svg-ZtOLirsbpChUgZpv .classLabel .box{stroke:none;stroke-width:0;fill:#ECECFF;opacity:0.5;}#mermaid-svg-ZtOLirsbpChUgZpv .classLabel .label{fill:#9370DB;font-size:10px;}#mermaid-svg-ZtOLirsbpChUgZpv .relation{stroke:#333333;stroke-width:1;fill:none;}#mermaid-svg-ZtOLirsbpChUgZpv .dashed-line{stroke-dasharray:3;}#mermaid-svg-ZtOLirsbpChUgZpv #compositionStart,#mermaid-svg-ZtOLirsbpChUgZpv .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-ZtOLirsbpChUgZpv #compositionEnd,#mermaid-svg-ZtOLirsbpChUgZpv .composition{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-ZtOLirsbpChUgZpv #dependencyStart,#mermaid-svg-ZtOLirsbpChUgZpv .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-ZtOLirsbpChUgZpv #dependencyStart,#mermaid-svg-ZtOLirsbpChUgZpv .dependency{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-ZtOLirsbpChUgZpv #extensionStart,#mermaid-svg-ZtOLirsbpChUgZpv .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-ZtOLirsbpChUgZpv #extensionEnd,#mermaid-svg-ZtOLirsbpChUgZpv .extension{fill:#333333!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-ZtOLirsbpChUgZpv #aggregationStart,#mermaid-svg-ZtOLirsbpChUgZpv .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-ZtOLirsbpChUgZpv #aggregationEnd,#mermaid-svg-ZtOLirsbpChUgZpv .aggregation{fill:#ECECFF!important;stroke:#333333!important;stroke-width:1;}#mermaid-svg-ZtOLirsbpChUgZpv .edgeTerminals{font-size:11px;}#mermaid-svg-ZtOLirsbpChUgZpv :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 继承 实现 实现 聚合 聚合 继承 继承 继承 实现 SparkContext «interface» SparkListenerEvent «interface» SparkListenerInterface «interface» SparkListenerBus «interface» ListenerBus LiveListenerBus AsyncEventQueue AppStatusListener ExecutorAllocationListener «Abstract» SparkListener SparkListener相关事件 EventLoggingListener 主体逻辑 启动应用的时候在SparkConext中对LiveListenerBus进行实例化除了内部的监听器还将注册在 spark.extraListeners配置项中指定的监听器然后启动监听器总线。 在LiveListenerBus中使用AsyncEventQueue作为核心实现将事件异步的分发给已经注册的SparkListener监听器们。其中AsyncEventQueue有4类 LiveListenerBus将AsyncEventQueue分为4类不同的事件分发给各自独立的线程进行处理防止在监听器和事件较多的时候造成积压问题。 eventLog日志事件队列executorManagement执行器管理队列appStatus应用程序状态队列shared非内部监听器共享的队列 在AsyncEventQueue内部采用LinkedBlockingQueue来存储事件并启动一个常住线程dispatchThread进行事件的转发。 #mermaid-svg-TvSoPL0HfiocJaXK {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-TvSoPL0HfiocJaXK .error-icon{fill:#552222;}#mermaid-svg-TvSoPL0HfiocJaXK .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-TvSoPL0HfiocJaXK .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-TvSoPL0HfiocJaXK .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-TvSoPL0HfiocJaXK .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-TvSoPL0HfiocJaXK .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-TvSoPL0HfiocJaXK .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-TvSoPL0HfiocJaXK .marker{fill:#333333;stroke:#333333;}#mermaid-svg-TvSoPL0HfiocJaXK .marker.cross{stroke:#333333;}#mermaid-svg-TvSoPL0HfiocJaXK svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-TvSoPL0HfiocJaXK .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-TvSoPL0HfiocJaXK .cluster-label text{fill:#333;}#mermaid-svg-TvSoPL0HfiocJaXK .cluster-label span{color:#333;}#mermaid-svg-TvSoPL0HfiocJaXK .label text,#mermaid-svg-TvSoPL0HfiocJaXK span{fill:#333;color:#333;}#mermaid-svg-TvSoPL0HfiocJaXK .node rect,#mermaid-svg-TvSoPL0HfiocJaXK .node circle,#mermaid-svg-TvSoPL0HfiocJaXK .node ellipse,#mermaid-svg-TvSoPL0HfiocJaXK .node polygon,#mermaid-svg-TvSoPL0HfiocJaXK .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-TvSoPL0HfiocJaXK .node .label{text-align:center;}#mermaid-svg-TvSoPL0HfiocJaXK .node.clickable{cursor:pointer;}#mermaid-svg-TvSoPL0HfiocJaXK .arrowheadPath{fill:#333333;}#mermaid-svg-TvSoPL0HfiocJaXK .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-TvSoPL0HfiocJaXK .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-TvSoPL0HfiocJaXK .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-TvSoPL0HfiocJaXK .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-TvSoPL0HfiocJaXK .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-TvSoPL0HfiocJaXK .cluster text{fill:#333;}#mermaid-svg-TvSoPL0HfiocJaXK .cluster span{color:#333;}#mermaid-svg-TvSoPL0HfiocJaXK div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-TvSoPL0HfiocJaXK :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} LiveListenerBus AsyncEventQueue-eventLog AsyncEventQueue-executorManagement AsyncEventQueue-appStatus AsyncEventQueue-shared addToQueue addToQueue addToQueue addToQueue start stop eventQueue event4-1 event4-2 listeners listener4类 listener8类 dispatchThread eventQueue event3-1 event3-2 listeners listener3类 listener7类 dispatchThread eventQueue event2-1 event2-2 listeners listener2类 listener6类 dispatchThread eventQueue event1-1 event1-2 listeners listener1类 listener5类 dispatchThread events发生源1 listener1 events发生源2 listener2 events发生源3 listener3 events发生源4 listener4 代码详解 org.apache.spark.util.ListenerBus Traits类 scala中的Traits类类似Java中的接口类。与接口相同的部分是可以定义抽象的方法和成员不用的部分是可以包含具体的方法可以成员。 package org.apache.spark.utilimport java.util.concurrent.CopyOnWriteArrayListimport scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.util.control.NonFatalimport com.codahale.metrics.Timerimport org.apache.spark.internal.Logging/*** 事件总线的基类。用来转发事件到对应的事件监听器*/ // [ L:AnyRef]指的是泛型:符号是泛型的上限。private[spark]代表作用域只对spark目录下可见 private[spark] trait ListenerBus[L : AnyRef, E] extends Logging {// (L, Option[Timer])采用的元组式集合private[this] val listenersPlusTimers new CopyOnWriteArrayList[(L, Option[Timer])]// Marked private[spark] for access in tests.private[spark] def listeners listenersPlusTimers.asScala.map(_._1).asJavaprotected def getTimer(listener: L): Option[Timer] None/*** 添加监听器来监听事件。 该方法是线程安全的可以在任何线程中调用。*/final def addListener(listener: L): Unit {listenersPlusTimers.add((listener, getTimer(listener)))}/*** 移除监听器它将不会接收任何事件。 该方法是线程安全的可以在任何线程中调用。*/final def removeListener(listener: L): Unit {listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer listenersPlusTimers.remove(listenerAndTimer)}}/*** 如果删除侦听器时需要进行任何额外的清理则可以由子类覆盖它。 特别是AsyncEventQueue可以清理LiveListenerBus中的队列。*/def removeListenerOnError(listener: L): Unit {removeListener(listener)}/*** 将事件转发给所有注册的侦听器。 postToAll 调用者应该保证在同一线程中为所有事件调用 postToAll。*/def postToAll(event: E): Unit {val iter listenersPlusTimers.iteratorwhile (iter.hasNext) {val listenerAndMaybeTimer iter.next()val listener listenerAndMaybeTimer._1val maybeTimer listenerAndMaybeTimer._2val maybeTimerContext if (maybeTimer.isDefined) {maybeTimer.get.time()} else {null}try {doPostEvent(listener, event)if (Thread.interrupted()) {throw new InterruptedException()}} catch {case ie: InterruptedException logError(sInterrupted while posting to ${Utils.getFormattedClassName(listener)}. sRemoving that listener., ie)removeListenerOnError(listener)case NonFatal(e) logError(sListener ${Utils.getFormattedClassName(listener)} threw an exception, e)} finally {if (maybeTimerContext ! null) {maybeTimerContext.stop()}}}}/*** 将事件发布到指定的侦听器。 保证所有侦听器在同一线程中调用“onPostEvent”。*/protected def doPostEvent(listener: L, event: E): Unitprivate[spark] def findListenersByClass[T : L : ClassTag](): Seq[T] {val c implicitly[ClassTag[T]].runtimeClasslisteners.asScala.filter(_.getClass c).map(_.asInstanceOf[T]).toSeq}}org.apache.spark.util.ListenerBus.SparkListenerBus package org.apache.spark.schedulerimport org.apache.spark.util.ListenerBus/*** SparkListenerEvent事件总线继承ListenerBus类将SparkListenerEvent事件转发到SparkListenerInterface中。* SparkListenerInterface是一个trait接口类里面定义了一些关于spark应用运行周期中的一些事件监听器。* SparkListenerEvent是定义了一个事件的通用接口类其他关于Spark应用运行周期过程中的事件均以 case class实现这个接口*/ private[spark] trait SparkListenerBusextends ListenerBus[SparkListenerInterface, SparkListenerEvent] {// 监听器处理对不同的事件采用不用的处理protected override def doPostEvent(listener: SparkListenerInterface,event: SparkListenerEvent): Unit {event match {case stageSubmitted: SparkListenerStageSubmitted listener.onStageSubmitted(stageSubmitted)case stageCompleted: SparkListenerStageCompleted listener.onStageCompleted(stageCompleted)case jobStart: SparkListenerJobStart listener.onJobStart(jobStart)case jobEnd: SparkListenerJobEnd listener.onJobEnd(jobEnd)case taskStart: SparkListenerTaskStart listener.onTaskStart(taskStart)case taskGettingResult: SparkListenerTaskGettingResult listener.onTaskGettingResult(taskGettingResult)case taskEnd: SparkListenerTaskEnd listener.onTaskEnd(taskEnd)case environmentUpdate: SparkListenerEnvironmentUpdate listener.onEnvironmentUpdate(environmentUpdate)case blockManagerAdded: SparkListenerBlockManagerAdded listener.onBlockManagerAdded(blockManagerAdded)case blockManagerRemoved: SparkListenerBlockManagerRemoved listener.onBlockManagerRemoved(blockManagerRemoved)case unpersistRDD: SparkListenerUnpersistRDD listener.onUnpersistRDD(unpersistRDD)case applicationStart: SparkListenerApplicationStart listener.onApplicationStart(applicationStart)case applicationEnd: SparkListenerApplicationEnd listener.onApplicationEnd(applicationEnd)case metricsUpdate: SparkListenerExecutorMetricsUpdate listener.onExecutorMetricsUpdate(metricsUpdate)case executorAdded: SparkListenerExecutorAdded listener.onExecutorAdded(executorAdded)case executorRemoved: SparkListenerExecutorRemoved listener.onExecutorRemoved(executorRemoved)case executorBlacklisted: SparkListenerExecutorBlacklisted listener.onExecutorBlacklisted(executorBlacklisted)case executorUnblacklisted: SparkListenerExecutorUnblacklisted listener.onExecutorUnblacklisted(executorUnblacklisted)case nodeBlacklisted: SparkListenerNodeBlacklisted listener.onNodeBlacklisted(nodeBlacklisted)case nodeUnblacklisted: SparkListenerNodeUnblacklisted listener.onNodeUnblacklisted(nodeUnblacklisted)case blockUpdated: SparkListenerBlockUpdated listener.onBlockUpdated(blockUpdated)case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)case _ listener.onOtherEvent(event)}}}SparkListener实现了接口SparkListenerInterface是它的默认实现类。主要对所有的事件回调做了无操作实现。 事件的存储与转发队列 org.apache.spark.scheduler.AsyncEventQueue package org.apache.spark.schedulerimport java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}import com.codahale.metrics.{Gauge, Timer}import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.util.Utils/*** 事件的异步队列。 发布到此队列的所有事件都将传递到单独线程中的子侦听器。** 仅当调用 start() 方法时才会开始传递事件。 当不需要传递更多事件时应该调用“stop()”方法。*/ private class AsyncEventQueue(val name: String,conf: SparkConf,metrics: LiveListenerBusMetrics,bus: LiveListenerBus)extends SparkListenerBuswith Logging {import AsyncEventQueue._// 维护了队列前文所述的继承自SparkListenerEvent的样例类事件默认长度10000。private val eventQueue new LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))// 代表未处理的事件个数从eventQueue弹出的事件不保证处理结束了所以采用一个单独的变量对事件进行计数private val eventCount new AtomicLong()/**丢弃事件的计数器。 */private val droppedEventsCounter new AtomicLong(0L)/** 上次记录“droppedEventsCounter”的时间以毫秒为单位。 */volatile private var lastReportTimestamp 0Lprivate val logDroppedEvent new AtomicBoolean(false)private var sc: SparkContext nullprivate val started new AtomicBoolean(false)private val stopped new AtomicBoolean(false)private val droppedEvents metrics.metricRegistry.counter(squeue.$name.numDroppedEvents)private val processingTime metrics.metricRegistry.timer(squeue.$name.listenerProcessingTime)// 首先删除队列大小计量器以防它是由从侦听器总线中删除的该队列的先前版本创建的。metrics.metricRegistry.remove(squeue.$name.size)metrics.metricRegistry.register(squeue.$name.size, new Gauge[Int] {override def getValue: Int eventQueue.size()})// 事件转发的常驻线程不停的调用dispatch()进行事件转发private val dispatchThread new Thread(sspark-listener-group-$name) {setDaemon(true)override def run(): Unit Utils.tryOrStopSparkContext(sc) {dispatch()}}private def dispatch(): Unit LiveListenerBus.withinListenerThread.withValue(true) {var next: SparkListenerEvent eventQueue.take()while (next ! POISON_PILL) {val ctx processingTime.time()try {// 通过事件总线将事件转发到所有的注册的监听器中。super.postToAll(next)} finally {ctx.stop()}eventCount.decrementAndGet()next eventQueue.take()}eventCount.decrementAndGet()}override protected def getTimer(listener: SparkListenerInterface): Option[Timer] {metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))}/*** 启动一个dispatchThread线程将事件分派给监听器。** param sc Used to stop the SparkContext in case the async dispatcher fails.*/private[scheduler] def start(sc: SparkContext): Unit {if (started.compareAndSet(false, true)) {this.sc scdispatchThread.start()} else {throw new IllegalStateException(s$name already started!)}}/*** 停止监听器总线。 它将等待直到处理完排队的事件但新事件将被丢弃。* 插入POISON_PILLdispatchThread线程读取到POISON_PIL时就会停止事件的分发*/private[scheduler] def stop(): Unit {if (!started.get()) {throw new IllegalStateException(sAttempted to stop $name that has not yet started!)}if (stopped.compareAndSet(false, true)) {eventCount.incrementAndGet()eventQueue.put(POISON_PILL)}if (Thread.currentThread() ! dispatchThread) {dispatchThread.join()}}// 向队列中添加事件如果队列满了丢弃当前事件并记录日志。这是个生产者消费者模型当队列满时生产者丢弃事件但队列为空时消费者等待生产者。def post(event: SparkListenerEvent): Unit {if (stopped.get()) {return}eventCount.incrementAndGet()if (eventQueue.offer(event)) {return}// 向eventQueue添加事件失败后的逻辑eventCount.decrementAndGet()droppedEvents.inc()droppedEventsCounter.incrementAndGet()if (logDroppedEvent.compareAndSet(false, true)) {logError(sDropping event from queue $name. This likely means one of the listeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.)}logTrace(sDropping event $event)val droppedCount droppedEventsCounter.getif (droppedCount 0) {// 为了控制日志的输出频率。采用1分钟输出一次。if (System.currentTimeMillis() - lastReportTimestamp 60 * 1000) {if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {val prevLastReportTimestamp lastReportTimestamplastReportTimestamp System.currentTimeMillis()val previous new java.util.Date(prevLastReportTimestamp)logWarning(sDropped $droppedCount events from $name since $previous.)}}}}/*** For testing only. Wait until there are no more events in the queue.*/def waitUntilEmpty(deadline: Long): Boolean {while (eventCount.get() ! 0) {if (System.currentTimeMillis deadline) {return false}Thread.sleep(10)}true}override def removeListenerOnError(listener: SparkListenerInterface): Unit {bus.removeListener(listener)}}private object AsyncEventQueue {val POISON_PILL new SparkListenerEvent() { }}spark运行事件总线 org.apache.spark.scheduler.LiveListenerBus package org.apache.spark.schedulerimport java.util.{List JList} import java.util.concurrent._ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}import scala.collection.JavaConverters._ import scala.collection.mutable import scala.reflect.ClassTag import scala.util.DynamicVariableimport com.codahale.metrics.{Counter, MetricRegistry, Timer}import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.metrics.MetricsSystem import org.apache.spark.metrics.source.Source/*** SparkListenerEvent事件管理器* 将 SparkListenerEvents 异步传递给已注册的 SparkListener。** 在调用start()之前所有发布的事件都只会被缓冲。 只有在此侦听器总线启动后事件才会实际传播到所有连接的侦听器。 当调用 stop() 时该监听器总线将停止停止后它将丢弃更多事件。*/ private[spark] class LiveListenerBus(conf: SparkConf) {import LiveListenerBus._private var sparkContext: SparkContext _private[spark] val metrics new LiveListenerBusMetrics(conf)// 表示是否调用了start()方法总线已启动private val started new AtomicBoolean(false)// 表示是否调用了stop()方法总线已启动private val stopped new AtomicBoolean(false)/** 事件放弃计数器 */private val droppedEventsCounter new AtomicLong(0L)/** 上次记录“droppedEventsCounter”的时间以毫秒为单位。 */volatile private var lastReportTimestamp 0Lprivate val queues new CopyOnWriteArrayList[AsyncEventQueue]()// Visible for testing.volatile private[scheduler] var queuedEvents new mutable.ListBuffer[SparkListenerEvent]()/**将侦听器添加到所有非内部侦听器共享的队列中。 */def addToSharedQueue(listener: SparkListenerInterface): Unit {addToQueue(listener, SHARED_QUEUE)}/** 将监听器添加到执行器管理队列中。 */def addToManagementQueue(listener: SparkListenerInterface): Unit {addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)}/** 将侦听器添加到应用程序状态队列。*/def addToStatusQueue(listener: SparkListenerInterface): Unit {addToQueue(listener, APP_STATUS_QUEUE)}/** 将监听器添加到事件日志队列. */def addToEventLogQueue(listener: SparkListenerInterface): Unit {addToQueue(listener, EVENT_LOG_QUEUE)}/*** 将侦听器添加到特定队列并根据需要创建新队列。 * 队列彼此独立每个队列使用单独的线程来传递事件允许较慢的侦听器在一定程度上与其他侦听器隔离。*/private[spark] def addToQueue(listener: SparkListenerInterface,queue: String): Unit synchronized {if (stopped.get()) {throw new IllegalStateException(LiveListenerBus is stopped.)}// 先寻找队列是否存在如果存在就注册不存在就创建新队列并注册queues.asScala.find(_.name queue) match {case Some(queue) queue.addListener(listener)case None val newQueue new AsyncEventQueue(queue, conf, metrics, this)newQueue.addListener(listener)if (started.get()) {newQueue.start(sparkContext)}queues.add(newQueue)}}def removeListener(listener: SparkListenerInterface): Unit synchronized {// 从添加到的所有队列中删除侦听器并停止已变空的队列。queues.asScala.filter { queue queue.removeListener(listener)queue.listeners.isEmpty()}.foreach { toRemove if (started.get() !stopped.get()) {toRemove.stop()}queues.remove(toRemove)}}/** 将事件转发到所有的队列中 */def post(event: SparkListenerEvent): Unit {if (stopped.get()) {return}metrics.numEventsPosted.inc()// 如果事件缓冲区为空则意味着总线已启动我们可以避免同步并将事件直接发布到队列中。 这应该是事件总线生命周期中最常见的情况。if (queuedEvents null) {postToQueues(event)return}// 否则需要同步检查总线是否启动以确保调用 start() 的线程拾取新事件。synchronized {if (!started.get()) {queuedEvents eventreturn}}// 如果进行上述检查时总线已经启动则直接发送到队列。postToQueues(event)}// 遍历所有队列进行事件分发private def postToQueues(event: SparkListenerEvent): Unit {val it queues.iterator()while (it.hasNext()) {it.next().post(event)}}/*** 启动每个队列并发送queuedEvents中缓存的事件。每个队列就开始消费之前post的事件并调用postToAll()方法将事件发送给监视器。** 这首先发送在此侦听器总线启动之前发布的所有缓冲事件然后在侦听器总线仍在运行时异步侦听任何其他事件。* 这应该只被调用一次。** param sc Used to stop the SparkContext in case the listener thread dies.*/def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit synchronized {if (!started.compareAndSet(false, true)) {throw new IllegalStateException(LiveListenerBus already started.)}this.sparkContext scqueues.asScala.foreach { q q.start(sc)queuedEvents.foreach(q.post)}queuedEvents nullmetricsSystem.registerSource(metrics)}/*** Exposed for testing.*/throws(classOf[TimeoutException])def waitUntilEmpty(timeoutMillis: Long): Unit {val deadline System.currentTimeMillis timeoutMillisqueues.asScala.foreach { queue if (!queue.waitUntilEmpty(deadline)) {throw new TimeoutException(sThe event queue is not empty after $timeoutMillis ms.)}}}/*** 停止监听器总线。 它将等待直到处理完排队的事件但在停止后删除新事件。*/def stop(): Unit {if (!started.get()) {throw new IllegalStateException(sAttempted to stop bus that has not yet started!)}if (!stopped.compareAndSet(false, true)) {return}synchronized {queues.asScala.foreach(_.stop())queues.clear()}}// For testing only.private[spark] def findListenersByClass[T : SparkListenerInterface : ClassTag](): Seq[T] {queues.asScala.flatMap { queue queue.findListenersByClass[T]() }}// For testing only.private[spark] def listeners: JList[SparkListenerInterface] {queues.asScala.flatMap(_.listeners.asScala).asJava}// For testing only.private[scheduler] def activeQueues(): Set[String] {queues.asScala.map(_.name).toSet}}private[spark] object LiveListenerBus {// Allows for Context to check whether stop() call is made within listener threadval withinListenerThread: DynamicVariable[Boolean] new DynamicVariable[Boolean](false)private[scheduler] val SHARED_QUEUE sharedprivate[scheduler] val APP_STATUS_QUEUE appStatusprivate[scheduler] val EXECUTOR_MANAGEMENT_QUEUE executorManagementprivate[scheduler] val EVENT_LOG_QUEUE eventLog }private[spark] class LiveListenerBusMetrics(conf: SparkConf)extends Source with Logging {override val sourceName: String LiveListenerBusoverride val metricRegistry: MetricRegistry new MetricRegistryval numEventsPosted: Counter metricRegistry.counter(MetricRegistry.name(numEventsPosted))// Guarded by synchronization.private val perListenerClassTimers mutable.Map[String, Timer]()def getTimerForListenerClass(cls: Class[_ : SparkListenerInterface]): Option[Timer] {synchronized {val className cls.getNameval maxTimed conf.get(LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED)perListenerClassTimers.get(className).orElse {if (perListenerClassTimers.size maxTimed) {logError(sNot measuring processing time for listener class $className because a smaximum of $maxTimed listener classes are already timed.)None} else {perListenerClassTimers(className) metricRegistry.timer(MetricRegistry.name(listenerProcessingTime, className))perListenerClassTimers.get(className)}}}}}Spark任务启动时会在SparkContext中启动spark运行的事件总线LiveListenerBus private def setupAndStartListenerBus(): Unit {try {conf.get(EXTRA_LISTENERS).foreach { classNames val listeners Utils.loadExtensions(classOf[SparkListenerInterface], classNames, conf)listeners.foreach { listener listenerBus.addToSharedQueue(listener)logInfo(sRegistered listener ${listener.getClass().getName()})}}} catch {case e: Exception try {stop()} finally {throw new SparkException(sException when registering SparkListener, e)}}// 启动应用的运行事件总线listenerBus.start(this, _env.metricsSystem)_listenerBusStarted true}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/89148.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一个公司优化需要做多少个网站前端开发和后端开发哪个好些

传媒如春雨,润物细无声,大家好,我是51媒体网胡老师。 企业活动发布会邀请媒体报道具有多种好处与优势,这些都有助于提升企业的知名度、形象和影响力。以下是一些主要的好处与优势: 提升品牌知名度:媒体报道…

工会网站建设可以wordpress for sae 4.3

​在使用jmeter进行接口测试时,我们难免会遇到需要从上下文中获取测试数据的情况,这个时候就需要引入变量了。 定义变量 添加->配置元件->用户自定义的变量 添加->配置元件->CSV 数据文件设置 变量的调用方式:${变量名} 变量的…

齐齐哈尔住房和城乡建设局网站课程网站如何建设方案

LED流水灯 循环左移右移函数 crol(a,b):循环左移函数,a是左移的值,b是左移的位数。包含在instrins.h库函数里面。 cror(a,b):循环右移函数,a是右移的值,b是右移的位数。包含在instrins.h库函数里面。 实验代码 #include "…

网站建设投标书报价表湘潭网站建设厦门网站制作

IP地址定位能够精确到的位置级别取决于多种因素,包括IP地址的分配方式、数据库的质量和更新频率、用户的移动性等。一般而言,IP地址定位可以精确到市级,甚至可以达到街道级别 https://www.ip66.net/?utm-sourceLJ&utm-keyword?1146 但需…

网站建设与维护经营范围惠州网站策划建设

【工业智能】音频信号相关场景 DcaseDcase introduction:dcase2024有10个主题的任务: ASD硬件设备产品商 方法制造业应用场景 zenodo音频事件检测 与计算机视觉CV相对应,计算机听觉computer audition,简称CA。 Dcase 这里推荐一个…

免费域名注册哪个网站好网站开发根目录建在哪

一、关于面向对象 1.1简介 Java 是一种面向对象编程语言,其核心思想是面向对象编程(Object-Oriented Programming,OOP)。 面向对象编程是一种程序设计范式,它将数据与操作数据的方法(函数)捆…

韩国做游戏的电影 迅雷下载网站企业查询网站

HTTP传输协议缺点 之前几篇文章中详细讲解了TCP/IP协议栈中的几个协议,其中个就有对HTTP做了一个比较详细的讲解。HTTP是基于TCP进行传输的,其中传输的内容都是明文报文数据,如果我是一个黑客,我会想办法获取这个HTTP消息体&…

资源网站不好找了wordpress好看的友情链接页面

2016年.NET Core首个正式版本问世,如今已发布到了.NET Core3.1,再有2个月.NET5也将如约而至,跨平台开发已经快5年。微软 .NET 程序管理总监 Scott 表示,.NET 5 是 .NET Framework 和 .NET Core 的未来,最终将成为一个统…

手机网页版微信登录入口深圳网站seo哪家快

通讯网关 api网关这些年来,API网关正在经历一些身份危机 。 它们是否是集中的共享资源,从而促进了API对外部实体的公开和治理? 它们是集群入口哨兵,可以严格控制哪些用户流量进入或离开集群吗? 还是他们根据自己拥有…

山西太原建站怎么做网站建设蓝色工匠

当然是为实现功能而设计, 这句话没错. 但是还不够, 针对具体的应用场合, 应该采取不同的设计策略. 例如GUI Client程序, 必须重点注意用户的体验, 为提高易用性而设计 而一般后台应用程序, 就必须在高性能和可靠性方面加强设计 只创建一次的对象, 我们不必在乎其构造时间, 而频…

网站运营一个月多少钱wordpress po

下载的Matlab R2016b软件安装包(文末附有下载地址)目录如下所示: 安装过程: 1. 安装主程序R2016b_win64_dvd1.iso和R2016b_win64_dvd2.iso 由于目前大多数及其都是Win8或10系统,所以选中R2016b_win64_dvd1.iso,右键→Windows资源管理器打开。Win7系统可以安装好压软件之后…

做电商网站一般要多少钱diy图片制作

详情点查看公众号:技术科研吧 链接:如何用ChatGPT绘图? 一:AI领域最新技术 1.OpenAI新模型-GPT-5 2.谷歌新模型-Gemini Ultra 3.Meta新模型-LLama3 4.科大讯飞-星火认知 5.百度-文心一言 6.MoonshotAI-Kimi 7.智谱AI-GLM-…

做网站在经营范围内属于什么中国logo设计公司排名

一、功能 for 循环提供了python中最强大的循环结构(for循环是一种迭代循环机制,而while循环是条件循环,迭代即重复相同的逻辑操作,每次操作都是基于上一次的结果,而进行的) Python for循环可以遍历任何序列…

网站推广优化排名做网站用什么域名好

目录 1、回显模式 2、成果显示 3、知识点 1)FormLayout布局添加addRow方法 2)在输入框显示灰色提示字体,输入内容时消失setPlaceholderText 3)设置回显模式setEchoMode 4、完整代码 1、回显模式 QLineEdit控件的主要功能是输…

申请注册网站域名.商城重庆市建设信息网站

文章目录 前言简介第一步:引入依赖第二步:编写文件解析处理类第三步:Word解析类第四步:PDF解析类第五步:Txt解析类总结 前言 请各大网友尊重本人原创知识分享,谨记本人博客:南国以南i、 提示&a…

树状结构的网站百度关键词查询工具

MySQL数据库设计篇 概述 做服务端开发离不开数据库设计,虽然说服务端技术一直在革新,但是MySQL一直都是我们首选使用的关系型数据库。服务端开发一直以来都是采用数据驱动研发的思想,可见数据库设计是非常重要的,数据库设计的好坏…

广州外贸网站公司安阳网站开发

目录: 一. 异常概念与体系结构 二. 异常的处理 三. 自定义异常类 一. 异常概念与体系结构: 1 异常的概念:在 Java 中,将程序执行过程中发生的 不正常行为 称为异常, 如:算数异常: ArithmeticException System.out.pri…

如何创建个人网站模板杭州营销网站建设平台

1. 自动提交最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设为 true,那么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认值是5s。消费者每次…

网站建设图标合集网站如何做团购

题目 28. 实现 strStr() 实现 strStr() 函数。 给定一个 haystack 字符串和一个 needle 字符串,在 haystack 字符串中找出 needle 字符串出现的第一个位置 (从0开始)。如果不存在,则返回 -1。 示例 1: 输入: haystack “hello”, needle “ll” 输…

企业展示网站 价钱网站建设免费软件

目录 修改约束 创建数据库 添加约束 删除约束 Oracle从入门到总裁:​​​​​​https://blog.csdn.net/weixin_67859959/article/details/135209645 修改约束 如果说表结构的修改还在可以容忍的范畴之内,那么约束的修改是绝对 100% 禁止的 所有的约束一定要在…