付费的网站推广该怎么做合肥网络公司 网站建设

news/2025/9/24 17:49:17/文章来源:
付费的网站推广该怎么做,合肥网络公司 网站建设,wordpress安全插件下载,wordpress动态链接301摘要 spark的调度一直是我想搞清楚的东西#xff0c;以及有向无环图的生成过程、task的调度、rdd的延迟执行是怎么发生的和如何完成的#xff0c;还要就是RDD的compute都是在executor的哪个阶段调用和执行我们定义的函数的。这些都非常的基础和困难。花一段时间终于弄白了其中…摘要 spark的调度一直是我想搞清楚的东西以及有向无环图的生成过程、task的调度、rdd的延迟执行是怎么发生的和如何完成的还要就是RDD的compute都是在executor的哪个阶段调用和执行我们定义的函数的。这些都非常的基础和困难。花一段时间终于弄白了其中的奥秘。总结起来以便以后继续完善。spark的调度分为两级调度DAGSchedule和TaskSchedule。DAGSchedule是根据job来生成相互依赖的stages然后把stages以TaskSet形式传递给TaskSchedule来进行任务的分发过程里面的细节会慢慢的讲解出来的比较长。 本文目录 1、spark的RDD逻辑执行链2、spark的job的划分、stage的划分3、spark的DAGScheduler的调度4、spark的TaskSchedule的调度5、executor如何执行task以及我们定义的函数 spark的RDD的逻辑执行链 都说spark进行延迟执行通过RDD的DAG来生成相应的Stage等RDD的DAG的形成过程是通过依赖来完成的每一个RDD通过转换算子的时候都会生成一个和多个子RDD在通过转换算子的时候在创建一个新的RDD的时候也会创建他们之间的依赖关系。因此他们是通过Dependencies连接起来的RDD的依赖不是我们的重点如果想了解RDD的依赖可以自行googleRDD的依赖分为1:1的OneToOneDependencym:1的RangeDependency还有m:n的ShuffleDependencies其中OneToOneDependency和RangeDependency又被称为NarrowDependency这里的1:1,m:1,m:n的粒度是对于RDD的分区而言的。 依赖中最根本的是保留了父RDD其rdd的方法就是返回父RDD的方法。这样其就形成了一个链表形式的结构通过最后面的RDD根据依赖可以向前回溯到所有的父类RDD。我们以map为例来看一下依赖是如何产生的。 通过map其实其实创建了一个MapPartitonsRDD的RDD然后我们看一下MapPartitonsRDD的主构造函数其又对RDD进行了赋值其中父RDD就是上面的this对象指定的RDD我们再看一下RDD这个类的构造函数其又调用了RDD的主构造函数其实依赖都是在RDD的构造函数中形成的。通过上面的依赖转换就形成了RDD额DAG图生成了一个RDD的DAG图spark的job的划分、stage的划分spark的Application划分job其实挺简单的一个Application划分为几个job我们就要看这个Application中有多少个Action算子一个Action算子对应一个job这个可以通过源码来看出来转换算子是形成一个或者多个RDD而Action算子是触发job的提交。比如上面的map转换算子就是这样的而Action算子是这样的通过runJob方法提交作业。stage的划分是根据是否进行shuflle过程来决定的这个后面会细说。 spark的DAGScheduler的调度 当我们通过客户端向spark集群提交作业时如果利用的资源管理器是yarn那么客户端向spark提交申请运行driver进程的机器driver其实在spark中是没有具体的类的driver机器主要是用来运行用户编写的代码的地方完成DAGScheduler和TaskSchedule追踪task运行的状态。记住用户编写的主函数是在driver中运行的但是RDD转换和执行是在不同的机器上完成。其实driver主要负责作业的调度和分发。Action算子到stage的划分和DAGScheduler的完成过程。当我们在driver进程中运行用户定义的main函数的时候首先会创建SparkContext对象这个是我们与spark集群进行交互的入口它会初始化很多运行需要的环境最主要的是初始化了DAGScheduler和TaskSchedule。我们以这样的的一个RDD的逻辑执行图来分析整个DAGScheduler的过程。因为DAGScheduler发生在driver进程中我们就冲Driver进程运行用户定义的main函数开始。在上图中RDD9是最后一个RDD并且其调用了Action算子就会触发作业的提交其会调用SparkContext的runjob函数其经过一系列的runJob的封装会调用DAGScheduler的runJob 在SparkContext中存在着runJob方法 ---------------------------------------------- def runJob[T, U: ClassTag](rdd: RDD[T], // rdd为上面提到的RDD逻辑执行图中的RDD9func: (TaskContext, Iterator[T]) U,这个方法也是RDD9调用Action算子传入的函数partitions: Seq[Int],resultHandler: (Int, U) Unit): Unit {if (stopped.get()) {throw new IllegalStateException(SparkContext has been shutdown)}val callSite getCallSiteval cleanedFunc clean(func)logInfo(Starting job: callSite.shortForm)if (conf.getBoolean(spark.logLineage, false)) {logInfo(RDDs recursive dependencies:\n rdd.toDebugString)}dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)progressBar.foreach(_.finishAll())rdd.doCheckpoint()} ---------------------------------------------- DAGScheduler的runJob ---------------------------------------------- def runJob[T, U](rdd: RDD[T], //RDD9func: (TaskContext, Iterator[T]) U,partitions: Seq[Int],callSite: CallSite,resultHandler: (Int, U) Unit,properties: Properties): Unit {val start System.nanoTime//在这里会生成一个job的守护进程waiter用来等待作业提交执行是否完成其又调用了submitJob其以下的代//码都是用来处运行结果的一些log日志信息val waiter submitJob(rdd, func, partitions, callSite, resultHandler, properties)ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)waiter.completionFuture.value.get match {case scala.util.Success(_) logInfo(Job %d finished: %s, took %f s.format(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))case scala.util.Failure(exception) logInfo(Job %d failed: %s, took %f s.format(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.val callerStackTrace Thread.currentThread().getStackTrace.tailexception.setStackTrace(exception.getStackTrace callerStackTrace)throw exception}} ---------------------------------------------- submitJob的源代码 ---------------------------------------------- def submitJob[T, U](rdd: RDD[T],func: (TaskContext, Iterator[T]) U,partitions: Seq[Int],callSite: CallSite,resultHandler: (Int, U) Unit,properties: Properties): JobWaiter[U] {// 检查RDD的分区是否合法val maxPartitions rdd.partitions.lengthpartitions.find(p p maxPartitions || p 0).foreach { p throw new IllegalArgumentException(Attempting to access a non-existent partition: p . Total number of partitions: maxPartitions)} val jobId nextJobId.getAndIncrement()if (partitions.size 0) {// Return immediately if the job is running 0 tasksreturn new JobWaiter[U](this, jobId, 0, resultHandler)} assert(partitions.size 0)//这一块是把我们的job继续进行封装到JobSubmitted然后放入到一个进程中池里spark会启动一个线程来处理我//们提交的作业val func2 func.asInstanceOf[(TaskContext, Iterator[]) ]val waiter new JobWaiter(this, jobId, partitions.size, resultHandler)eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,SerializationUtils.clone(properties)))waiter} ---------------------------------------------- 在DAGScheduler类中有一个DAGSchedulerEventProcessLoop的类用来接收处理DAGScheduler的消息事件JobSubmitted对象因此会执行第一个操作handleJobSubmitted在这里我们要说一下Stage的类型在spark中有两种类型的stage一种是ShuffleMapStage和ResultStage最后一个RDD对应的Stage是ResultStage遇到Shuffle过程的RDD被称为ShuffleMapStage。 ---------------------------------------------- private[scheduler] def handleJobSubmitted(jobId: Int,finalRDD: RDD[],//对应RDD9func: (TaskContext, Iterator[]) _,partitions: Array[Int],callSite: CallSite,listener: JobListener,properties: Properties) {var finalStage: ResultStage nulltry {// 先创建ResultStage。finalStage createResultStage(finalRDD, func, partitions, jobId, callSite)} catch {case e: Exception logWarning(Creating new stage failed due to exception - job: jobId, e)listener.jobFailed(e)return} val job new ActiveJob(jobId, finalStage, callSite, listener, properties)clearCacheLocs()logInfo(Got job %s (%s) with %d output partitions.format(job.jobId, callSite.shortForm, partitions.length))logInfo(Final stage: finalStage ( finalStage.name ))logInfo(Parents of final stage: finalStage.parents)logInfo(Missing parents: getMissingParentStages(finalStage)) val jobSubmissionTime clock.getTimeMillis()jobIdToActiveJob(jobId) jobactiveJobs jobfinalStage.setActiveJob(job)val stageIds jobIdToStageIds(jobId).toArrayval stageInfos stageIds.flatMap(id stageIdToStage.get(id).map(_.latestInfo))listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))submitStage(finalStage)} ---------------------------------------------- 上面的createResultStage其实就是RDD转换为Stage的过程方法如下 ---------------------------------------------- /*创建ResultStage的时候它会调用相关函数*/private def createResultStage(rdd: RDD[], //对应上图的RDD9func: (TaskContext, Iterator[]) _,partitions: Array[Int],jobId: Int,callSite: CallSite): ResultStage {val parents getOrCreateParentStages(rdd, jobId)val id nextStageId.getAndIncrement()val stage new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)stageIdToStage(id) stageupdateJobIdStageIdMaps(jobId, stage)stage} /** 形成ResultStage依赖的父Stage*/private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] {getShuffleDependencies(rdd).map { shuffleDep getOrCreateShuffleMapStage(shuffleDep, firstJobId)}.toList}/**采用的是深度优先遍历找到Action算子的父依赖中的宽依赖这个是最主要的方法要看懂这个方法其实后面的就好理解最好结合这例子上面给出的RDD逻辑依赖图比* 较容易看出来根据上面的RDD逻辑依赖图其返回的ShuffleDependency就是RDD2和RDD1RDD7和RDD6的依赖,如果存在A-B-C,这两个都是shuffle依赖那么对于C其只返回B的shuffle依赖而不会返回A/private[scheduler] def getShuffleDependencies(rdd: RDD[]): HashSet[ShuffleDependency[, , ]] {//用来存放依赖val parents new HashSet[ShuffleDependency[, , ]]//遍历过的RDD放入这个里面val visited new HashSet[RDD[]]//创建一个待遍历RDD的栈结构val waitingForVisit new ArrayStack[RDD[]]//压入finalRDD逻辑图中的RDD9waitingForVisit.push(rdd)//循环遍历这个栈结构while (waitingForVisit.nonEmpty) {val toVisit waitingForVisit.pop()// 如果RDD没有被遍历过执行其中的代码if (!visited(toVisit)) {//然后把其放入已经遍历队列中visited toVisit//得到依赖我们知道依赖中存放的有父RDD的对象toVisit.dependencies.foreach {//如果这个依赖是shuffle依赖则放入返回队列中case shuffleDep: ShuffleDependency[, , ] parents shuffleDepcase dependency //如果不是shuffle依赖把其父RDD压入待访问栈中从而进行循环waitingForVisit.push(dependency.rdd)}}}parents}/创建shuffleMapStage根据上面得到的两个Shuffle对象分别创建了两个shuffleMapStage//def createShuffleMapStage(shuffleDep: ShuffleDependency[, , _], jobId: Int): ShuffleMapStage {//这个RDD其实就是RDD1和RDD6val rdd shuffleDep.rddval numTasks rdd.partitions.lengthval parents getOrCreateParentStages(rdd, jobId) //查看这两个ShuffleMapStage是否存在父Shuffle的Stageval id nextStageId.getAndIncrement()//创建ShuffleMapStage下面是更新一下SparkContext的状态val stage new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)stageIdToStage(id) stageshuffleIdToMapStage(shuffleDep.shuffleId) stageupdateJobIdStageIdMaps(jobId, stage) if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {// Kind of ugly: need to register RDDs with the cache and map output tracker here// since we cant do it in the RDD constructor because # of partitions is unknownlogInfo(Registering RDD rdd.id ( rdd.getCreationSite ))mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)}stage} ---------------------------------------------- 通过上面的源代码分析结合RDD的逻辑执行图我们可以看出这个job拥有三个Stage一个ResultStage两个ShuffleMapStage一个ShuffleMapStage中的RDD是RDD1另一个stage中的RDD是RDD6,从而以上完成了RDD到Stage的切分工作。当切分完成后在handleJobSubmitted这个方法的最后调用提交stage的方法。 submitStage源代码比较简单它会检查我们当前的stage依赖的父stage是否已经执行完成如果没有执行完成会循环提交其父stage等待其父stage执行完成了才提交我们当前的stage进行执行。 ---------------------------------------------- private def submitStage(stage: Stage) {val jobId activeJobForStage(stage)if (jobId.isDefined) {logDebug(submitStage( stage ))if (!waitingStages(stage) !runningStages(stage) !failedStages(stage)) {val missing getMissingParentStages(stage).sortBy(_.id)logDebug(missing: missing)if (missing.isEmpty) {logInfo(Submitting stage ( stage.rdd ), which has no missing parents)submitMissingTasks(stage, jobId.get)} else {for (parent - missing) {submitStage(parent)}waitingStages stage}}} else {abortStage(stage, No active job for stage stage.id, None)}} ---------------------------------------------- 提交task的方法源代码我们按照刚才的三个stage中提交的是前两个stage的过程来看待这个源代码。以包含RDD1的stage为例 ---------------------------------------------- private def submitMissingTasks(stage: Stage, jobId: Int) {logDebug(submitMissingTasks( stage ))// Get our pending tasks and remember them in our pendingTasks entrystage.pendingPartitions.clear() // 计算需要计算的分区数 val partitionsToCompute: Seq[Int] stage.findMissingPartitions()// Use the scheduling pool, job group, description, etc. from an ActiveJob associated // with this Stage val properties jobIdToActiveJob(jobId).propertiesrunningStages stage// 封装stage的一些信息得到stage到分区数的映射关系即一个stage对应多少个分区需要计算 stage match {case s: ShuffleMapStage outputCommitCoordinator.stageStart(stage s.id, maxPartitionId s.numPartitions - 1)case s: ResultStage outputCommitCoordinator.stageStart(stage s.id, maxPartitionId s.rdd.partitions.length - 1) } //得到每个分区对应的具体位置即分区的数据位于集群的哪台机器上。val taskIdToLocations: Map[Int, Seq[TaskLocation]] try {stage match {case s: ShuffleMapStage partitionsToCompute.map { id (id, getPreferredLocs(stage.rdd, id))}.toMapcase s: ResultStage partitionsToCompute.map { id val p s.partitions(id)(id, getPreferredLocs(stage.rdd, p))}.toMap}} catch {case NonFatal(e) stage.makeNewStageAttempt(partitionsToCompute.size)listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))abortStage(stage, sTask creation failed: $e\n${Utils.exceptionString(e)}, Some(e))runningStages - stagereturn}// 这个把上面stage要计算的分区和每个分区对应的物理位置进行了从新封装放在了latestInfo里面stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) //序列化我们刚才得到的信息以便在driver机器和work机器之间进行传输var taskBinary: Broadcast[Array[Byte]] nulltry {// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).// For ResultTask, serialize and broadcast (rdd, func).val taskBinaryBytes: Array[Byte] stage match {case stage: ShuffleMapStage JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))case stage: ResultStage JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))} taskBinary sc.broadcast(taskBinaryBytes) } catch {// In the case of a failure during serialization, abort the stage.case e: NotSerializableException abortStage(stage, Task not serializable: e.toString, Some(e))runningStages - stage// Abort executionreturncase NonFatal(e) abortStage(stage, sTask serialization failed: $e\n${Utils.exceptionString(e)}, Some(e))runningStages - stagereturn } //封装stage构成taskSet集合ShuffleMapStage对应的task为ShuffleMapTask而ResultStage对应的taskSet为ResultTaskval tasks: Seq[Task[_]] try {stage match {case stage: ShuffleMapStage partitionsToCompute.map { id val locs taskIdToLocations(id)val part stage.rdd.partitions(id)new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),Option(sc.applicationId), sc.applicationAttemptId)} case stage: ResultStage partitionsToCompute.map { id val p: Int stage.partitions(id)val part stage.rdd.partitions(p)val locs taskIdToLocations(id)new ResultTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)} } } catch {case NonFatal(e) abortStage(stage, sTask creation failed: $e\n${Utils.exceptionString(e)}, Some(e))runningStages - stagereturn} //提交task给TaskScheduleif (tasks.size 0) {logInfo(Submitting tasks.size missing tasks from stage ( stage.rdd ))stage.pendingPartitions tasks.map(_.partitionId)logDebug(New pending partitions: stage.pendingPartitions)taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))stage.latestInfo.submissionTime Some(clock.getTimeMillis())} else {// Because we posted SparkListenerStageSubmitted earlier, we should mark// the stage as completed here in case there are no tasks to runmarkStageAsFinished(stage, None) val debugString stage match {case stage: ShuffleMapStage sStage ${stage} is actually done; s(available: ${stage.isAvailable}, savailable outputs: ${stage.numAvailableOutputs}, spartitions: ${stage.numPartitions})case stage : ResultStage sStage ${stage} is actually done; (partitions: ${stage.numPartitions}) } logDebug(debugString)submitWaitingChildStages(stage) }} ---------------------------------------------- 到此完成了整个DAGScheduler的调度。 spark的TaskSchedule的调度 spark的Task的调度我们要明白其调度过程其根据不同的资源管理器拥有不同的调度策略因此也拥有不同的调度守护进程这个守护进程管理着集群的资源信息spark提供了一个基本的守护进程的类来完成与driver和executor的交互CoarseGrainedSchedulerBackend它应该运行在集群资源管理器上比如yarn等。他收集了集群work机器的一般资源信息。当我们形成tasks将要进行调度的时候driver进程会与其通信请求资源的分配和调度其会把最优的work节点分配给task来执行其任务。而TaskScheduleImpl实现了task调度的过程采用的调度算法默认的是FIFO的策略也可以采用公平调度策略。 当我们提交task时其会创建一个管理task的类TaskSetManager然后把其加入到任务调度池中。 ---------------------------------------------- override def submitTasks(taskSet: TaskSet) {val tasks taskSet.taskslogInfo(Adding task set taskSet.id with tasks.length tasks)this.synchronized {// 创建taskSetManager以下为更新一下状态val manager createTaskSetManager(taskSet, maxTaskFailures)val stage taskSet.stageIdval stageTaskSets taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])stageTaskSets(taskSet.stageAttemptId) managerval conflictingTaskSet stageTaskSets.exists { case (, ts) ts.taskSet ! taskSet !ts.isZombie}if (conflictingTaskSet) {throw new IllegalStateException(smore than one active taskSet for stage $stage: s ${stageTaskSets.toSeq.map{._2.taskSet.id}.mkString(,)})}//把封装好的taskSet加入到任务调度队列中。schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal !hasReceivedTask) {starvationTimer.scheduleAtFixedRate(new TimerTask() {override def run() {if (!hasLaunchedTask) {logWarning(Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources)} else {this.cancel()}}}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } hasReceivedTask true }//这个地方就是向资源管理器发出请求请求任务的调度backend.reviveOffers()} /**这个方法是位于CoarseGrainedSchedulerBackend类中driver进程会想集群管理器发送请求资源的请求。/override def reviveOffers() {driverEndpoint.send(ReviveOffers)} ---------------------------------------------- 当其收到这个请求时其会调用这样的方法。 ---------------------------------------------- override def receive: PartialFunction[Any, Unit] {case StatusUpdate(executorId, taskId, state, data) scheduler.statusUpdate(taskId, state, data.value)if (TaskState.isFinished(state)) {executorDataMap.get(executorId) match {case Some(executorInfo) executorInfo.freeCores scheduler.CPUS_PER_TASKmakeOffers(executorId)case None // Ignoring the update since we dont know about the executor.logWarning(sIgnored task status update ($taskId state $state) sfrom unknown executor with ID $executorId)}}//发送的请求满足这个条件case ReviveOffers makeOffers() case KillTask(taskId, executorId, interruptThread) executorDataMap.get(executorId) match {case Some(executorInfo) executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))case None // Ignoring the task kill since the executor is not registered.logWarning(sAttempted to kill task $taskId for unknown executor $executorId.)}} /**这个方法是搜集集群上现在还在活着的机器的相关信息。并且进行封装成WorkerOffer类 然后其会调用TaskSchedulerImpl中的resourceOffers方法来进行筛选筛选出符合请求资源的机器来执行我们当前的任务/private def makeOffers() {// Filter out executors under killingval activeExecutors executorDataMap.filterKeys(executorIsAlive)val workOffers activeExecutors.map { case (id, executorData) new WorkerOffer(id, executorData.executorHost, executorData.freeCores)}.toIndexedSeqlaunchTasks(scheduler.resourceOffers(workOffers))}/*得到集群中空闲机器的信息后我们通过此方法来筛选出满足我们这次任务要求的机器然后返回TaskDescription类*这个类封装了task与excutor的相关信息 /def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] synchronized {// Mark each slave as alive and remember its hostname// Also track if new executor is addedvar newExecAvail false//检查work是否已经存在了把不存在的加入到work调度池中for (o - offers) {if (!hostToExecutors.contains(o.host)) {hostToExecutors(o.host) new HashSet[String]()}if (!executorIdToRunningTaskIds.contains(o.executorId)) {hostToExecutors(o.host) o.executorIdexecutorAdded(o.executorId, o.host)executorIdToHost(o.executorId) o.hostexecutorIdToRunningTaskIds(o.executorId) HashSet[Long]()newExecAvail true}for (rack - getRackForHost(o.host)) {hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) o.host}}// 打乱work机器的顺序以免每次分配任务时都在同一个机器上进行。避免某一个work计算压力太大。val shuffledOffers Random.shuffle(offers)//对于每一work创建一个与其核数大小相同的数组数组的大小决定了这台work上可以并行执行task的数目.val tasks shuffledOffers.map(o new ArrayBufferTaskDescription)//取出每台机器的cpu核数val availableCpus shuffledOffers.map(o o.cores).toArray//从task任务调度池中按照我们的调度算法取出需要执行的任务val sortedTaskSets rootPool.getSortedTaskSetQueuefor (taskSet - sortedTaskSets) {logDebug(parentName: %s, name: %s, runningTasks: %s.format(taskSet.parent.name, taskSet.name, taskSet.runningTasks))if (newExecAvail) {taskSet.executorAdded()}}// 下面的这个循环是用来标记task根据work的信息来标定数据本地化的程度的。当我们在yarn资源管理器以--driver-mode配置//为client时我们就会在打出来的日志上看出每一台机器上运行task的数据本地化程度。同时还会选择每个task对应的work机器// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANYfor (taskSet - sortedTaskSets) {var launchedAnyTask falsevar launchedTaskAtCurrentMaxLocality falsefor (currentMaxLocality - taskSet.myLocalityLevels) {do {launchedTaskAtCurrentMaxLocality resourceOfferSingleTaskSet(taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)launchedAnyTask | launchedTaskAtCurrentMaxLocality} while (launchedTaskAtCurrentMaxLocality)}if (!launchedAnyTask) {taskSet.abortIfCompletelyBlacklisted(hostToExecutors)}} if (tasks.size 0) {hasLaunchedTask true}//返回taskDescription对象return tasks} /*task选择执行其任务的work其实是在这个函数中实现的从这个可以看出一台work上其实是可以运行多个task主要是看如何*进行算法调度 /private def resourceOfferSingleTaskSet(taskSet: TaskSetManager,maxLocality: TaskLocality,shuffledOffers: Seq[WorkerOffer],availableCpus: Array[Int],tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean {var launchedTask false//循环所有的机器找适合此机器的taskfor (i - 0 until shuffledOffers.size) {val execId shuffledOffers(i).executorIdval host shuffledOffers(i).host//判断其剩余的cpu核数是否满足我们的最低配置满足则为其分配任务否则不为其分配任务。if (availableCpus(i) CPUS_PER_TASK) {try {//这个for中的resourOffer就是来判断其标记任务数据本地化的程度的。task(i)其实是一个数组数组大小和其cpu核心数大小相同。for (task - taskSet.resourceOffer(execId, host, maxLocality)) {tasks(i) taskval tid task.taskIdtaskIdToTaskSetManager(tid) taskSettaskIdToExecutorId(tid) execIdexecutorIdToRunningTaskIds(execId).add(tid)availableCpus(i) - CPUS_PER_TASKassert(availableCpus(i) 0)launchedTask true}} catch {case e: TaskNotSerializableException logError(sResource offer failed, task set ${taskSet.name} was not serializable)// Do not offer resources for this task, but dont throw an error to allow other// task sets to be submitted.return launchedTask}}}return launchedTask} ---------------------------------------------- 以上完成了从TaskSet到task和work机器的绑定过程的所有任务。下面就是如何发送task到executor进行执行。在makeOffers()方法中调用了launchTasks方法,这个方法其实就是发送task作业到指定的机器上。只此spark TaskSchedule的调度就此结束。 executor如何执行task以及我们定义的函数 当TaskSchedule完成对task的调度时task需要在work机器上来进行执行。此时work机器就会启动一个Backend的守护进程用来完成与driver和资源管理器的通信。这个Backend就是CoarseGrainedExecutorBackend启动的main主函数为,从main函数中可以看出其主要进行参数的解析然后运行run方法。 ---------------------------------------------- def main(args: Array[String]) {var driverUrl: String nullvar executorId: String nullvar hostname: String nullvar cores: Int 0var appId: String nullvar workerUrl: Option[String] Noneval userClassPath new mutable.ListBuffer[URL]()var argv args.toListwhile (!argv.isEmpty) {argv match {case (--driver-url) :: value :: tail driverUrl valueargv tailcase (--executor-id) :: value :: tail executorId valueargv tailcase (--hostname) :: value :: tail hostname valueargv tailcase (--cores) :: value :: tail cores value.toIntargv tailcase (--app-id) :: value :: tail appId valueargv tailcase (--worker-url) :: value :: tail // Worker url is used in spark standalone mode to enforce fate-sharing with workerworkerUrl Some(value)argv tailcase (--user-class-path) :: value :: tail userClassPath new URL(value)argv tailcase Nil case tail // scalastyle:off printlnSystem.err.println(sUnrecognized options: ${tail.mkString( )})// scalastyle:on printlnprintUsageAndExit()}}if (driverUrl null || executorId null || hostname null || cores 0 ||appId null) {printUsageAndExit()}run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)System.exit(0)}/*可以看出run方法只是进行了一些需要运行task所需要的环境进行配置。并且创建相应的运行环境。 /private def run(driverUrl: String,executorId: String,hostname: String,cores: Int,appId: String,workerUrl: Option[String],userClassPath: Seq[URL]) { Utils.initDaemon(log) SparkHadoopUtil.get.runAsSparkUser { () // Debug codeUtils.checkHost(hostname) // Bootstrap to fetch the drivers Spark properties.val executorConf new SparkConfval port executorConf.getInt(spark.executor.port, 0)val fetcher RpcEnv.create(driverPropsFetcher,hostname,port,executorConf,new SecurityManager(executorConf),clientMode true)val driver fetcher.setupEndpointRefByURI(driverUrl)val cfg driver.askWithRetrySparkAppConfigval props cfg.sparkProperties Seq[(String, String)]((spark.app.id, appId))fetcher.shutdown() // Create SparkEnv using properties we fetched from the driver.val driverConf new SparkConf()for ((key, value) - props) {// this is required for SSL in standalone modeif (SparkConf.isExecutorStartupConf(key)) {driverConf.setIfMissing(key, value)} else {driverConf.set(key, value)}}if (driverConf.contains(spark.yarn.credentials.file)) {logInfo(Will periodically update credentials from: driverConf.get(spark.yarn.credentials.file))SparkHadoopUtil.get.startCredentialUpdater(driverConf)} val env SparkEnv.createExecutorEnv(driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal false) env.rpcEnv.setupEndpoint(Executor, new CoarseGrainedExecutorBackend(env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))workerUrl.foreach { url env.rpcEnv.setupEndpoint(WorkerWatcher, new WorkerWatcher(env.rpcEnv, url))}env.rpcEnv.awaitTermination()SparkHadoopUtil.get.stopCredentialUpdater()}} ---------------------------------------------- 其执行函数的调用过程如下 我们知道当我们完成TaskSchedule的调度时是通过rpc发送了一个消息如下图所示当work机器的Backend启动以后其会与driver进程进行rpc通信当其收到LaunchTask的消息后其会执行下面的代码。我们可以看出此方法存在很多的情况根据接收到的不同的消息执行不同的代码。我们上面执行的是LaunchTask的请求。 ---------------------------------------------- override def receive: PartialFunction[Any, Unit] {case RegisteredExecutor logInfo(Successfully registered with driver)try {executor new Executor(executorId, hostname, env, userClassPath, isLocal false)} catch {case NonFatal(e) exitExecutor(1, Unable to create executor due to e.getMessage, e)} case RegisterExecutorFailed(message) exitExecutor(1, Slave registration failed: message)//提交任务时执行这样的操作。case LaunchTask(data) if (executor null) {exitExecutor(1, Received LaunchTask command but executor was null)} else {//先反序列化val taskDesc ser.deserializeTaskDescriptionlogInfo(Got assigned task taskDesc.taskId)//然后执行launchTask操作。executor.launchTask(this, taskId taskDesc.taskId, attemptNumber taskDesc.attemptNumber,taskDesc.name, taskDesc.serializedTask)} case KillTask(taskId, _, interruptThread) if (executor null) {exitExecutor(1, Received KillTask command but executor was null)} else {executor.killTask(taskId, interruptThread)} case StopExecutor stopping.set(true)logInfo(Driver commanded a shutdown)// Cannot shutdown here because an ack may need to be sent back to the caller. So send// a message to self to actually do the shutdown.self.send(Shutdown) case Shutdown stopping.set(true)new Thread(CoarseGrainedExecutorBackend-stop-executor) {override def run(): Unit {// executor.stop() will call SparkEnv.stop() which waits until RpcEnv stops totally.// However, if executor.stop() runs in some thread of RpcEnv, RpcEnv wont be able to// stop until executor.stop() returns, which becomes a dead-lock (See SPARK-14180).// Therefore, we put this line in a new thread.executor.stop()}}.start()} ---------------------------------------------- Executor的相关源代码,从源码中我们可以看出对于Task其创建了一个TaskRunner的线程并且把其放入到执行队列中进行执行。 ---------------------------------------------- def launchTask(context: ExecutorBackend,taskId: Long,attemptNumber: Int,taskName: String,serializedTask: ByteBuffer): Unit {val tr new TaskRunner(context, taskId taskId, attemptNumber attemptNumber, taskName,serializedTask)runningTasks.put(taskId, tr)threadPool.execute(tr)} ---------------------------------------------- 从下面可以看出其定义的就是一个线程那我们就看一下这个线程的run方法。 ---------------------------------------------- override def run(): Unit {//初始化线程运行需要的一些环境val threadMXBean ManagementFactory.getThreadMXBeanval taskMemoryManager new TaskMemoryManager(env.memoryManager, taskId)val deserializeStartTime System.currentTimeMillis()val deserializeStartCpuTime if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime} else 0L//得到当前进程的类加载器Thread.currentThread.setContextClassLoader(replClassLoader)val ser env.closureSerializer.newInstance()logInfo(sRunning $taskName (TID $taskId))//更新相关的状态execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)var taskStart: Long 0var taskStartCpu: Long 0startGCTime computeTotalGcTime() try { //反序列化类相关的依赖得到相关的参数val (taskFiles, taskJars, taskProps, taskBytes) Task.deserializeWithDependencies(serializedTask) // Must be set before updateDependencies() is called, in case fetching dependencies// requires access to properties contained within (e.g. for access control).Executor.taskDeserializationProps.set(taskProps) //更新依赖配置updateDependencies(taskFiles, taskJars)task ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)task.localProperties taskPropstask.setTaskMemoryManager(taskMemoryManager) // If this task has been killed before we deserialized it, lets quit now. Otherwise,// continue executing the task.if (killed) {// Throw an exception rather than returning, because returning within a try{} block// causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl// exception will be caught by the catch block, leading to an incorrect ExceptionFailure// for the task.throw new TaskKilledException}logDebug(Task taskId s epoch is task.epoch) //追踪缓存数据的位置env.mapOutputTracker.updateEpoch(task.epoch) // Run the actual task and measure its runtime.taskStart System.currentTimeMillis()taskStartCpu if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime} else 0Lvar threwException true //运行任务的run方法来运行task主要就是下面的task.run方法它又会调用runTask方法来真正执行task前面我们提到过job变//为stage有两种ShuffleMapStage和ResultStage那么其对应的也有两个TaskShuffleMapTask和ResultTask不同的task类型执行不同的run方法。val value try {val res task.run(taskAttemptId taskId,attemptNumber attemptNumber,metricsSystem env.metricsSystem)threwException falseres} finally {//下面就是根据上面的运行结果来进行一些判断和日志的打出val releasedLocks env.blockManager.releaseAllLocksForTask(taskId)val freedMemory taskMemoryManager.cleanUpAllAllocatedMemory() if (freedMemory 0 !threwException) {val errMsg sManaged memory leak detected; size $freedMemory bytes, TID $taskIdif (conf.getBoolean(spark.unsafe.exceptionOnMemoryLeak, false)) {throw new SparkException(errMsg)} else {logWarning(errMsg)}}if (releasedLocks.nonEmpty !threwException) {val errMsg s${releasedLocks.size} block locks were not released by TID $taskId:\n releasedLocks.mkString([, , , ])if (conf.getBoolean(spark.storage.exceptionOnPinLeak, false)) {throw new SparkException(errMsg)} else {logWarning(errMsg)}}}val taskFinish System.currentTimeMillis()val taskFinishCpu if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime} else 0L// If the task has been killed, lets fail it.if (task.killed) {throw new TaskKilledException}val resultSer env.serializer.newInstance()val beforeSerialization System.currentTimeMillis()val valueBytes resultSer.serialize(value)val afterSerialization System.currentTimeMillis()// Deserialization happens in two parts: first, we deserialize a Task object, which// includes the Partition. Second, Task.run() deserializes the RDD and function to be run.task.metrics.setExecutorDeserializeTime((taskStart - deserializeStartTime) task.executorDeserializeTime)task.metrics.setExecutorDeserializeCpuTime((taskStartCpu - deserializeStartCpuTime) task.executorDeserializeCpuTime)// We need to subtract Task.run()s deserialization time to avoid double-countingtask.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)task.metrics.setExecutorCpuTime((taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)// Note: accumulator updates must be collected after TaskMetrics is updatedval accumUpdates task.collectAccumulatorUpdates()// TODO: do not serialize value twiceval directResult new DirectTaskResult(valueBytes, accumUpdates)val serializedDirectResult ser.serialize(directResult)val resultSize serializedDirectResult.limit// directSend sending directly back to the driverval serializedResult: ByteBuffer {if (maxResultSize 0 resultSize maxResultSize) {logWarning(sFinished $taskName (TID $taskId). Result is larger than maxResultSize s(${Utils.bytesToString(resultSize)} ${Utils.bytesToString(maxResultSize)}), sdropping it.)ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))} else if (resultSize maxDirectResultSize) {val blockId TaskResultBlockId(taskId)env.blockManager.putBytes(blockId,new ChunkedByteBuffer(serializedDirectResult.duplicate()),StorageLevel.MEMORY_AND_DISK_SER)logInfo(sFinished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager))ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))} else {logInfo(sFinished $taskName (TID $taskId). $resultSize bytes result sent to driver)serializedDirectResult}}execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)} catch {case ffe: FetchFailedException val reason ffe.toTaskFailedReasonsetTaskFinishedAndClearInterruptStatus()execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))case _: TaskKilledException logInfo(sExecutor killed $taskName (TID $taskId))setTaskFinishedAndClearInterruptStatus()execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))case _: InterruptedException if task.killed logInfo(sExecutor interrupted and killed $taskName (TID $taskId))setTaskFinishedAndClearInterruptStatus()execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))case CausedBy(cDE: CommitDeniedException) val reason cDE.toTaskFailedReasonsetTaskFinishedAndClearInterruptStatus()execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))case t: Throwable // Attempt to exit cleanly by informing the driver of our failure.// If anything goes wrong (or this was a fatal exception), we will delegate to// the default uncaught exception handler, which will terminate the Executor.logError(sException in $taskName (TID $taskId), t)// Collect latest accumulator values to report back to the driverval accums: Seq[AccumulatorV2[_, _]] if (task ! null) {task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)task.collectAccumulatorUpdates(taskFailed true)} else {Seq.empty}val accUpdates accums.map(acc acc.toInfo(Some(acc.value), None))val serializedTaskEndReason {try {ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))} catch {case _: NotSerializableException // t is not serializable so just send the stacktraceser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))}}setTaskFinishedAndClearInterruptStatus()execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)// Dont forcibly exit unless the exception was inherently fatal, to avoid// stopping other tasks unnecessarily.if (Utils.isFatalError(t)) {SparkUncaughtExceptionHandler.uncaughtException(t)}} finally {runningTasks.remove(taskId) } }} ---------------------------------------------- 前面我们提到过job变为stage有两种ShuffleMapStage和ResultStage那么其对应的也有两个TaskShuffleMapTask和ResultTask不同的task类型执行不同的Task.runTask方法。Task.run方法中调用了runTask的方法这个方法在上面两个Task类中都进行了重写。ShuffleMapTask的runTask方法 ---------------------------------------------- override def runTask(context: TaskContext): MapStatus {// Deserialize the RDD using the broadcast variable.//首先进行一些初始化操作val threadMXBean ManagementFactory.getThreadMXBeanval deserializeStartTime System.currentTimeMillis()val deserializeStartCpuTime if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime} else 0Lval ser SparkEnv.get.closureSerializer.newInstance()//反序列化这里的rdd其实是我们进行shuffle之前的最后一个rdd这个我们在前面已经说到的。val (rdd, dep) ser.deserialize[(RDD[], ShuffleDependency[, , ])](ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime System.currentTimeMillis() - deserializeStartTimeexecutorDeserializeCpuTime if (threadMXBean.isCurrentThreadCpuTimeSupported) {threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime} else 0L//下面就是把每一个shuffle之前的stage的最后一个rdd进行写入操作但是没有看到task执行我们写的函数也没有看到其调用compute函数以及rdd之间的管道执行也没有体现出来往下看会揭露这些问题的面纱。var writer: ShuffleWriter[Any, Any] nulltry {val manager SparkEnv.get.shuffleManagerwriter manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[ : Product2[Any, Any]]])writer.stop(success true).get} catch {case e: Exception try {if (writer ! null) {writer.stop(success false)}} catch {case e: Exception log.debug(Could not stop writer, e)}throw e}} ---------------------------------------------- 对于上面红色部分的问题我们在这里进行详细的解释。RDD会根据依赖关系来形成一个有向无环图通过最后一个RDD和其依赖我们就可以反向查找其对应的所有父类。如果没有shuffle过程那么其就会形成管道形成管道的好处就是所有RDD的中间结果不需要进行存储直接就把我们的定义的多个函数串连起来从输入到输出中间结果不需要存储节省了时间和空间。同时我们也知道RDD的中间结果可以持久化到内存或者硬盘上spark对于这个是可以追踪到的。 通过上面的分析我们可以看出executor中正是我们RDD往前回溯的开始。对于shuffle过程和ResultTask的runTask的执行过程以后会在慢慢跟进。转载于:https://blog.51cto.com/9269309/2091219

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/916045.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBootMVC相关内容

