日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

Scala消息通信之akka,akka案例

發(fā)布時(shí)間:2024/9/27 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Scala消息通信之akka,akka案例 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

#Scala編程實(shí)戰(zhàn)
##一、 課程目標(biāo)
###1. 目標(biāo):熟練使用Scala編寫程序

##二、 項(xiàng)目概述
###1. 需求
目前大多數(shù)的分布式架構(gòu)底層通信都是通過RPC實(shí)現(xiàn)的,RPC框架非常多,比如前我們學(xué)過的Hadoop項(xiàng)目的RPC通信框架,但是Hadoop在設(shè)計(jì)之初就是為了運(yùn)行長達(dá)數(shù)小時(shí)的批量而設(shè)計(jì)的,在某些極端的情況下,任務(wù)提交的延遲很高,所有Hadoop的RPC顯得有些笨重。

Spark 的RPC是通過Akka類庫實(shí)現(xiàn)的,Akka用Scala語言開發(fā),基于Actor并發(fā)模型實(shí)現(xiàn),Akka具有高可靠、高性能、可擴(kuò)展等特點(diǎn),使用Akka可以輕松實(shí)現(xiàn)分布式RPC功能。
###2. Akka簡介
Akka基于Actor模型,提供了一個(gè)用于構(gòu)建可擴(kuò)展的(Scalable)、彈性的(Resilient)、快速響應(yīng)的(Responsive)應(yīng)用程序的平臺(tái)。

**Actor模型:**在計(jì)算機(jī)科學(xué)領(lǐng)域,Actor模型是一個(gè)并行計(jì)算(Concurrent Computation)模型,它把a(bǔ)ctor作為并行計(jì)算的基本元素來對待:為響應(yīng)一個(gè)接收到的消息,一個(gè)actor能夠自己做出一些決策,如創(chuàng)建更多的actor,或發(fā)送更多的消息,或者確定如何去響應(yīng)接收到的下一個(gè)消息。

Actor是Akka中最核心的概念,它是一個(gè)封裝了狀態(tài)和行為的對象,Actor之間可以通過交換消息的方式進(jìn)行通信,每個(gè)Actor都有自己的收件箱(Mailbox)。通過Actor能夠簡化鎖及線程管理,可以非常容易地開發(fā)出正確地并發(fā)程序和并行系統(tǒng),Actor具有如下特性:

**1.**提供了一種高級(jí)抽象,能夠簡化在并發(fā)(Concurrency)/并行(Parallelism)應(yīng)用場景下的編程開發(fā)
**2.**提供了異步非阻塞的、高性能的事件驅(qū)動(dòng)編程模型
**3.**超級(jí)輕量級(jí)事件處理(每GB堆內(nèi)存幾百萬Actor)

##案例介紹:
知識(shí)點(diǎn)說明:
1.Akka可以實(shí)現(xiàn)不同進(jìn)程之間的通信
2.老大叫ActorSystem,用于創(chuàng)建和監(jiān)控Acotr
3.真正用于通信的是Acotr
4.一個(gè)進(jìn)程里面可以有多個(gè)Acotor
5.可以有多個(gè)進(jìn)程
6.如果不同ActorSystem下面的Acotor要進(jìn)行通信,首先AcotorSystem之間要建立連接

在akka的程序中,需要服務(wù)端和客戶端。
創(chuàng)建一個(gè)maven項(xiàng)目,其中maven項(xiàng)目的pom.xml內(nèi)容如下:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.toto.akka</groupId><artifactId>MyRPC</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>1.7</maven.compiler.source><maven.compiler.target>1.7</maven.compiler.target><encoding>UTF-8</encoding><scala.version>2.10.6</scala.version><scala.compat.version>2.10</scala.compat.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-actor_2.10</artifactId><version>2.3.14</version></dependency><dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-remote_2.10</artifactId><version>2.3.14</version></dependency></dependencies><build><!-- 源碼包放置的位置 --><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><!--專門用于編譯scala的插件--><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-make:transitive</arg><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><!--主要使用來的打包的插件--><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><!--表示把下面的這些文件給刪掉--><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>reference.conf</resource></transformer><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>cn.toto.akka.Worker</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build> </project>

其中服務(wù)端的代碼如下:

package cn.toto.akkaimport akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.ConfigFactory/*** Created by toto on 2017/7/2.*/ class Master extends Actor {override def receive: Receive = {//里面要有一個(gè)偏函數(shù)case "start" => {println("starting...")println("started....")}case "stop" => {println("stoping...")println("stopted...");}//master接收到worker的消息case "connect" => {println("a client connected...")//這里表示向客戶端發(fā)送一個(gè)消息sender ! "success"}case _ => println("123")} }object Master {def main(args: Array[String]): Unit = {//通過s,可以將變量通過$取到val host = "127.0.0.1"val port = "8888"val confStr =s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMargin//讀取一些默認(rèn)的配置文件val conf = ConfigFactory.parseString(confStr)//ActorSystem單例的,用于創(chuàng)建Actor并監(jiān)控actorval actorSystem = ActorSystem("MasterActorSystem",conf)//通過ActorSystem創(chuàng)建Actor,可以通過Ctrl + P的方式看到這個(gè)方法里面可以有哪些參數(shù)//通過Master類型,反射創(chuàng)建實(shí)例var master = actorSystem.actorOf(Props[Master],"Master")//接著可以發(fā)消息了。后續(xù)這里可以不是發(fā)字符串,可以發(fā)case classmaster ! "start"master ! "hello"master ! "stop"actorSystem.awaitTermination()} }

另外,worker端的代碼如下:

package cn.toto.akkaimport akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.ConfigFactory/*** Created by toto on 2017/7/2.*/ class Worker extends Actor{//actor里面有聲明周期方法//preStart在構(gòu)造器之后receive之前執(zhí)行override def preStart(): Unit = {//首先跟Master建立連接,其中"akka.tcp://MasterActorSystem@127.0.0.1:8888"可以在Master運(yùn)行之后的控制臺(tái)中找到//下面的地址:"akka.tcp://MasterActorSystem@127.0.0.1:8888"表示要先連接actorSystem,接著要和它下面的/user/Master建立通信//這樣就相當(dāng)于拿到了master的代理對象val master = context.actorSelection("akka.tcp://MasterActorSystem@127.0.0.1:8888/user/Master")//通過mater的引用向Master發(fā)送消息//向master發(fā)送connect消息master ! "connect"}override def receive: Receive = {case "success" => {println("a msg form master:success")}} }object Worker {def main(args: Array[String]): Unit = {//通過s,可以將變量通過$取到//val host = args(0)//val port = args(1).toIntval host = "127.0.0.1"val port = "9999"val confStr =s"""|akka.actor.provider = "akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname = "$host"|akka.remote.netty.tcp.port = "$port"""".stripMarginval conf = ConfigFactory.parseString(confStr)//單例的ActorSystemval actorSystem = ActorSystem("WorkerActorSystem",conf)//通過actorSystem來創(chuàng)建actorval worker = actorSystem.actorOf(Props[Worker],"Worker")//這里是等待優(yōu)雅退出actorSystem.awaitTermination()} }

運(yùn)行過程:
1、先啟動(dòng)Master(右鍵run),運(yùn)行后的效果如下:

2、接著運(yùn)行worker(右鍵run),運(yùn)行后的效果如下:

總結(jié)

以上是生活随笔為你收集整理的Scala消息通信之akka,akka案例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。