日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Akka编写一个RPC框架,模拟多个Worker连接Master的情况的案例

發(fā)布時(shí)間:2024/9/27 编程问答 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Akka编写一个RPC框架,模拟多个Worker连接Master的情况的案例 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

使用Akka編寫一個(gè)RPC框架,實(shí)現(xiàn)Master與多個(gè)Worker之間的通信。流程圖如下:

編寫Pom文件,Pom文件的代碼如下:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.toto.akka</groupId><artifactId>MyRPC</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>1.7</maven.compiler.source><maven.compiler.target>1.7</maven.compiler.target><encoding>UTF-8</encoding><scala.version>2.10.6</scala.version><scala.compat.version>2.10</scala.compat.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.10</artifactId><version>2.3.14</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.10</artifactId><version>2.3.14</version></dependency></dependencies><build><!-- 源碼包放置的位置 --><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><!--專門用于編譯scala的插件--><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-make:transitive</arg><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><!--主要使用來的打包的插件--><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><!--表示把下面的這些文件給刪掉--><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>reference.conf</resource></transformer><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>cn.toto.akka.Worker</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build> </project>

Message的代碼如下:

package cn.toto.akka/*** Created by toto on 2017/7/3.*/ trait Message extends Serializable//Worker -> Master worker向master發(fā)送相關(guān)信息 case class RegisterWorker(id: String, cores: Int, memory: Int) extends Message//Master -> Worker Master向worker發(fā)送相關(guān)信息 case class RegisteredWorker(masterUrl: String) extends Message//Worker -> Master case class Heartbeat(id: String) extends Message//Worker internal message case object SendHeartbeat//Master internal message case object CheckTimeOutWorker

WorkerInfo的代碼如下:

package cn.toto.akka/*** Created by toto on 2017/7/3.*/ class WorkerInfo(val id: String, val cores: Int, val memory: Int) {//TODOvar lastHeartbeatTime: Long = _ }

Master的代碼如下:

package cn.toto.akkaimport akka.actor.{Actor, ActorSystem, Props} import akka.actor.Actor.Receive import com.typesafe.config.ConfigFactoryimport scala.collection.mutable//如果想使用millis毫秒單位,需要引入下面這個(gè)包 import scala.concurrent.duration._/*** Created by toto on 2017/7/3.*/ class Master(val host: String, val port: Int) extends Actor{//保存WorkerID 到 WorkerInfo的映射val idToWorker = new mutable.HashMap[String, WorkerInfo]()//保存所的WorkerInfo信息val workers = new mutable.HashSet[WorkerInfo]()val CHECK_INTERVAL = 15000override def preStart(): Unit = {//導(dǎo)入隱式轉(zhuǎn)換import context.dispatchercontext.system.scheduler.schedule(0 millis, CHECK_INTERVAL millis, self, CheckTimeOutWorker)}override def receive: Receive = {//Worker發(fā)送個(gè)Mater的注冊(cè)消息case RegisterWorker(workerId, cores, memory) => {if(!idToWorker.contains(workerId)) {//封裝worker發(fā)送的信息val workerInfo = new WorkerInfo(workerId, cores, memory)//保存workerInfoidToWorker(workerId) = workerInfoworkers += workerInfo//Master向Worker反饋?zhàn)?cè)成功的消息sender ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}@$host:$port/user/${Master.MASTER_NAME}")}}//Worker發(fā)送給Master的心跳信息case Heartbeat(workerId) => {if(idToWorker.contains(workerId)) {val workerInfo = idToWorker(workerId)val currentTime = System.currentTimeMillis()//更新上一次心跳時(shí)間workerInfo.lastHeartbeatTime = currentTime}}//檢測(cè)超時(shí)的Workercase CheckTimeOutWorker => {val currentTime = System.currentTimeMillis()val deadWorkers: mutable.HashSet[WorkerInfo] = workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL)// for(w <- deadWorkers) {// idToWorker -= w.id// workers -= w// }deadWorkers.foreach(w => {idToWorker -= w.idworkers -= w})println("alive worker size : " + workers.size)}} }object Master {val MASTER_SYSTEM = "MaterActorSystem"val MASTER_NAME = "Master"def main(args: Array[String]): Unit = {val host = args(0)val port = args(1).toIntval confStr =s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval conf = ConfigFactory.parseString(confStr)//ActorSystem是單例的,用于創(chuàng)建Acotor并監(jiān)控actorval actorSystem = ActorSystem(MASTER_SYSTEM, conf)//通過ActorSystem創(chuàng)建ActoractorSystem.actorOf(Props(new Master(host, port)), MASTER_NAME)actorSystem.awaitTermination()} }

worker的代碼如下:

package cn.toto.akkaimport java.util.UUIDimport akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactoryimport scala.concurrent.duration._/*** Created by toto on 2017/7/3.*/ class Worker(val cores: Int, val memory: Int, val masterHost: String, val masterPort: Int) extends Actor{//Master的引用var master: ActorSelection = _//Worker的IDval workerId = UUID.randomUUID().toString//masterUrlvar masterUrl: String = _val HEARTBEAT_INTERVAL = 10000//preStart在構(gòu)造器之后receive之前執(zhí)行override def preStart(): Unit = {//首先跟Master建立連接master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}@$masterHost:$masterPort/user/${Master.MASTER_NAME}")//通過master的引用向Master發(fā)送注冊(cè)消息master ! RegisterWorker(workerId, cores, memory)}override def receive: Receive = {//Master發(fā)送給Worker注冊(cè)成功的消息case RegisteredWorker(masterUrl) => {this.masterUrl = masterUrl//啟動(dòng)定時(shí)任務(wù),向Master發(fā)送心跳//導(dǎo)入隱式轉(zhuǎn)換import context.dispatchercontext.system.scheduler.schedule(0 millis, HEARTBEAT_INTERVAL millis, self, SendHeartbeat)}case SendHeartbeat => {//向Master發(fā)送心跳master ! Heartbeat(workerId)}} }object Worker {def main(args: Array[String]): Unit = {//Worker的地址和端口val host = args(0)val port = args(1).toIntval cores = args(2).toIntval memory = args(3).toInt//Master的地址和端口val masterHost = args(4)val masterPort = args(5).toIntval confStr =s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval conf = ConfigFactory.parseString(confStr)//單例的ActorSystemval actorSystem = ActorSystem("WorkerActorSystem", conf)//通過actorSystem來創(chuàng)建Actorval worker = actorSystem.actorOf(Props(new Worker(cores, memory, masterHost, masterPort)), "Worker")actorSystem.awaitTermination()} }

運(yùn)行Master的代碼:
準(zhǔn)備工作:

接著模擬傳入的參數(shù):

接著右鍵Run Master,效果如下:

運(yùn)行Woker程序
模擬傳入的參數(shù):

右鍵運(yùn)行Worker的代碼,效果如下:

總結(jié)

以上是生活随笔為你收集整理的Akka编写一个RPC框架,模拟多个Worker连接Master的情况的案例的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 激情网五月 | 捆绑无遮挡打光屁股 | 性av网站| 都市激情国产精品 | 91精品一区二区三区在线观看 | 超碰人人超碰 | 国产香蕉视频在线 | 自拍偷拍在线播放 | 国产精品资源站 | 国产a视频精品免费观看 | 欧美贵妇videos办公室 | 一级绝黄 | 天堂8中文在线 | 国产成人在线视频免费观看 | 亚洲色吧 | 四虎影视库 | 最新黄色av | 91新网站| 欧美在线网站 | 成年免费在线观看 | 亚洲三级国产 | 国产五月| 日韩精品一二三区 | 青青久久av北条麻妃黑人 | 青青草草| 亚洲欧美一区二区三区久久 | 午夜老司机免费视频 | 精品无码一区二区三区爱欲 | 午夜精品一区二区三 | 亚洲色图制服丝袜 | 人人做人人爱人人爽 | 国产丝袜精品视频 | 大学生一级一片全黄 | 熟女俱乐部五十路六十路av | 国产精品一级二级三级 | 色一情一区二区三区四区 | 米奇狠狠干 | 久久久久久电影 | 熟妇高潮一区二区高潮 | 日本在线高清视频 | 久久综合成人网 | 538国产精品一区二区免费视频 | 玖玖爱国产 | 天天插日日插 | 精品九九九九 | 天堂а√在线中文在线 | 三及毛片 | 国产精品无码影院 | 亚洲精品无码成人 | 毛片在线视频观看 | 337p粉嫩大胆噜噜噜噜69影视 | 少妇粉嫩小泬白浆流出 | 国产美女主播视频 | 日韩欧美高清视频 | 成年人免费观看网站 | 欧美日韩一区二区视频观看 | 果冻av在线 | 日美一级片| 免费毛片大全 | 亚洲精品久久久久久久蜜桃 | 日本欧美视频 | 久热一区 | 欧美激情xxx | 人妻丰满熟妇av无码久久洗澡 | 日本免费一级片 | 日本免费中文字幕 | 在线免费中文字幕 | 毛片在线视频播放 | 干美女av | 国产精品久久久久久一区二区 | 三年中文在线观看免费观看 | 18禁超污无遮挡无码免费游戏 | 97黄色网| 色人阁网站 | 二区视频在线 | 美女脱光衣服让男人捅 | 中文字幕亚洲精品在线 | 黄色免费网站视频 | 色吧综合| 亚洲久热 | 九热这里只有精品 | 国产精品偷拍 | 国产黑丝精品 | 开心激情亚洲 | 亚洲一区二区视频网站 | 桃色视频网 | 新版天堂资源中文8在线 | 亚洲精品.www| 亚洲蜜臀av乱码久久精品蜜桃 | 国产精品久久久久久一区二区三区 | 香蕉久久夜色精品国产使用方法 | 久久久久久久久国产精品一区 | 999毛片 | 日韩黄色三级视频 | 少妇色综合 | 色婷婷aⅴ一区二区三区 | 超碰h | 99久久人妻无码精品系列 | 免费看a级黄色片 |