SpringMVC 基于java实现MVC模型的轻量级Web框架 做Web程序开发 当我们的浏览器发出一个请求给到后端服务器以后,由servlet负责处理请求, servlet只是能够接收请求产生响应,不能进行真正的数据处理 于是将后端服务拆…

小柏实战学习Liunx(图文教程三十五)

小柏实战学习Liunx(图文教程三十五)本节课主题:上一节课,docker镜像加速服务器搭建完成后,本节课需要配置域名,并使用nginx端口转发; 前言:一定要知道每一个命令是啥意思,并且要学会看报错信息,学会使用AI。1. 更新系…

【含文档+PPT+源码】基于GPT+SpringBoot的个人健康管理与咨询架构设计与建立

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

基于节流的流水线并行推理优化——gLLM

通过token节流实现LLM流水线推理服务的全局负载均衡 背景 vllm中的流水线调度策略 在当前的vllm调度中,对于pipeline并行的实现还不完善,存在大量气泡,当前在vllm中的流水线并行调度如下。 以4卡的流水线并行推理为…

Corral the Cows

点评:我认为是一道很不错的题,将很多基础的算法融汇到一起。 题目链接:https://vjudge.net/problem/POJ-3179#author=GPT_zh 题目描述:农夫约翰希望为他的奶牛建造一个围栏。由于奶牛是挑剔的动物,它们要求围栏是…

