日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

java多核并行计算_谈谈Java任务的并行处理

發布時間:2023/12/15 java 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java多核并行计算_谈谈Java任务的并行处理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言

談到并行,我們可能最先想到的是線程,多個線程一起運行,來提高我們系統的整體處理速度;為什么使用多個線程就能提高處理速度,因為現在計算機普遍都是多核處理器,我們需要充分利用cpu資源;如果站的更高一點來看,我們每臺機器都可以是一個處理節點,多臺機器并行處理;并行的處理方式可以說無處不在,本文主要來談談Java在并行處理方面的努力。

無處不在的并行

Java的垃圾回收器,我們可以看到每一代版本的更新,伴隨著GC更短的延遲,從serial到cms再到現在的G1,一直在摘掉Java慢的帽子;消息隊列從早期的ActiveMQ到現在的kafka和RocketMQ,引入的分區的概念,提高了消息的并行性;數據庫單表數據到一定量級之后,訪問速度會很慢,我們會對表進行分表處理,引入數據庫中間件;Redis你可能覺得本身處理是單線程的,但是Redis的集群方案中引入了slot(槽)的概念;更普遍的就是我們很多的業務系統,通常會部署多臺,通過負載均衡器來進行分發;好了還有其他的一些例子,此處不在一一例舉。

如何并行

我覺得并行的核心在于"拆分",把大任務變成小任務,然后利用多核CPU也好,還是多節點也好,同時并行的處理,Java歷代版本的更新,都在為我們開發者提供更方便的并行處理,從開始的Thread,到線程池,再到fork/join框架,最后到流處理,下面使用簡單的求和例子來看看各種方式是如何并行處理的;

單線程處理

首先看一下最簡單的單線程處理方式,直接使用主線程進行求和操作;

public class SingleThread {

public static long[] numbers;

public static void main(String[] args) {

numbers = LongStream.rangeClosed(1, 10_000_000).toArray();

long sum = 0;

for (int i = 0; i < numbers.length; i++) {

sum += numbers[i];

}

System.out.println("sum = " + sum);

}

}

求和本身是一個計算密集型任務,但是現在已經是多核時代,只用單線程,相當于只使用了其中一個cpu,其他cpu被閑置,資源的浪費;

Thread方式

我們把任務拆分成多個小任務,然后每個小任務分別啟動一個線程,如下所示:

public class ThreadTest {

public static final int THRESHOLD = 10_000;

public static long[] numbers;

private static long allSum;

public static void main(String[] args) throws Exception {

numbers = LongStream.rangeClosed(1, 10_000_000).toArray();

int taskSize = (int) (numbers.length / THRESHOLD);

for (int i = 1; i <= taskSize; i++) {

final int key = i;

new Thread(new Runnable() {

public void run() {

sumAll(sum((key - 1) * THRESHOLD, key * THRESHOLD));

}

}).start();

}

Thread.sleep(100);

System.out.println("allSum = " + getAllSum());

}

private static synchronized long sumAll(long threadSum) {

return allSum += threadSum;

}

public static synchronized long getAllSum() {

return allSum;

}

private static long sum(int start, int end) {

long sum = 0;

for (int i = start; i < end; i++) {

sum += numbers[i];

}

return sum;

}

}

以上指定了一個拆分閥值,計算拆分多少個認為,同時啟動多少線程;這種處理就是啟動的線程數過多,而CPU數有限,更重要的是求和是一個計算密集型任務,啟動過多的線程只會帶來更多的線程上下文切換;同時線程處理完一個任務就終止了,也是對資源的浪費;另外可以看到主線程不知道何時子任務已經處理完了,需要做額外的處理;所有Java后續引入了線程池。

線程池方式

jdk1.5引入了并發包,其中包括了ThreadPoolExecutor,相關代碼如下:

