使用 Akka 实现 Master 与 Worker 之间的通信

MessageProtocol.scala

package top.gldwolf.scala.akkademo.sparkmasterandworker.common/*** @author: Gldwolf* @email: ZengqiangZhao@sina.com* @date: 2020/4/17 10:54*//*** 用于 Work 注册时发送注册信息*/
case class WorkerRegisterInfo(id: String, cpu: Int, ram: Int) {}/*** 用于保存到 Master 的 HashMap 中*/
class WorkerInfo(var id: String, cpu: Int, ram: Int) {var lastHeartBeatTime = System.currentTimeMillis()
}/*** 注册成功后,Master 回应此类型的消息,表示注册成功* Worker 接收到后,启动心跳机制*/
case object RegisteredInfo/*** worker 每隔一定时间由定时器发给自己的一个消息,用于触发自己给 Master 发送消息*/
case object SendHeartBeat/*** 由自己的消息触发,然后给 Master 发送 HeartBeat 消息,消息要带上自己的 id*/
case class HeartBeat(id: String)/*** Master 给自己发送一个触发检查超时 Worker 的消息,定时获取已离线的 Worker*/
case object StartCheckTimeOutWorker/*** Master 给自己发消息,检测 Worker 是否已离线,如果已离线,则移除*/
case object RemoveTimeOutWorker

SparkWorker.scala