个人网站开发 怎么赚钱tp5被黑做的网站全变成首页

中秋小长假“余额”就剩半天了尽管心里有太多不舍也要调整自己毕竟假期都是短暂的工作才是职场人生的常态为了尽快消除“假日综合症”e小安贴心送上小文一篇小伙伴们赶紧“脉动”回来吧各类web应用充斥在我们的网络生活中,但是因为开发者安全意识不强而导致的安全问…

HarmonyOS 5 通知与语音能力开发实战:从消息推送到智能语音交互

一、通知系统开发全面解析 1. 通知权限与基础配置 在 module.json5中声明通知和语音相关权限: {"module": {"requestPermissions": [{"name": "ohos.permission.NOTIFICATION_CON…

Keithley 万用表里测电阻分成两种模式

Keithley 万用表里测电阻分成两种模式前面板操作(以常见型号为例)开机,确认表正常自检通过。在 功能选择区(Function/Measure 按键区):找到 Ω(Ohms / RES) 按键,按下后进入电阻测量模式。如果你的型号没有单…

HarmonyOS 5 Native与ArkTS混合开发实战:跨语言高性能组件开发

一、混合开发架构与原理 1. 混合开发核心架构 HarmonyOS混合开发采用分层设计:ArkTS层:UI组件和业务逻辑,使用声明式开发范式 Node-API桥接层:提供类型安全的跨语言通信机制 Native层:C/C++高性能计算和系统级功能…

实战:基于HarmonyOS 5构建分布式聊天通讯应用

1 HarmonyOS 5分布式通信基础 HarmonyOS 5为聊天通讯应用开发带来了革命性的分布式能力,让开发者能够构建真正跨设备、无缝协同的通信体验。本节将介绍如何利用HarmonyOS 5的分布式架构和API 12+特性来打造高性能聊天…

Java-Eclipse使用-多维数组的使用

Java-Eclipse使用-多维数组的使用Day08 1.多维数组的打印方式 2.多维数组的实操使用 3.多维数组的实操使用 4.多维数组的实操使用 明天会继续进行更新,二维数组的打印不太理解。

中国建设银行官方网站 认证网站运营和seo的区别

大家想必都知道,数组和链表的搜索操作的时间复杂度都是O(N)的,在数据量大的时候是非常耗时的。对于数组来说,我们可以先排序,然后使用二分搜索,就能够将时间复杂度降低到O(logN),但是有序数组的插入是一个O…

HarmonyOS 5 动画开发实战:从基础动效到高级交互动画

🎯 一、HarmonyOS动画系统概述 HarmonyOS提供了强大而灵活的动画系统,支持多种动画类型和交互效果:动画类型 适用场景 核心API属性动画 组件尺寸、位置、透明度等属性变化 animateTo(), Animation()转场动画 组件出…

HarmonyOS 5 高级动效实战:粒子系统、路径动画与物理动效开发

一、粒子系统动画开发 粒子动画通过大量微小元素的运动创造复杂视觉效果,如雨雪、火焰、爆炸等自然现象。 1. 基础粒子系统实现 import particle from @ohos.graphics.particle; import { BusinessError } from @ohos…

从范德蒙德矩阵聊开去.

范德蒙德矩阵 : \(\bm{V}=\left[ \begin{array}{ccc}1 & x_0 & x_0^2 & \cdots & x_0^{n-1} \\1 & x_1 & x_1^2 & \cdots & x_1^{n-1} \\1 & x_2 & x_2^2 & \cdots &…

全新 CloudPilot AI:嵌入 Kubernetes 的 SRE Agent,降本与韧性双提升!

全新 CloudPilot AI:嵌入 Kubernetes 的 SRE Agent,降本与韧性双提升!在生成式 AI 的浪潮下,计算资源,尤其是支撑所有服务运行与调度的 CPU 资源,已经从单纯的成本项目,升级为驱动创新速度与竞争力的底层基石。…

HarmonyOS 5 动画性能优化深度解析:从原理到实践

一、HarmonyOS动画系统架构与渲染原理 1. 动画系统核心架构 HarmonyOS的动画系统采用分层设计,包含三个关键层级:UI组件层:基于ArkUI的声明式动画API(如animateTo) 动画引擎层:负责插值计算和时间轴管理 渲染管线…

容桂网站制作公司系统类小说

类ReentrantLock具有完全互斥排他的效果,即同一时间只有一个线程在执行ReentrantLock.lock()后面的代码。这样虽然保证了线程的安全性,但是效率低下。JDK提供了ReentrantReadWriteLock读写锁,使用它可以加快效率,在某些不需要操作…

vue3 + antd +ts cron 选择器使用

https://github.com/shiyzhang/shiyzhang-cron shiyzhang-cron组件 使用方法:npm i shiyzhangcron 或 pnpm i shiyzhangcron 给ts添加类型声明文件在项目根目录下创建 types 文件夹 在 types 文件夹中创建 shiyzhangc…

HarmonyOS 5 性能优化全攻略:从启动加速到内存管理

🚀 一、启动速度优化 应用启动是用户体验的第一印象,优化启动速度至关重要。 1. 冷启动时延优化 通过精简初始化流程和资源预加载,可将启动时间缩短30%-50%。 // LaunchOptimization.ets import TaskPool from @oh…