并行化:你的高并发大杀器
來源:咖啡拿鐵
1.前言
想必熱愛游戲的同學小時候,都幻想過要是自己要是能像鳴人那樣會多重影分身之術,就能一邊打游戲一邊上課了,可惜漫畫就是漫畫,現實中并沒有這個技術,你要么只有老老實實的上課,要么就只有逃課去打游戲了。雖然在現實中我們無法實現多重影分身這樣的技術,但是我們可以在計算機世界中實現我們這樣的愿望。
2.計算機中的分身術
計算機中的分身術不是天生就有了。在1971年,1971年,英特爾推出的全球第一顆通用型微處理器4004,由2300個晶體管構成。當時,公司的聯合創始人之一戈登摩爾就提出大名鼎鼎的“摩爾定律”——每過18個月,芯片上可以集成的晶體管數目將增加一倍。最初的主頻740kHz(每秒運行74萬次),現在過了快50年了,大家去買電腦的時候會發現現在的主頻都能達到4.0GHZ了(每秒40億次)。
但是主頻越高帶來的收益卻是越來越小:
據測算,主頻每增加1G,功耗將上升25瓦,而在芯片功耗超過150瓦后,現有的風冷散熱系統將無法滿足散熱的需要。有部分CPU都可以用來煎雞蛋了。
流水線過長,使得單位頻率效能低下,越大的主頻其實整體性能反而不如小的主頻。
戈登摩爾認為摩爾定律未來10-20年會失效。
在單核主頻遇到瓶頸的情況下,多核CPU應運而生,不僅提升了性能,并且降低了功耗。所以多核CPU逐漸成為現在市場的主流,這樣讓我們的多線程編程也更加的容易。
說到了多核CPU就一定要說GPU,大家可能對這個比較陌生,但是一說到顯卡就肯定不陌生,筆者搞過一段時間的CUDA編程,我才意識到這個才是真正的并行計算,大家都知道圖片像素點吧,比如19201080的圖片有210萬個像素點,如果想要把一張圖片的每個像素點都進行轉換一下,那在我們java里面可能就要循環遍歷210萬次。 就算我們用多線程8核CPU,那也得循環幾十萬次。但是如果使用Cuda,最多可以365535*512=100661760(一億)個線程并行執行,就這種級別的圖片那也是馬上處理完成。但是Cuda一般適合于圖片這種,有大量的像素點需要同時處理,但是指令集很少所以邏輯不能太復雜。
GPU只是用來擴展介紹,感興趣可以和筆者交流。
3.應用中的并行
一說起讓你的服務高性能的手段,那么異步化,并行化這些肯定會第一時間在你腦海中顯現出來,在之前的文章:《異步化,你的高并發大殺器》中已經介紹過了異步化的優化手段,有興趣的朋友可以看看。并行化可以用來配合異步化,也可以用來單獨做優化。
我們可以想想有這么一個需求,在你下外賣訂單的時候,這筆訂單可能還需要查,用戶信息,折扣信息,商家信息,菜品信息等,用同步的方式調用,如下圖所示:
設想一下這5個查詢服務,平均每次消耗50ms,那么本次調用至少是250ms,我們細想一下,在這個這五個服務其實并沒有任何的依賴,誰先獲取誰后獲取都可以,那么我們可以想想,是否可以用多重影分身之術,同時獲取這五個服務的信息呢?
優化如下:
將這五個查詢服務并行查詢,在理想情況下可以優化至50ms。當然說起來簡單,我們真正如何落地呢?
3.1 CountDownLatch/Phaser
CountDownLatch和Phaser是JDK提供的同步工具類Phaser是1.7版本之后提供的工具類而CountDownLatch是1.5版本之后提供的工具類。這里簡單介紹一下CountDownLatch,可以將其看成是一個計數器,await()方法可以阻塞至超時或者計數器減至0,其他線程當完成自己目標的時候可以減少1,利用這個機制我們可以將其用來做并發。?
可以用如下的代碼實現我們上面的下訂單的需求:
public?class?CountDownTask?{????private?static?final?int?CORE_POOL_SIZE?=?4;
????private?static?final?int?MAX_POOL_SIZE?=?12;
????private?static?final?long?KEEP_ALIVE_TIME?=?5L;
????private?final?static?int?QUEUE_SIZE?=?1600;
????protected?final?static?ExecutorService?THREAD_POOL?=?new?ThreadPoolExecutor(CORE_POOL_SIZE,?MAX_POOL_SIZE,
????????????KEEP_ALIVE_TIME,?TimeUnit.SECONDS,?new?LinkedBlockingQueue<>(QUEUE_SIZE));
????public?static?void?main(String[]?args)?throws?InterruptedException?{
????????//?新建一個為5的計數器
????????CountDownLatch?countDownLatch?=?new?CountDownLatch(5);
????????OrderInfo?orderInfo?=?new?OrderInfo();
????????THREAD_POOL.execute(()?->?{
????????????System.out.println("當前任務Customer,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setCustomerInfo(new?CustomerInfo());
????????????countDownLatch.countDown();
????????});
????????THREAD_POOL.execute(()?->?{
????????????System.out.println("當前任務Discount,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setDiscountInfo(new?DiscountInfo());
????????????countDownLatch.countDown();
????????});
????????THREAD_POOL.execute(()?->?{
????????????System.out.println("當前任務Food,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setFoodListInfo(new?FoodListInfo());
????????????countDownLatch.countDown();
????????});
????????THREAD_POOL.execute(()?->?{
????????????System.out.println("當前任務Tenant,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setTenantInfo(new?TenantInfo());
????????????countDownLatch.countDown();
????????});
????????THREAD_POOL.execute(()?->?{
????????????System.out.println("當前任務OtherInfo,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setOtherInfo(new?OtherInfo());
????????????countDownLatch.countDown();
????????});
????????countDownLatch.await(1,?TimeUnit.SECONDS);
????????System.out.println("主線程:"+?Thread.currentThread().getName());
????}
}
建立一個線程池(具體配置根據具體業務,具體機器配置),進行并發的執行我們的任務(生成用戶信息,菜品信息等),最后利用await方法阻塞等待結果成功返回。
3.2CompletableFuture
相信各位同學已經發現,CountDownLatch雖然能實現我們需要滿足的功能但是其任然有個問題是,在我們的業務代碼需要耦合CountDownLatch的代碼,比如在我們獲取用戶信息之后我們會執行countDownLatch.countDown(),很明顯我們的業務代碼顯然不應該關心這一部分邏輯,并且在開發的過程中萬一寫漏了,那我們的await方法將只會被各種異常喚醒。
所以在JDK1.8中提供了一個類CompletableFuture,它是一個多功能的非阻塞的Future。(什么是Future:用來代表異步結果,并且提供了檢查計算完成,等待完成,檢索結果完成等方法。)在我之前的這篇文章中詳細介紹了《異步技巧之CompletableFuture》,有興趣的可以看這篇文章。
我們將每個任務的計算完成的結果都用CompletableFuture來表示,利用CompletableFuture.allOf匯聚成一個大的CompletableFuture,那么利用get()方法就可以阻塞。
public?class?CompletableFutureParallel?{????private?static?final?int?CORE_POOL_SIZE?=?4;
????private?static?final?int?MAX_POOL_SIZE?=?12;
????private?static?final?long?KEEP_ALIVE_TIME?=?5L;
????private?final?static?int?QUEUE_SIZE?=?1600;
????protected?final?static?ExecutorService?THREAD_POOL?=?new?ThreadPoolExecutor(CORE_POOL_SIZE,?MAX_POOL_SIZE,
????????????KEEP_ALIVE_TIME,?TimeUnit.SECONDS,?new?LinkedBlockingQueue<>(QUEUE_SIZE));
????public?static?void?main(String[]?args)?throws?InterruptedException,?ExecutionException,?TimeoutException?{
????????OrderInfo?orderInfo?=?new?OrderInfo();
????????//CompletableFuture?的List
????????List<CompletableFuture>?futures?=?new?ArrayList<>();
????????futures.add(CompletableFuture.runAsync(()?->?{
????????????System.out.println("當前任務Customer,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setCustomerInfo(new?CustomerInfo());
????????},?THREAD_POOL));
????????futures.add(CompletableFuture.runAsync(()?->?{
????????????System.out.println("當前任務Discount,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setDiscountInfo(new?DiscountInfo());
????????},?THREAD_POOL));
????????futures.add(?CompletableFuture.runAsync(()?->?{
????????????System.out.println("當前任務Food,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setFoodListInfo(new?FoodListInfo());
????????},?THREAD_POOL));
????????futures.add(CompletableFuture.runAsync(()?->?{
????????????System.out.println("當前任務Other,線程名字為:"?+?Thread.currentThread().getName());
????????????orderInfo.setOtherInfo(new?OtherInfo());
????????},?THREAD_POOL));
????????CompletableFuture?allDoneFuture?=?CompletableFuture.allOf(futures.toArray(new?CompletableFuture[futures.size()]));
????????allDoneFuture.get(10,?TimeUnit.SECONDS);
????????System.out.println(orderInfo);
????}
}
可以看見我們使用CompletableFuture能很快的完成的需求,當然這還不夠。
3.3 Fork/Join
我們上面用CompletableFuture完成了我們對多組任務并行執行,但是其依然是依賴我們的線程池,在我們的線程池中使用的是阻塞隊列,也就是當我們某個線程執行完任務的時候需要通過這個阻塞隊列進行,那么肯定會發生競爭,所以在JDK1.7中提供了ForkJoinTask和ForkJoinPool。
ForkJoinPool中每個線程都有自己的工作隊列,并且采用Work-Steal算法防止線程饑餓。 Worker線程用LIFO的方法取出任務,但是會用FIFO的方法去偷取別人隊列的任務,這樣就減少了鎖的沖突。
網上這個框架的例子很多,我們看看如何使用代碼其完成我們上面的下訂單需求:
public?class?OrderTask?extends?RecursiveTask<OrderInfo>?{????@Override
????protected?OrderInfo?compute()?{
????????System.out.println("執行"+?this.getClass().getSimpleName()?+?"線程名字為:"?+?Thread.currentThread().getName());
????????//?定義其他五種并行TasK
????????CustomerTask?customerTask?=?new?CustomerTask();
????????TenantTask?tenantTask?=?new?TenantTask();
????????DiscountTask?discountTask?=?new?DiscountTask();
????????FoodTask?foodTask?=?new?FoodTask();
????????OtherTask?otherTask?=?new?OtherTask();
????????invokeAll(customerTask,?tenantTask,?discountTask,?foodTask,?otherTask);
????????OrderInfo?orderInfo?=?new?OrderInfo(customerTask.join(),?tenantTask.join(),?discountTask.join(),?foodTask.join(),?otherTask.join());
????????return?orderInfo;
????}
????public?static?void?main(String[]?args)?{
????????ForkJoinPool?forkJoinPool?=?new?ForkJoinPool(Runtime.getRuntime().availableProcessors()?-1?);
????????System.out.println(forkJoinPool.invoke(new?OrderTask()));
????}
}
class?CustomerTask?extends?RecursiveTask<CustomerInfo>{
????@Override
????protected?CustomerInfo?compute()?{
????????System.out.println("執行"+?this.getClass().getSimpleName()?+?"線程名字為:"?+?Thread.currentThread().getName());
????????return?new?CustomerInfo();
????}
}
class?TenantTask?extends?RecursiveTask<TenantInfo>{
????@Override
????protected?TenantInfo?compute()?{
????????System.out.println("執行"+?this.getClass().getSimpleName()?+?"線程名字為:"?+?Thread.currentThread().getName());
????????return?new?TenantInfo();
????}
}
class?DiscountTask?extends?RecursiveTask<DiscountInfo>{
????@Override
????protected?DiscountInfo?compute()?{
????????System.out.println("執行"+?this.getClass().getSimpleName()?+?"線程名字為:"?+?Thread.currentThread().getName());
????????return?new?DiscountInfo();
????}
}
class?FoodTask?extends?RecursiveTask<FoodListInfo>{
????@Override
????protected?FoodListInfo?compute()?{
????????System.out.println("執行"+?this.getClass().getSimpleName()?+?"線程名字為:"?+?Thread.currentThread().getName());
????????return?new?FoodListInfo();
????}
}
class?OtherTask?extends?RecursiveTask<OtherInfo>{
????@Override
????protected?OtherInfo?compute()?{
????????System.out.println("執行"+?this.getClass().getSimpleName()?+?"線程名字為:"?+?Thread.currentThread().getName());
????????return?new?OtherInfo();
????}
}
我們定義一個OrderTask并且定義五個獲取信息的任務,在compute中分別fork執行這五個任務,最后在將這五個任務的結果通過Join獲得,最后完成我們的并行化的需求。
3.4 parallelStream
在jdk1.8中提供了并行流的API,當我們使用集合的時候能很好的進行并行處理,下面舉了一個簡單的例子從1加到100:
public?class?ParallelStream?{????public?static?void?main(String[]?args)?{
????????ArrayList<Integer>?list?=?new?ArrayList<Integer>();
????????for?(int?i?=?1;?i?<=?100;?i++)?{
????????????list.add(i);
????????}
????????LongAdder?sum?=?new?LongAdder();
????????list.parallelStream().forEach(integer?->?{
//????????????System.out.println("當前線程"?+?Thread.currentThread().getName());
????????????sum.add(integer);
????????});
????????System.out.println(sum);
????}
}
parallelStream中底層使用的那一套也是Fork/Join的那一套,默認的并發程度是可用CPU數-1。
3.5 分片
可以想象有這么一個需求,每天定時對id在某個范圍之間的用戶發券,比如這個范圍之間的用戶有幾百萬,如果給一臺機器發的話,可能全部發完需要很久的時間,所以分布式調度框架比如:elastic-job都提供了分片的功能,比如你用50臺機器,那么id%50=0的在第0臺機器上,=1的在第1臺機器上發券,那么我們的執行時間其實就分攤到了不同的機器上了。
4.并行化注意事項
線程安全:在parallelStream中我們列舉的代碼中使用的是LongAdder,并沒有直接使用我們的Integer和Long,這個是因為在多線程環境下Integer和Long線程不安全。所以線程安全我們需要特別注意。
合理參數配置:可以看見我們需要配置的參數比較多,比如我們的線程池的大小,等待隊列大小,并行度大小以及我們的等待超時時間等等,我們都需要根據自己的業務不斷的調優防止出現隊列不夠用或者超時時間不合理等等。
5.最后
本文介紹了什么是并行化,并行化的各種歷史,在Java中如何實現并行化,以及并行化的注意事項。希望大家對并行化有個比較全面的認識。最后給大家提個兩個小問題:
在我們并行化當中有某個任務如果某個任務出現了異常應該怎么辦?
在我們并行化當中有某個任務的信息并不是強依賴,也就是如果出現了問題這部分信息我們也可以不需要,當并行化的時候,這種任務出現了異常應該怎么辦?
-END-
?近期熱文:
重磅:Elasticsearch上市!市值近50億美元
利用SPRING管理熱加載的GROOVY對象!
Spring Boot中如何擴展XML請求和響應的支持
Java 11正式發布,新特性解讀
系統優化總結—系統層面
NIO相關基礎篇
以Dubbo為例,聊聊如何為開源項目做貢獻
25個面試中最常問的問題和答案
關注我
點擊“閱讀原文”,看本號其他精彩內容
總結
以上是生活随笔為你收集整理的并行化:你的高并发大杀器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 脉冲耦合神经网络(PCNN)-pulse
- 下一篇: 测试鼠标宏软件,KINBAS VP900