生活随笔
收集整理的這篇文章主要介紹了
讨喜的隔离可变性(五)同时使用多个角色
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
通過前面的學習,我們已經了解了如何創建角色以及如何給角色發送消息,下面讓我們來一起學習如何讓多個角色協同工作。在第2章中,我們創建了一個統計給定區間內所有素數的程序。在該程序中,我們使用了ExecutorService、Callable、Future以及其他差不多超過一頁紙那么多代碼。本節我們將會學習如何用Akka角色對該示例進行重構,并且根據之前的慣例我們的介紹順序還是先Java后Scala。
在Java中同時使用多個角色
假定待統計數字集合中的數字是1千萬個,為了統計其中的素數數量,之前我們是將數字集合劃分為若干個不相交的子集合,并將這些子集合丟給一些線程去執行統計操作。但這里我們將使用角色來完成同樣的功能,下面就讓我們從角色的onRecevie()函數開始說起吧:
| 1 | public class Primes extends UntypedActor { |
| 2 | public void onReceive(final Object boundsList) { |
| 3 | final List<Integer> bounds = (List<Integer>) boundsList; |
| 5 | PrimeFinder.countPrimesInRange(bounds.get(0), bounds.get(1)); |
| 6 | getContext().replySafe(count); |
為了統計給定區間內的素數數量,我們需要指定區間的上下限。在本例中,onReceive()函數的參數是一個List,其中前兩個元素即為區間的上下限。在onReceive()函數內部,我們調用了PrimeFinder類的countPrimesInRage()函數來統計區間內的素數數量,最后又使用replySafe()函數將統計結果返回給調用者。
在給定了待統計的數字集合之后,我們需要將其劃分成若干個不相交的子集合并將統計這些子集合中素數數量的任務委托給各個不同的角色來執行。下面就讓我們在靜態方法countPrimes()中實現這些邏輯:
| 01 | public static int countPrimes( |
| 02 | final int number, final int numberOfParts) { |
| 03 | final int chunksPerPartition = number / numberOfParts; |
| 04 | final List<Future<?>> results = new ArrayList<Future<?>>(); |
| 05 | for(int index = 0; index < numberOfParts; index++) { |
| 06 | final int lower = index * chunksPerPartition + 1; |
| 07 | final int upper = (index == numberOfParts - 1) ? number : |
| 08 | lower + chunksPerPartition - 1; |
| 09 | final List<Integer> bounds = Collections.unmodifiableList( |
| 10 | Arrays.asList(lower, upper)); |
| 11 | final ActorRef primeFinder = Actors.actorOf(Primes.class).start(); |
| 12 | results.add(primeFinder.sendRequestReplyFuture(bounds)); |
| 15 | for(Future<?> result : results) |
| 16 | count += (Integer)(result.await().result().get()); |
| 17 | Actors.registry().shutdownAll(); |
在確定了每個子集合的范圍之后,我們會將其包裝在一個不可變集合里——請記住,所有的消息都必須是不可變的。接下來,我們調用sendRequestReplyFuture()這個非阻塞函數來將統計請求發送給各個角色進行處理。在把請求發送出去之后,我們將sendRequestReplyFuture()返回的Future對象(注意這里是akka.dispatch.Future而不是JDK中的java.util.concurrent.Future)保存在一個數組中以便稍后從其中取回各個子集合的統計結果。在任務分派完畢之后,我們就可以循環查詢每個Future,即先調用Future的await()函數,待await()函數返回之后再調用其返回值的result()函數來獲取一個Scala的Option實例——你可以將其假想為一個包含統計結果的數據單元(如果數據存在的話)。最后我們可以通過調用該實例對象的get()函數來得到一個Integer類型的統計值。
OK,下面就讓我們寫一個用來檢驗上述代碼的測試用例,其中的待統計數字和子集合劃分數是通過命令行傳給程序的:
| 01 | public static void main(final String[] args) { |
| 03 | System.out.println("Usage: number numberOfParts"); |
| 05 | final long start = System.nanoTime(); |
| 06 | final int count = countPrimes( |
| 07 | Integer.parseInt(args[0]), Integer.parseInt(args[1])); |
| 08 | Working with Multiple Actors ? 179 |
| 09 | final long end = System.nanoTime(); |
| 10 | System.out.println("Number of primes is " + count); |
| 11 | System.out.println("Time taken " + (end - start)/1.0e9); |
main()函數主要負責對上面的統計代碼進行測試并記錄執行耗時。最后我們還需要實現PrimeFinder這個真正負責統計工作的類:
| 01 | public class PrimeFinder { |
| 02 | public static boolean isPrime(final int number) { |
| 03 | if (number <= 1) return false; |
| 04 | final int limit = (int) Math.sqrt(number); |
| 05 | for(int i = 2; i <= limit; i++) if(number % i == 0) return false; |
| 08 | public static int countPrimesInRange(final int lower, final int upper) { |
| 10 | for(int index = lower; index <= upper; index++) |
| 11 | if(isPrime(index)) count += 1; |
令待統計區間為[1, 1000w]、劃分的子區間為100個,則上述示例程序的輸出結果如下所示:
| 1 | Number of primes is 664579 |
下面讓我們將本節的代碼和輸出結果與第2.4節的示例代碼和輸出結果進行比較。雖然兩個版本都將子集合數設為100,但Akka版本的示例代碼無需顯式設定線程池大小。此外,由于這是一個計算密集型問題,所以對于使用ExecutorService的版本而言,其線程池大小的設定是需要隨機器CPU核數計算而定的,所以兩個版本的性能都差不多,而Akka版本在代碼的形式上要比使用ExecutorServer的版本簡潔一些。但正如我們在本章后面將會看到的那樣,當我們需要讓多個線程/角色相互協作的時候,這些區別將會愈發明顯。
在Scala中同時使用多角色
如果用Scala來實現這個統計素數數量的程序,那么我們就可以深切體會到Scala在角色的實現以及與角色交互方面的簡潔和優雅。下面讓我們來看看Scala版本的Primes類是如何實現的:
| 01 | class Primes extends Actor { |
| 03 | case (lower : Int, upper : Int) => |
| 04 | val count = PrimeFinder.countPrimesInRange(lower, upper) |
| 05 | self.replySafe(new Integer(count)) |
| 09 | def countPrimes(number : Int, numberOfParts : Int) = { |
| 10 | val chunksPerPartition : Int = number / numberOfParts |
| 11 | val results = new Array[Future[Integer]](numberOfParts) |
| 13 | while(index < numberOfParts) { |
| 14 | val lower = index * chunksPerPartition + 1 |
| 15 | val upper = if (index == numberOfParts - 1) |
| 16 | number else lower + chunksPerPartition - 1 |
| 17 | val bounds = (lower, upper) |
| 18 | val primeFinder = Actor.actorOf[Primes].start() |
| 19 | results(index) = (primeFinder !!! bounds).asInstanceOf[Future[Integer]] |
| 24 | while(index < numberOfParts) { |
| 25 | count += results(index).await.result.get.intValue() |
| 28 | Actors.registry.shutdownAll |
| 31 | def main(args : Array[String]) : Unit = { |
| 33 | println("Usage: number numberOfParts") |
| 35 | val start = System.nanoTime |
| 36 | val count = countPrimes(args(0).toInt, args(1).toInt) |
| 37 | val end = System.nanoTime |
| 38 | println("Number of primes is " + count) |
| 39 | println("Time taken " + (end - start)/1.0e9) |
Scala版本的代碼與Java版本有幾點不同。首先,Scala版本所使用的消息格式是簡單的元組而不是一個不可變列表。其次,receive()函數中的case語句與應用場景十分契合。第三,Java版本中countPrimes()函數里的for循環在這里變成了一個while循環。其原因是,雖然Scala的for循環表達式十分優雅,但會增加Object到基本類型之間的轉換開銷。為了能夠得到比較真實的性能對比,我在這里放棄了優雅。
類似地,在PrimeFinder中,我們也用while循環代替了for循環。
| 02 | def isPrime(number : Int) : Boolean = { |
| 03 | if (number <= 1) return false |
| 04 | var limit = scala.math.sqrt(number).toInt |
| 07 | if(number % i == 0) return false |
| 12 | def countPrimesInRange(lower : Int, upper : Int) : Int = { |
| 15 | while(index <= upper) { |
| 16 | if(isPrime(index)) count += 1 |
令待統計區間為[1,1000w]、劃分的子區間為100個,則Scala版示例程序的性能如下所示:
| 1 | Number of primes is 664579 |
總結
以上是生活随笔為你收集整理的讨喜的隔离可变性(五)同时使用多个角色的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。