MessageProtocol.scala
package top. gldwolf. scala. akkademo. sparkmasterandworker. common
case class WorkerRegisterInfo ( id: String, cpu: Int, ram: Int) { }
class WorkerInfo ( var id: String, cpu: Int, ram: Int) { var lastHeartBeatTime = System. currentTimeMillis ( )
}
case object RegisteredInfo
case object SendHeartBeat
case class HeartBeat ( id: String)
case object StartCheckTimeOutWorker
case object RemoveTimeOutWorker
SparkWorker.scala
package top. gldwolf. scala. akkademo. sparkmasterandworker. workerimport akka. actor. { Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com. typesafe. config. { Config, ConfigFactory}
import top. gldwolf. scala. akkademo. sparkmasterandworker. common. { HeartBeat, RegisteredInfo, SendHeartBeat, WorkerRegisterInfo} import scala. concurrent. duration. FiniteDurationobject SparkWorker { def main ( args: Array[ String] ) : Unit = { if ( args. length < 6 ) { println ( "参数个数不正确:host, port, workerName, masterName, masterHost, masterPort..." ) System. exit ( - 1 ) } val host = args ( 0 ) val port = args ( 1 ) . toIntval workerName = args ( 2 ) val masterName = args ( 3 ) val masterHost = args ( 4 ) val masterPort = args ( 5 ) . toIntval config: Config = ConfigFactory. parseString ( s"" "| akka. actor. provider= "akka.remote.RemoteActorRefProvider" | akka. remote. netty. tcp. hostname= $host| akka. remote. netty. tcp. port= $port| "" ". stripMargin) val workerFactory: ActorSystem = ActorSystem ( "WorkerFactory" , config) val workerRef: ActorRef = workerFactory. actorOf ( Props ( new SparkWorker ( masterHost, masterPort, masterName) ) , workerName) workerRef ! "start" }
} class SparkWorker ( masterHost: String, masterPort: Int, masterName: String) extends Actor { var masterRef: ActorSelection = _var id = java. util. UUID. randomUUID ( ) . toStringoverride def preStart ( ) : Unit = { masterRef = context. actorSelection ( "akka.tcp://MasterFactory@" + s"${masterHost}:${masterPort}/user/${masterName}" ) } override def receive: Receive = { case "start" = > { println ( "Worker " + "已上线~~~" ) masterRef ! WorkerRegisterInfo ( id, 4 , 4096 ) } case RegisteredInfo = > { println ( "workerId: " + id + " 注册成功!" ) import context. dispatchercontext. system. scheduler. schedule ( FiniteDuration ( 0 , "millis" ) , FiniteDuration ( 3000 , "millis" ) , self, SendHeartBeat) } case SendHeartBeat = > { println ( "worker: " + id + " 发送心跳信息~~~" ) masterRef ! HeartBeat ( id) } }
}
SparkMaster.scala
package top. gldwolf. scala. akkademo. sparkmasterandworker. masterimport akka. actor. { Actor, ActorRef, ActorSystem, Props}
import com. typesafe. config. { Config, ConfigFactory}
import top. gldwolf. scala. akkademo. sparkmasterandworker. common. _import scala. collection. mutable
import scala. concurrent. duration. FiniteDurationobject SparkMaster { def main ( args: Array[ String] ) : Unit = { if ( args. length < 3 ) { println ( "参数不够,请转入 host, port, masterName..." ) System. exit ( - 1 ) } val host = args ( 0 ) val port = args ( 1 ) val masterName = args ( 2 ) val config: Config = ConfigFactory. parseString ( s"" "| akka. actor. provider= "akka.remote.RemoteActorRefProvider" | akka. remote. netty. tcp. hostname= $host| akka. remote. netty. tcp. port= $port| "" ". stripMargin) val masterFactory: ActorSystem = ActorSystem ( "MasterFactory" , config) val masterRef: ActorRef = masterFactory. actorOf ( Props[ SparkMaster] , masterName) masterRef ! "start" }
} class SparkMaster extends Actor { val workers: mutable. HashMap[ String, WorkerInfo] = mutable. HashMap ( ) override def receive: Receive = { case "start" = > { println ( "Master 已上线~~~" ) self ! StartCheckTimeOutWorker } case WorkerRegisterInfo ( id, cpu, ram) = > { val workerInfo = new WorkerInfo ( id, cpu, ram) if ( ! workers. contains ( id) ) { workers += ( ( id, workerInfo) ) println ( "worker:" + id + " 注册成功~~~" ) println ( "目前已有的 Workers: " + workers) sender ( ) ! RegisteredInfo} } case HeartBeat ( id) = > { val workerInfo = workers ( id) val lastHeartBeatTime: Long = System. currentTimeMillis ( ) workerInfo. lastHeartBeatTime = lastHeartBeatTimeprintln ( "Master 更新了 " + id + " 的心跳时间: " + lastHeartBeatTime) } case StartCheckTimeOutWorker = > { println ( "----- Master 开启定时任务,检测已离线的 Worker -----" ) import context. dispatchercontext. system. scheduler. schedule ( FiniteDuration ( 0 , "millis" ) , FiniteDuration ( 9000 , "millis" ) , self, RemoveTimeOutWorker) } case RemoveTimeOutWorker = > { val now: Long = System. currentTimeMillis
workers. values. filter ( worker = > now - worker. lastHeartBeatTime > 6000 ) . foreach ( worker = > { println ( "Worker: " + worker. id + " 离线,已将其移除!" ) workers. remove ( worker. id) } ) println ( "当前有 " + workers. size + " 个 Worker 存活!" ) } }
}