日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

使用 Akka 实现 Master 与 Worker 之间的通信

發布時間:2025/3/12 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用 Akka 实现 Master 与 Worker 之间的通信 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

MessageProtocol.scala

package top.gldwolf.scala.akkademo.sparkmasterandworker.common/*** @author: Gldwolf* @email: ZengqiangZhao@sina.com* @date: 2020/4/17 10:54*//*** 用于 Work 注冊時發送注冊信息*/ case class WorkerRegisterInfo(id: String, cpu: Int, ram: Int) {}/*** 用于保存到 Master 的 HashMap 中*/ class WorkerInfo(var id: String, cpu: Int, ram: Int) {var lastHeartBeatTime = System.currentTimeMillis() }/*** 注冊成功后,Master 回應此類型的消息,表示注冊成功* Worker 接收到后,啟動心跳機制*/ case object RegisteredInfo/*** worker 每隔一定時間由定時器發給自己的一個消息,用于觸發自己給 Master 發送消息*/ case object SendHeartBeat/*** 由自己的消息觸發,然后給 Master 發送 HeartBeat 消息,消息要帶上自己的 id*/ case class HeartBeat(id: String)/*** Master 給自己發送一個觸發檢查超時 Worker 的消息,定時獲取已離線的 Worker*/ case object StartCheckTimeOutWorker/*** Master 給自己發消息,檢測 Worker 是否已離線,如果已離線,則移除*/ case object RemoveTimeOutWorker

SparkWorker.scala

package top.gldwolf.scala.akkademo.sparkmasterandworker.workerimport akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props} import com.typesafe.config.{Config, ConfigFactory} import top.gldwolf.scala.akkademo.sparkmasterandworker.common.{HeartBeat, RegisteredInfo, SendHeartBeat, WorkerRegisterInfo}import scala.concurrent.duration.FiniteDuration/*** @author: Gldwolf* @email: ZengqiangZhao@sina.com* @date: 2020/4/17 10:03*/object SparkWorker {def main(args: Array[String]): Unit = {if (args.length < 6) {println("參數個數不正確:host, port, workerName, masterName, masterHost, masterPort...")System.exit(-1)}val host = args(0)val port = args(1).toIntval workerName = args(2)val masterName = args(3)val masterHost = args(4)val masterPort = args(5).toIntval config: Config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=$host|akka.remote.netty.tcp.port=$port|""".stripMargin)val workerFactory: ActorSystem = ActorSystem("WorkerFactory", config)val workerRef: ActorRef = workerFactory.actorOf(Props(new SparkWorker(masterHost, masterPort, masterName)), workerName)workerRef ! "start"} }class SparkWorker(masterHost: String, masterPort: Int, masterName: String) extends Actor {var masterRef: ActorSelection = _var id = java.util.UUID.randomUUID().toStringoverride def preStart(): Unit = {masterRef = context.actorSelection("akka.tcp://MasterFactory@" +s"${masterHost}:${masterPort}/user/${masterName}")}override def receive: Receive = {case "start" => {println("Worker " + "已上線~~~")masterRef ! WorkerRegisterInfo(id, 4, 4096)}// 接收到這個消息表示注冊成功,然后會給定時給自己發送 SendHeartBeat 消息,觸發心跳機制case RegisteredInfo => {println("workerId: " + id + " 注冊成功!")import context.dispatcher// 說明:// 1. 0 millis 表示不延時,立即執行定時器// 2. 3000 millis 表示每隔 3 秒執行一次// 3. self 表示發送給自己// 4. SendHeartBeat 表示發送的內容context.system.scheduler.schedule(FiniteDuration(0, "millis"), FiniteDuration(3000, "millis"), self, SendHeartBeat)}// 接收到自己的定時器發送給自己的消息時,觸發下面內容,給 Master 發送心跳信息case SendHeartBeat => {println("worker: " + id + " 發送心跳信息~~~")masterRef ! HeartBeat(id)}} }

SparkMaster.scala

package top.gldwolf.scala.akkademo.sparkmasterandworker.masterimport akka.actor.{Actor, ActorRef, ActorSystem, Props} import com.typesafe.config.{Config, ConfigFactory} import top.gldwolf.scala.akkademo.sparkmasterandworker.common._import scala.collection.mutable import scala.concurrent.duration.FiniteDuration/*** @author: Gldwolf* @email: ZengqiangZhao@sina.com* @date: 2020/4/17 9:56*/object SparkMaster {def main(args: Array[String]): Unit = {if (args.length < 3) {println("參數不夠,請轉入 host, port, masterName...")System.exit(-1)}val host = args(0)val port = args(1)val masterName = args(2)val config: Config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=$host|akka.remote.netty.tcp.port=$port|""".stripMargin)val masterFactory: ActorSystem = ActorSystem("MasterFactory", config)val masterRef: ActorRef = masterFactory.actorOf(Props[SparkMaster], masterName)masterRef ! "start"} }class SparkMaster extends Actor {val workers: mutable.HashMap[String, WorkerInfo] = mutable.HashMap()override def receive: Receive = {case "start" => {// 這里開始啟動 Master 定時檢測任務,判斷 Worker 有沒有離線println("Master 已上線~~~")self ! StartCheckTimeOutWorker // 給自己發一個開始檢測的消息}case WorkerRegisterInfo(id, cpu, ram) => { // 如果是注冊信息,則將注冊信息管理起來val workerInfo = new WorkerInfo(id, cpu, ram)if (!workers.contains(id)) { // 判斷是否已經添加這個 workerworkers += ((id, workerInfo)) // 如果沒有則添加進來println("worker:" + id + " 注冊成功~~~")println("目前已有的 Workers: " + workers)// 回復注冊成功消息sender() ! RegisteredInfo}}case HeartBeat(id) => { // 接收到 worker 的心跳信息val workerInfo = workers(id)val lastHeartBeatTime: Long = System.currentTimeMillis()workerInfo.lastHeartBeatTime = lastHeartBeatTimeprintln("Master 更新了 " + id + " 的心跳時間: " + lastHeartBeatTime)}case StartCheckTimeOutWorker => {// 獲取到消息后開始定時檢測離線的 Workerprintln("----- Master 開啟定時任務,檢測已離線的 Worker -----")import context.dispatchercontext.system.scheduler.schedule(FiniteDuration(0, "millis"), FiniteDuration(9000, "millis"), self, RemoveTimeOutWorker)}// 檢測哪些 Worker 超時了,并從 Worker 中刪除case RemoveTimeOutWorker => {val now: Long = System.currentTimeMillis// 如果最后一次心跳距離現在有 6 秒,那么就代表離線了,則刪除這個 Worker // for ((id, workerInfo) <- workers) { // if ((now - workerInfo.lastHeartBeatTime) / 1000 > 6) { // workers.remove(id) // println("worker: " + id + " 已離線,將其移除!") // } // }// 也可以使用函數式編程將其移除workers.values.filter(worker => now - worker.lastHeartBeatTime > 6000).foreach(worker => {println("Worker: " + worker.id + " 離線,已將其移除!")workers.remove(worker.id)})println("當前有 " + workers.size + " 個 Worker 存活!")}} }

總結

以上是生活随笔為你收集整理的使用 Akka 实现 Master 与 Worker 之间的通信的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。