public class ExecutorServiceTest {

public static final int THRESHOLD = 10_000;

public static long[] numbers;

public static void main(String[] args) throws Exception {

numbers = LongStream.rangeClosed(1, 10_000_000).toArray();

ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);

CompletionService completionService = new ExecutorCompletionService(executor);

int taskSize = (int) (numbers.length / THRESHOLD);

for (int i = 1; i <= taskSize; i++) {

final int key = i;

completionService.submit(new Callable() {

@Override

public Long call() throws Exception {

return sum((key - 1) * THRESHOLD, key * THRESHOLD);

}

});

}

long sumValue = 0;

for (int i = 0; i < taskSize; i++) {

sumValue += completionService.take().get();

}

// 所有任務已經完成,關閉線程池

System.out.println("sumValue = " + sumValue);

executor.shutdown();

}

private static long sum(int start, int end) {

long sum = 0;

for (int i = start; i < end; i++) {

sum += numbers[i];

}

return sum;

}

}

上面已經分析了計算密集型并不是線程越多越好,這里創建了JDK默認的線程數:CPU數+1,這是一個經過大量測試以后給出的一個結果;線程池顧名思義,可以重復利用現有的線程;同時利用CompletionService來對子任務進行匯總;合理的使用線程池已經可以充分的并行處理任務,只是在寫法上有點繁瑣,此時JDK1.7中引入了fork/join框架;

fork/join框架

分支/合并框架的目的是以遞歸的方式將可以并行的認為拆分成更小的任務,然后將每個子任務的結果合并起來生成整體結果;相關代碼如下:

public class ForkJoinTest extends java.util.concurrent.RecursiveTask {

private static final long serialVersionUID = 1L;

private final long[] numbers;

private final int start;

private final int end;

public static final long THRESHOLD = 10_000;

public ForkJoinTest(long[] numbers) {

this(numbers, 0, numbers.length);

}

private ForkJoinTest(long[] numbers, int start, int end) {

this.numbers = numbers;

this.start = start;

this.end = end;

}

@Override

protected Long compute() {

int length = end - start;

if (length <= THRESHOLD) {

return computeSequentially();

}

ForkJoinTest leftTask = new ForkJoinTest(numbers, start, start + length / 2);

leftTask.fork();

ForkJoinTest rightTask = new ForkJoinTest(numbers, start + length / 2, end);

Long rightResult = rightTask.compute();

// 注:join方法會阻塞,因此有必要在兩個子任務的計算都開始之后才執行join方法

Long leftResult = leftTask.join();

return leftResult + rightResult;

}

private long computeSequentially() {

long sum = 0;

for (int i = start; i < end; i++) {

sum += numbers[i];

}

return sum;

}

public static void main(String[] args) {

System.out.println(forkJoinSum(10_000_000));

}

public static long forkJoinSum(long n) {

long[] numbers = LongStream.rangeClosed(1, n).toArray();

ForkJoinTask task = new ForkJoinTest(numbers);

return new ForkJoinPool().invoke(task);

}

}

ForkJoinPool是ExecutorService接口的一個實現,子認為分配給線程池中的工作線程;同時需要把任務提交到此線程池中,需要創建RecursiveTask的一個子類;大體邏輯就是通過fork進行拆分,然后通過join進行結果的合并,JDK為我們提供了一個框架,我們只需要在里面填充即可,更加方便;有沒有更簡單的方式,連拆分都省了,自動拆分合并,jdk在1.8中引入了流的概念;

流方式

Java8引入了stream的概念,可以讓我們更好的利用并行,使用流代碼如下:

public class StreamTest {

public static void main(String[] args) {

System.out.println("sum = " + parallelRangedSum(10_000_000));

}

public static long parallelRangedSum(long n) {

return LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);

}

}

以上代碼是不是非常簡單,對于開發者來說完全不需要手動拆分,使用同步機制等方式,就可以讓任務并行處理,只需要對流使用parallel()方法,系統自動會對任務進行拆分,當然前提是沒有共享可變狀態;其實并行流內部使用的也是fork/join框架;

總結

本文使用一個求和的實例,來介紹了jdk為開發者提供并行處理的各種方式,可以看到Java一直在為提供更方便的并行處理而努力。

參考

<>

總結

以上是生活随笔為你收集整理的java多核并行计算_谈谈Java任务的并行处理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。