package top.gldwolf.scala.akkademo.sparkmasterandworker.workerimport akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
import top.gldwolf.scala.akkademo.sparkmasterandworker.common.{HeartBeat, RegisteredInfo, SendHeartBeat, WorkerRegisterInfo}import scala.concurrent.duration.FiniteDuration/*** @author: Gldwolf* @email: ZengqiangZhao@sina.com* @date: 2020/4/17 10:03*/object SparkWorker {def main(args: Array[String]): Unit = {if (args.length < 6) {println("参数个数不正确:host, port, workerName, masterName, masterHost, masterPort...")System.exit(-1)}val host = args(0)val port = args(1).toIntval workerName = args(2)val masterName = args(3)val masterHost = args(4)val masterPort = args(5).toIntval config: Config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=$host|akka.remote.netty.tcp.port=$port|""".stripMargin)val workerFactory: ActorSystem = ActorSystem("WorkerFactory", config)val workerRef: ActorRef = workerFactory.actorOf(Props(new SparkWorker(masterHost, masterPort, masterName)), workerName)workerRef ! "start"}
}class SparkWorker(masterHost: String, masterPort: Int, masterName: String) extends Actor {var masterRef: ActorSelection = _var id = java.util.UUID.randomUUID().toStringoverride def preStart(): Unit = {masterRef = context.actorSelection("akka.tcp://MasterFactory@" +s"${masterHost}:${masterPort}/user/${masterName}")}override def receive: Receive = {case "start" => {println("Worker " + "已上线~~~")masterRef ! WorkerRegisterInfo(id, 4, 4096)}// 接收到这个消息表示注册成功,然后会给定时给自己发送 SendHeartBeat 消息,触发心跳机制case RegisteredInfo => {println("workerId: " + id + " 注册成功!")import context.dispatcher// 说明:// 1. 0 millis 表示不延时,立即执行定时器// 2. 3000 millis 表示每隔 3 秒执行一次// 3. self 表示发送给自己// 4. SendHeartBeat 表示发送的内容context.system.scheduler.schedule(FiniteDuration(0, "millis"), FiniteDuration(3000, "millis"), self, SendHeartBeat)}// 接收到自己的定时器发送给自己的消息时,触发下面内容,给 Master 发送心跳信息case SendHeartBeat => {println("worker: " + id + " 发送心跳信息~~~")masterRef ! HeartBeat(id)}}
}

SparkMaster.scala

package top.gldwolf.scala.akkademo.sparkmasterandworker.masterimport akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
import top.gldwolf.scala.akkademo.sparkmasterandworker.common._import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration/*** @author: Gldwolf* @email: ZengqiangZhao@sina.com* @date: 2020/4/17 9:56*/object SparkMaster {def main(args: Array[String]): Unit = {if (args.length < 3) {println("参数不够,请转入 host, port, masterName...")System.exit(-1)}val host = args(0)val port = args(1)val masterName = args(2)val config: Config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=$host|akka.remote.netty.tcp.port=$port|""".stripMargin)val masterFactory: ActorSystem = ActorSystem("MasterFactory", config)val masterRef: ActorRef = masterFactory.actorOf(Props[SparkMaster], masterName)masterRef ! "start"}
}class SparkMaster extends Actor {val workers: mutable.HashMap[String, WorkerInfo] = mutable.HashMap()override def receive: Receive = {case "start" => {// 这里开始启动 Master 定时检测任务,判断 Worker 有没有离线println("Master 已上线~~~")self ! StartCheckTimeOutWorker // 给自己发一个开始检测的消息}case WorkerRegisterInfo(id, cpu, ram) => { // 如果是注册信息,则将注册信息管理起来val workerInfo = new WorkerInfo(id, cpu, ram)if (!workers.contains(id)) { // 判断是否已经添加这个 workerworkers += ((id, workerInfo)) // 如果没有则添加进来println("worker:" + id + " 注册成功~~~")println("目前已有的 Workers: " + workers)// 回复注册成功消息sender() ! RegisteredInfo}}case HeartBeat(id) => { // 接收到 worker 的心跳信息val workerInfo = workers(id)val lastHeartBeatTime: Long = System.currentTimeMillis()workerInfo.lastHeartBeatTime = lastHeartBeatTimeprintln("Master 更新了 " + id + " 的心跳时间: " + lastHeartBeatTime)}case StartCheckTimeOutWorker => {// 获取到消息后开始定时检测离线的 Workerprintln("----- Master 开启定时任务,检测已离线的 Worker -----")import context.dispatchercontext.system.scheduler.schedule(FiniteDuration(0, "millis"), FiniteDuration(9000, "millis"), self, RemoveTimeOutWorker)}// 检测哪些 Worker 超时了,并从 Worker 中删除case RemoveTimeOutWorker => {val now: Long = System.currentTimeMillis// 如果最后一次心跳距离现在有 6 秒,那么就代表离线了,则删除这个 Worker
//            for ((id, workerInfo) <- workers) {
//                if ((now - workerInfo.lastHeartBeatTime) / 1000  > 6) {
//                    workers.remove(id)
//                    println("worker: " + id + " 已离线,将其移除!")
//                }
//            }// 也可以使用函数式编程将其移除workers.values.filter(worker => now - worker.lastHeartBeatTime > 6000).foreach(worker => {println("Worker: " + worker.id + " 离线,已将其移除!")workers.remove(worker.id)})println("当前有 " + workers.size + " 个 Worker 存活!")}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/557375.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

URL传Base64 造成报错 Illegal base64 character 20

报错如下&#xff1a; errorInternal Server Error, messageIllegal base64 character 20, tracejava.lang.IllegalArgumentException: Illegal base64 character 20 at java.util.Base64Decoder.decode0(Base64.java:714)atjava.util.Base64Decoder.decode0(Base64.java:714) …

Linux 中使用 sort 指令分组排序详解

Linux 中使用 sort 指令分组排序详解 sort 中进行分组排序主要用到的选项为 -k&#xff0c;此文&#xff0c;我们着重于该选项的使用方式&#xff0c;用到的其它选项不做解释&#xff0c;有兴趣的同学可以查看帮助文档 1. 数据准备 现有数据如下&#xff0c;文件名 sort_so…

Shiro-单点登录原理

单点登录原理 一、单系统登录机制 1、http无状态协议 web应用采用browser/server架构&#xff0c;http作为通信协议。http是无状态协议&#xff0c;浏览器的每一次请求&#xff0c;服务器会独立处理&#xff0c;不与之前或之后的请求产生关联&#xff0c;这个过程用下图说明…

关于 Java 同名类加载顺序问题排查方案

排查背景 最近在生产上部署 UDF 时&#xff0c;遇到一个两个环境完全相同&#xff0c;但是一个客户端报错另一个正常的情况&#xff0c;经过多次调试问题终于得以解决&#xff0c;现将解决思路记录一下&#xff0c;希望能对后来者有所帮助。(生产环境不便于截图。。。暂不展示…

Arrays.asList() 详解

Arrays.asList() 详解 【1. 要点】 该方法是将数组转化成List集合的方法。 List list Arrays.asList(“a”,“b”,“c”); 注意&#xff1a; &#xff08;1&#xff09;该方法适用于对象型数据的数组&#xff08;String、Integer…&#xff09; &#xff08;2&#xff0…

Vim 编码问题详解

Vim 编码问题详解 vim 中有 4 个与编码相关的配置&#xff0c;分别是 encoding、termencoding、fileencoding 和 fileencodings。在实际使用中任何一个配置有问题都可能会导致乱码&#xff0c;因此我们应该清楚每个配置的含义。 1. encoding encoding 是 vim 内部使用的字符编…

@Autowired作用在普通方法上

Autowired作用在普通方法上 Autowired作用在普通方法上&#xff0c;会在注入的时候调用一次该方法&#xff0c;如果方法中有实体参数&#xff0c;会对方法里面的参数进行装配&#xff0c;并调用一次该方法。这个可以用来在自动注入的时候做一些初始化操作。

@Autowired注解作用在方法上

Autowired注解作用在方法上 Autowired注解作用在方法上 &#xff08;1&#xff09;该方法如果有参数&#xff0c;会使用autowired的方式在容器中查找是否有该参数 &#xff08;2&#xff09;会执行该方法

spring定时任务的几种实现方式

spring定时任务的几种实现方式 一&#xff0e;分类 从实现的技术上来分类&#xff0c;目前主要有三种技术&#xff08;或者说有三种产品&#xff09;&#xff1a; Java自带的java.util.Timer类&#xff0c;这个类允许你调度一个java.util.TimerTask任务。使用这种方式可以让…

Spring定时任务

Spring定时任务(一)&#xff1a;SpringTask使用 背景&#xff1a;在日常开发中&#xff0c;经常会用到任务调度这类程序。实现方法常用的有&#xff1a;A. 通过java.util.Timer、TimerTask实现。 B.通过Spring自带的SpringTask。 C. 通过Spring结合Quartz实现。本文我们将讲述…

关于Spring 任务调度之task:scheduler与task:executor配置的详解

关于Spring 任务调度之task:scheduler与task:executor配置的详解 其实就是Spring定时器中配置文件中一些配置信息&#xff0c;由于笔者自己是头一次使用&#xff0c;有些配置详细不太明白&#xff0c;随即研究了一番&#xff0c;于是想记录一下&#xff0c;有需要的小伙伴可以…

Spring的任务调度@Scheduled注解——task:scheduler和task:executor的解析

Spring的任务调度Scheduled注解——task:scheduler和task:executor的解析 applicationContext 的配置如下: <?xml version"1.0" encoding"UTF-8"?> <beans xmlns"http://www.springframework.org/schema/beans"xmlns:context"…

CAP 理论、BASE 理论、FLP 理论

CAP 理论、BASE 理论、FLP 理论 CAP 理论、BASE 理论、FLP 理论 1.CAP 理论 C(Consistency) 一致性: 在写操作之后的所有读操作&#xff0c;必须要返回写入的值。 A(Availability) 可用性&#xff1a; 只要收到用户的请求&#xff0c;服务端就必须给出回应。 P(Partitio…

Spring的@Scheduled注解实现定时任务

Spring的Scheduled注解实现定时任务 【简介篇】 项目经常会用到定时任务&#xff0c;实现定时任务的方式有很多种。在Spring框架中&#xff0c;实现定时任务很简单&#xff0c;常用的实现方式是使用注解Scheduled。 Scheduled 常用来实现简单的定时任务。例如凌晨1点跑批&am…

接口测试如何测

接口测试如何测 一.什么是接口&#xff1f; 接口测试主要用于外部系统与系统之间以及内部各个子系统之间的交互点&#xff0c;定义特定的交互点&#xff0c;然后通过这些交互点来&#xff0c;通过一些特殊的规则也就是协议&#xff0c;来进行数据之间的交互。 二.接口都有哪…

CommandLineRunner 和 ApplicationRunner 的区别

CommandLineRunner 和 ApplicationRunner 概述 CommandLineRunner 和 ApplicationRunner 的作用类似, 都可以在 Spring 容器初始化之后执行某些操作。比较适用于某些复杂的 Bean 加载完成之后执行一些操作。例如 Feign 调用。 相同点 都可以获取到启动时指定的外部参数。主逻…

深入学习二叉树(一) 二叉树基础

深入学习二叉树(一) 二叉树基础 前言 树是数据结构中的重中之重&#xff0c;尤其以各类二叉树为学习的难点。一直以来&#xff0c;对于树的掌握都是模棱两可的状态&#xff0c;现在希望通过写一个关于二叉树的专题系列。在学习与总结的同时更加深入的了解掌握二叉树。本系列文…

ApplicationContext 和 BeanFactory 的区别

概述 首先解释一下两个名词: BeanFactory 是 Bean 工厂。ApplicationContext 是应用上下文。 ApplicationContext 和 BeanFactory 都是装载 Bean 的容器, 且 ApplicationContext 继承自 BeanFactory。但 ApplicationContext 较 BeanFactory 来说更高级一点。 主要区别: 是否…

深入学习二叉树(二) 线索二叉树

深入学习二叉树(二) 线索二叉树 1 前言 在上一篇简单二叉树的学习中&#xff0c;初步介绍了二叉树的一些基础知识&#xff0c;本篇文章将重点介绍二叉树的一种变形——线索二叉树。 2 线索二叉树 2.1 产生背景 现有一棵结点数目为n的二叉树&#xff0c;采用二叉链表的形式…

Java 逃逸分析

定义 分析对象动态作用域, 看别的方法或线程是否有途径能访问到这个对象。所谓逃逸分析,就是分析对象动态作用域,看别的方法或线程是否有途径能访问到这个对象,如果不能,那么编译器就可以为这个变量提供更高效的优化。 当一个对象, 能被其他方法访问到时, 这种逃逸叫做方法逃…