日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

AKKA:大数据下的并发编程模型

發布時間:2024/8/1 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 AKKA:大数据下的并发编程模型 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在大數據如日中天的當今,開發中只會調用 API 是遠遠不夠的,火熱的 Spark、Flink 被越來越多的人掌握,這就驅使技術人員向技術中更深層次的知識去挖掘,今天我們就一起聊聊分布式計算和通信實現技術 AKKA,到底依靠哪些優勢被 Spark 和 Flink 所使用。

在本場 Chat 中,一萬多字中會講到如下內容:

  • Akka 介紹、Actor 模型入門
  • Actor 工作機制、消息傳遞、應用實例
  • Akka 網絡編程:理論講解
  • Akka 網絡編程:手敲代碼
  • Spark 使用 Akka 實現進程通訊

適合人群: 對 Akka 有興趣及深入大數據技術的技術人員

1 Akka 介紹

  • Akka 是 JAVA 虛擬機 JVM 平臺上構建高并發、分布式和容錯應用的工具包和運行時,你可以理解成 Akka 是編寫并發程序的框架。

  • Akka 用 Scala 語言寫成,同時提供了 Scala 和 JAVA 的開發接口。

  • Akka 主要解決的問題是:可以輕松的寫出高效穩定的并發程序,程序員不再過多的考慮線程、鎖和資源競爭等細節。

  • 2 Akka 中 Actor(角色) 模型

  • 處理并發問題關鍵是要保證共享數據的一致性和正確性,因為程序是多線程時,多個線程對同一個數據進行修改,若不加同步條件,勢必會造成數據污染。但是當我們對關鍵代碼加入同步條件 synchronized 后,實際上大并發就會阻塞在這段代碼,對程序效率有很大影響。

  • 若是用單線程處理,不會有數據一致性的問題,但是系統的性能又不能保證。

  • Actor 模型的出現解決了這個問題,簡化并發編程,提升程序性能。 你可以這里理解:Actor 模型是一種處理并發問題的解決方案,很牛!

  • 3 Akka 中 Actor 模型

    對上面的 Actor 模型做了說明和小結

  • Akka 處理并發的方法基于 Actor 模型(示意圖)

  • 在基于 Actor 的系統里,所有的事物都是 Actor,就好像在面向對象設計里面所有的事物都是對象一樣。

  • Actor 模型是作為一個并發模型設計和架構的。Actor 與 Actor 之間只能通過消息通信[消息的發送必須通過 ActorRef 發送],如圖的信封

  • Actor 與 Actor 之間只能用消息進行通信,當一個 Actor 給另外一個 Actor 發消息,消息是有順序的(消息隊列),只需要將消息投寄的相應的郵箱即可

  • 怎么處理消息是由接收消息的 Actor 決定的,發送消息 Actor 可以等待回復,也可以異步處理[ajax]

  • ActorSystem 的職責是負責創建并管理其創建的 Actor, ActorSystem 是單例的[工廠模式],一個 JVM 進程中有一個即可,而 Actor 是可以有多個的

  • Actor 模型是對并發模型進行了更高的抽象

  • Actor 模型是異步、非阻塞、高性能的事件驅動編程模型 [案例:說明什么是異步、非阻塞,最經典的案例就是 ajax 異步請求處理 ]

  • Actor 模型是輕量級事件處理(1GB 內存可容納百萬級別個 Actor),因此處理大并發性能高

  • 4 Actor 模型工作機制說明

    4.1 示意圖

    4.2 Actor 模型工作機制說明(對照工作機制示意圖理解)

  • ActorySystem 創建 Actor

  • ActorRef:可以理解成是 Actor 的代理或者引用。消息是通過 ActorRef 來發送,而不能通過 Actor 發送消息,通過哪個 ActorRef 發消息,就表示把該消息發給哪個 Actor

  • 消息發送到 Dispatcher Message (消息分發器),它得到消息后,會將消息進行分發到對應的 MailBox。(注: Dispatcher Message 可以理解成是一個線程池, MailBox 可以理解成是消息隊列,可以緩沖多個消息,遵守 FIFO)

  • Actor 可以通過 receive 方法來獲取消息,然后進行處理。

  • 4.3 Actor 間傳遞消息機制(對照工作機制示意圖理解)

  • 每一個消息就是一個 Message 對象。Message 繼承了 Runable, 因為 Message 就是線程類。

  • 從 Actor 模型工作機制看上去很麻煩,但是程序員編程時只需要編寫 Actor 就可以了,其它的交給 Actor 模型完成即可。

  • A Actor 要給 B Actor 發送消息,那么 A Actor 要先拿到(也稱為持有) B Actor 的 代理對象 ActorRef 才能發送消息

  • 5 Actor 模型快速入門

    5.1 應用實例需求

  • 編寫一個 Actor,比如 SayHelloActor

  • SayHelloActor 可以給自己發送消息,如圖

  • 要求使用 Maven 的方式來構建項目,這樣可以很好的解決項目開發包的依賴關系。(AKKA 版本需要和 Scala 版本對應,使用 Maven 可以解決問題)

  • 代碼實現和說明

  • package com.test.akka.actorimport akka.actor.{Actor, ActorSystem, Props}class SayHelloActor extends Actor { //type Receive = PartialFunction[Any, Unit] override def receive: Receive = { case "start" => println("actor 開始運行...") case "hello" => println("hello too:)") case "fish" => println("<?)))><< 魚") case "cat" => println("(>^ω^<)喵..") //如何讓 actor 停止 case "exit" => { println("準備退出~~") context.stop(self) // 停止當前的 actor context.system.terminate() // 停止 ActorSystem. } }}object SayHelloActorDemo { def main(args: Array[String]): Unit = { //1 創建一個 ActorSystem val actorFactory = ActorSystem("actorFactory") //2.通過 actorFactory 創建需要的 actor //說明 //1. "SayHelloActor" 這個是 actor 的名字,有程序員指定. //2. Props[SayHelloActor] 是使用反射機制創建了 SayHelloActor 的實例 //3. sayHelloActorRef : 是創建的 SayHelloActor 的引用, 代理(proxy) val sayHelloActorRef = actorFactory.actorOf(Props[SayHelloActor],"SayHelloActor") sayHelloActorRef ! "start" sayHelloActorRef ! "hello" sayHelloActorRef ! "fish" sayHelloActorRef ! "cat" sayHelloActorRef ! "exit" }}

    5.2 對上面的代碼進行小結和說明

    6 Actor 模型應用實例-- Actor 間通訊

    6.1 應用實例需求

  • 編寫 2 個 Actor ,分別是 AActor 和 BActor

  • AActor 和 BActor 之間可以相互發送消息,

  • 加強對 Actor 傳遞消息機制的理解

  • 6.2 兩個 Actor 的通訊機制原理圖和思路分析

  • 先編寫 BActor,因為它會被 AActor 使用
  • AActor 先出招,BActor 收到消息后,通過 sender() ! "消息"
  • 6.3 代碼實現

    //AActor.scalapackage com.test.akka.actorsimport akka.actor.{Actor, ActorRef}//AActor 先出招class AActor(iBActorRef:ActorRef) extends Actor{ val bActorRef = iBActorRef var count = 0 override def receive: Receive = { case "start" => { println("AActor 啟動") println("stark ok") println("我打") //發給 BActor bActorRef ! "我打" } case "我打" => { count += 1 println(s"AActor(黃飛鴻) 挺猛 看我佛山無影腳 第${count}腳") Thread.sleep(1000) bActorRef ! "我打" } }} //BActor.scalapackage com.test.akka.actorsimport akka.actor.Actorclass BActor extends Actor{ var count = 0 override def receive:Receive = { case "我打" => { count += 1 println(s"BActor(喬峰) 厲害 看我降龍十八掌 第${count}掌") Thread.sleep(1000) sender() ! "我打" } }} //ActorGame.scalapackage com.test.akka.actorsimport akka.actor.{ActorRef, ActorSystem, Props}object ActorGame extends App{ //1. ActorSystme val actorfactory = ActorSystem("actorfactory") val bActorRef: ActorRef = actorfactory.actorOf(Props[BActor],"BActor") val aActorRef: ActorRef = actorfactory.actorOf(Props(new AActor(bActorRef)), "AActor") //做一個要求:當 100 招,就退出.. aActorRef ! "start"}

    8 Akka 網絡編程基本介紹

    Akka 支持面向大并發后端服務程序,網絡通信這塊是服務端程序重要的一部分。

    8.1 網絡編程有兩種:

  • TCP socket 編程,是網絡編程的主流。之所以叫 Tcp socket 編程,是因為底層是基于 tcp/ip 協議 的. 比如: QQ 聊天

  • b/s 結構的 http 編程,我們使用瀏覽器去訪問服務器時,使用的就是 http 協議,而 http 底層依舊是用 tcp socket 實現的。 比如: 京東商城 【屬于 web 編程范疇,核心的協議是 http,底層是 tcp/ip 協議 (協議簇)】

  • 8.2 OSI 與 tcp/ip 參考模型 (推薦 tcp/ip 協議 3 卷)

    推薦一部書,《tcp/ip 協議》和《Unix 高級編程》

    8.3 ip 地址

    概述:每個 internet 上的主機和路由器都有一個 ip 地址,它包括網絡號和主機號,ip 地址有 ipv4(32 位) 或者 ipv6(128 位),可以通過 ipconfig 來查看。

    8.4 端口 (port)--介紹

    我們這里所指的端口不是指物理意義上的端口,而是特指 TCP/IP 協議中的端口,是邏輯意義上的端口。如果把 IP 地址比作一間房子,端口就是出入這間房子的門。真正的房子只有幾個門,但是一個 IP 地址的端口 可以有 65535(即:256×256-1)個之多!端口是通過端口號來標記的。(端口號 0:保留 Reserved)

    8.5 端口(port)--分類

  • 0 號是保留端口

  • 1-1024 是固定端口 [有名端口/ 名花有主 ],即被某些程序固定使用,一般程序員不使用。

    22: SSH 遠程登錄協議 23: telnet 使用 21: ftp 使用 25: smtp 服務使用 80: iis 使用 7: echo 服務

  • 1025-65535 是動態端口 [純凈版,關閉不需要端口,sshd [改一個] ]

    這些端口,程序員可以使用 netstat -anb

  • 8.6 端口(port)-使用注意

  • 在計算機(尤其是做服務器)要盡可能的少開端口

  • 一個端口只能被一個程序監聽(80 但是一個端口可以連接多個客戶端)

  • 如果使用 netstat –an 可以查看本機有哪些端口在監聽

  • 可以使用 netstat –anb 來查看監聽端口的 pid,在結合任務管理器關閉不安全的端口

  • 8.7 網絡拓撲

    以下我們將 tcp socket 編程,簡稱 socket 編程。

    下圖為 socket 編程中客戶端和服務器的網絡分布

    9 Akka 網絡編程--小黃雞客服

    9.1 需求分析

  • 服務端進行監聽(9999)

  • 客戶端可以通過鍵盤輸入,發送咨詢問題給小黃雞客服(服務端)

  • 小黃雞(服務端) 回答客戶的問題

  • 9.2 界面設計

    9.3 程序的框架圖

    9.4 代碼實現

    MessageProtocol.scala

    package com.test.akka.yellowchickenserver.common1. mes 會稱為 樣例類的只讀屬性//ClientMessage 客戶端發送給服務器的協議數據(對象)case class ClientMessage(mes:String)//ServerMessage 服務端會送給客戶端的協議數據(對象)case class ServerMessage(mes:String)

    YellowChickenServer.scala

    package com.test.akka.yellowchickenserver.serverimport akka.actor.{Actor, ActorRef, ActorSystem, Props}import com.test.akka.yellowchickenserver.common.{ClientMessage, ServerMessage}import com.typesafe.config.ConfigFactoryclass YellowChickenServer extends Actor{ override def receive:Receive = { case "start" => { println("小黃 開始監聽程序,可以咨詢問題~~") } case ClientMessage(mes) => { //怎么匹配他的內容 println("客戶咨詢的問題是" + mes) mes match { case "大數據學費" => { sender() ! ServerMessage("20000RMB") } case "地址" => { sender() ! ServerMessage("北京昌平 XX 樓 111 號") } case "課程" => { sender() ! ServerMessage("JavaEE Python 前端 大數據") } case _ => { sender() ! ServerMessage("你說的啥子~~") } } } }}object YellowChickenServer extends App{ //創建 ActorSystem //因為這時,我們需要監聽網絡,所以使用如下方法創建工廠 //Config 就是我們的網絡配置 ip , port.. //def apply(name: String, config: Config): ActorSystem = apply(name, Option(config), None, None) val host = "127.0.0.1" //ip4 val port = 9999 //Config 就是我們的網絡配置 ip , port.. // val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$host |akka.remote.netty.tcp.port=$port """.stripMargin) val serverActorSystem = ActorSystem("Server",config) val yellowChickenServerRef: ActorRef = serverActorSystem.actorOf(Props[YellowChickenServer],"YellowChickenServer") //akka.tcp://Server@127.0.0.1:9999 就是 Actor 路徑 yellowChickenServerRef ! "start"}

    CustomerActor.scala

    package com.test.akka.yellowchickenserver.clientimport akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}import com.test.akka.yellowchickenserver.common.{ClientMessage, ServerMessage}import com.typesafe.config.ConfigFactoryimport scala.io.StdInclass CustomerActor extends Actor { //我們這里需要持有 Server 的 Ref var yellowChickenServerRef: ActorSelection = _ //preStart , 在啟動 Actor 之前會先運行,因此變量,初始化寫入 preStart override def preStart(): Unit = { //println("preStart") //說明 //1. 在 AKKA 的 Actor 模型中, 認為 每個 Actor 都是一個資源(角色),通過一個 Path 來定位一個 actor //2. path 的組成 akka.tcp://Server 的 actorfactory 名字@ServerIp:Server 的 port/user/ServerActor 名字 yellowChickenServerRef = context.actorSelection("akka.tcp://Server@127.0.0.1:9999/user/YellowChickenServer") } override def receive: Receive = { case "start" => { println("客戶端啟動,可以咨詢問題~~") } case mes: String => { //將 mes 發送給 Server yellowChickenServerRef ! ClientMessage(mes) } case ServerMessage(mes) => { println("收到小黃雞客服回復的消息: " + mes) } }}object CustomerActor extends App { //編寫必要的配置信息 val serverHost = "127.0.0.1" val serverPort = 9999 val clientHost = "127.0.0.1" val clientPort = 10000 val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$clientHost |akka.remote.netty.tcp.port=$clientPort """.stripMargin) //創建 CustomerActor val clientActorSystem = ActorSystem("Client", config) val customerActorRef: ActorRef = clientActorSystem.actorOf(Props[CustomerActor], "CustomerActor") customerActorRef ! "start" println("可以咨詢問題了") while (true) { val mes = StdIn.readLine() customerActorRef ! mes //先發給自己,然后讓 CustomerActor 發 }}

    10 Spark Master Worker 進程通訊項目

    10.1 項目意義

  • 深入理解 Spark 的 Master 和 Worker 的通訊機制

  • 為了方便同學們看 Spark 的底層源碼,命名的方式和源碼保持一致。(如:通訊消息類命名就是一樣的)

  • 加深對主從服務心跳檢測機制(HeartBeat)的理解,方便以后 Spark 源碼二次開發。

  • 10.2 項目需求分析

  • worker 注冊到 Master,Master 完成注冊,并回復 worker 注冊成功(注冊功能)

  • worker 定時發送心跳(3),并在 Master 接收到

  • Master 接收到 worker 心跳后,要更新該 worker 的最近一次發送心跳的時間

  • 給 Master 啟動定時任務,定時檢測注冊的 worker 有哪些沒有更新心跳,并將其從 hashmap 中刪除

  • master worker 進行分布式部署(Linux 系統)->如何給 maven 項目打包->上傳 linux 并運行

  • 10.3 實現功能 1-Worker 完成注冊

    • 功能說明

      worker 注冊到 Master,Master 完成注冊,并回復 worker 注冊成功

    • 思路分析

    • 代碼實現

    //MessageProtocol.scalapackage com.test.akka.sparkmasterworker.common//樣例類, 注冊的協議,包含 id ,cpu, ram(內存)case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)//WorkerInfo 是用于保存 worker 信息的對象, 它不在網絡傳輸,他是普通類//后面會加入擴展內容,比如心跳時間class WorkerInfo(val id: String, val cpu: Int, val ram: Int) { //默認的心跳時間 var lastHeartBeatTime:Long = System.currentTimeMillis()}//如果注冊成功后,返回的協議信息,因為不需要屬性,因此我直接使用的 case object//后面直接返回的是 RegisteredWorkerInfo 對象: 類型 RegisteredWorkerInfo$case object RegisteredWorkerInfo //SparkMaster.scalapackage com.test.akka.sparkmasterworker.masterimport akka.actor.{Actor, ActorRef, ActorSystem, Props}import com.test.akka.sparkmasterworker.common.{RegisterWorkerInfo, RegisteredWorkerInfo, WorkerInfo}import com.typesafe.config.ConfigFactoryimport scala.collection.mutableclass SparkMaster extends Actor { //定義一個 hashMap,存放所有的 workers 信息 val workers = mutable.HashMap[String, WorkerInfo]() override def receive = { case "start" => { println("spark master 啟動,在 10000 監聽..") } case RegisterWorkerInfo(id, cpu, ram) => { //注冊 //先判斷是否已經有 id if (!workers.contains(id)) { //創建 WorkerInfo val workerInfo = new WorkerInfo(id, cpu, ram) workers += (id -> workerInfo) //workers += ((id,workerInfo)) //回復成功! sender() ! RegisteredWorkerInfo println(s"workerid= $id 完成注冊~") } } }}object SparkMaster extends App { val masterHost = "127.0.0.1" val masterPort = 10000 val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$masterHost |akka.remote.netty.tcp.port=$masterPort """.stripMargin) //創建 ActorSystem // "SparkMaster" actorFactory 名字,程序指定 val sparkMasterActorSystem = ActorSystem("SparkMaster", config) //創建 SparkMaster 和 引用 val sparkMaster01Ref: ActorRef = sparkMasterActorSystem.actorOf(Props[SparkMaster], "SparkMaster01") sparkMaster01Ref ! "start"} //SparkWorker.scalapackage com.test.akka.sparkmasterworker.workerimport java.util.UUIDimport akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}import com.test.akka.sparkmasterworker.common.{RegisterWorkerInfo, RegisteredWorkerInfo}import com.test.akka.sparkmasterworker.master.SparkMaster.{masterHost, masterPort}import com.typesafe.config.ConfigFactoryclass SparkWorker(masterHost:String,masterPort:Int) extends Actor{ var masterProxy: ActorSelection = _ val id = UUID.randomUUID().toString override def preStart(): Unit = { masterProxy = context.actorSelection(s"akka.tcp://SparkMaster@${masterHost}:${masterPort}/user/SparkMaster01") } override def receive = { case "start" => { println("spark worker 啟動..") //發出注冊的請求 masterProxy ! RegisterWorkerInfo(id, 8, 8 * 1024) } case RegisteredWorkerInfo => { println(s"收到 master 回復消息 workerid= $id 注冊成功") } }}object SparkWorker extends App{ val (masterHost,masterPort,workerHost,workerPort) = ("127.0.0.1",10000,"127.0.0.1",10001) val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$workerHost |akka.remote.netty.tcp.port=$workerPort """.stripMargin) val sparkWorkerActorSystem = ActorSystem("SparkWorker",config) val sparkWorkerActorRef: ActorRef = sparkWorkerActorSystem.actorOf(Props(new SparkWorker(masterHost, masterPort)), "SparkWorker-01") sparkWorkerActorRef ! "start"}

    10.4 實現功能 2-Worker 定時發送心跳

    • 功能說明

      worker 定時發送心跳給 Master,Master 能夠接收到,并更新 worker 上一次心跳時間

    • 代碼實現

    //MessageProtocol.scala//worker 在注冊成功后,通過定時器,每隔 3s 發送一個消息給自己//SendHeartBeatcase object SendHeartBeat//當定時器發送了一個 SendHeartBeat 消息后,worker 發送一個消息// (HearBeat(id: String))給 Mastercase class HeartBeat(id: String) //SparkWorker.scalaoverride def receive = { case "start" => { println("spark worker 啟動..") //發出注冊的請求 masterProxy ! RegisterWorkerInfo(id, 8, 8 * 1024) } case RegisteredWorkerInfo => { println(s"收到 master 回復消息 workerid= $id 注冊成功") //啟動一個定時器. import context.dispatcher //說明 //1.schedule 創建一個定時器 //2.0 millis, 延時多久才執行, 0 表示不延時,立即執行 //3. 3000 millis 表示每隔多長時間執行 3 秒 //4. self 給自己發送 消息 //5. SendHeartBeat 消息 context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat) } case SendHeartBeat => { println(s"workerid= $id 發出心跳~") masterProxy ! HeartBeat(id) } } //SparkMaster.scalacase HeartBeat(id) => { //更新 id 對應的 worker 的心跳 if (workers.contains(id)) { workers(id).lastHeartBeatTime = System.currentTimeMillis() println(s"workerid=$id 更新心跳成功~") } }

    10.5 實現功能 3-Master 啟動定時任務,定時檢測注冊的 worker

    • 功能說明

      功能要求:Master 啟動定時任務(10秒),定時檢測注冊的 worker 有哪些沒有更新心跳,已經超時的 worker(6 秒),將其從 hashmap 中刪除掉

    • 思路分析

    • 代碼實現

    //SparkMaster.scalaoverride def receive = { case "start" => { println("spark master 啟動,在 10000 監聽..") self ! StartTimeOutWorker } case RegisterWorkerInfo(id, cpu, ram) => { //注冊 //先判斷是否已經有 id if (!workers.contains(id)) { //創建 WorkerInfo val workerInfo = new WorkerInfo(id, cpu, ram) workers += (id -> workerInfo) //workers += ((id,workerInfo)) //回復成功! sender() ! RegisteredWorkerInfo println(s"workerid= $id 完成注冊~") } } case HeartBeat(id) => { //更新 id 對應的 worker 的心跳 if (workers.contains(id)) { workers(id).lastHeartBeatTime = System.currentTimeMillis() println(s"workerid=$id 更新心跳成功~") } } case StartTimeOutWorker =>{ //啟動定時器 import context.dispatcher context.system.scheduler.schedule(0 millis, 10000 millis, self, RemoveTimeOutWorker) } case RemoveTimeOutWorker => { //定時清理超時 6s 的 worker,scala //獲取當前的時間 val currentTime = System.currentTimeMillis() val workersInfo = workers.values //獲取到所有注冊的 worker 信息 //先將超時的一次性過濾出來,然后對過濾到的集合一次性刪除 workersInfo.filter( currentTime - _.lastHeartBeatTime > 6000 ).foreach(workerInfo=>{ workers.remove(workerInfo.id) }) printf("當前有%d 個 worker 存活\n", workers.size) } }

    10.6 實現功能 4-Master,Worker 的啟動參數運行時指定

    • 功能說明

      功能要求:Master,Worker 的啟動參數運行時指定,而不是固定寫在程序中的

    • 代碼實現

    object SparkMaster extends App { //要求啟動時,我們從外部輸入三個參數 if (args.length != 3) { println("啟動參數不正確 <masterHost masterPort masterName>") } val masterHost = args(0) val masterPort = args(1) val masterName = args(2) //SparkWorker.scala if (args.length != 6) { println("參數格式不正確 <masterHost masterPort masterName workerHost workerPort workerName>") } val (masterHost,masterPort,masterName,workerHost,workerPort,workerName) = (args(0),args(1),args(2),args(3),args(4),args(5))

    10.7 對開發的 SparkMaster 和 SparkWorker 打包.jar , 部署到不同的 Linux 服務器,并運行

    我這里直接在 windows 演示,同學們可以上傳到自己的 3 臺 linux 并并行

    打包的步驟

  • 修改 pom.xml 指定主類

    com.test.akka.sparkmasterworker.master.SparkMaster

  • 出 maven 的打包的界面

  • 找到 lifecycle,雙擊 package 即可

  • 打包后,到 target 去找 jar 即可

  • 給 SparkWorker 打包的流程和前面完全一樣,但是需要先 clean

  • 測試和指令java -jar SparkWorker.java 127.0.0.1 10000 SparkMaster01 127.0.0.1 10001 SparkWorker01

  • 閱讀全文: http://gitbook.cn/gitchat/activity/5de88c119a74cc327f167a48

    您還可以下載 CSDN 旗下精品原創內容社區 GitChat App , GitChat 專享技術內容哦。

    總結

    以上是生活随笔為你收集整理的AKKA:大数据下的并发编程模型的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。