AQS(CountdownLatch、CyclicBarrier、Semaphore)、FutureTask、BlockingQueue、ForkJoin
1. J.U.C - AQS (AbstractQueuedSynchronizer)
java.util.concurrent(J.U.C) 大大提高了并發性能,AQS 被認為是 J.U.C 的核心。
1.1?CountdownLatch
用來控制一個線程等待多個線程。
維護了一個計數器 cnt,每次調用 countDown() 方法會讓計數器的值減 1,減到 0的時候,那些因為調用 await() 方法而在等待的線程就會被喚醒。
public class CountdownLatchExample {public static void main(String[] args) throws InterruptedException {final int totalThread = 10;CountDownLatch countDownLatch = new CountDownLatch(totalThread);ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < totalThread; i++) {executorService.execute(() -> {System.out.print("run..");countDownLatch.countDown();});} countDownLatch.await();System.out.println("end");executorService.shutdown();} }運行結果:
1.2 CyclicBarrier
用來控制多個線程互相等待,只有當多個線程都到達時,這些線程才會繼續執行。
和 CountdownLatch 相似,都是通過維護計數器來實現的。
- 線程執行 await() 方法之后計數器會減 1,并進行等待,直到計數器為 0,所有調用 awati() 方法而在等待的線程才能繼續執行。
CyclicBarrier 和 CountdownLatch 的一個區別是:
- CyclicBarrier 的計數器通過調用reset() 方法可以循環使用,所以它才叫做循環屏障。
CyclicBarrier 有兩個構造函數,其中 parties 指示計數器的初始值,barrierAction在所有線程都到達屏障的時候會執行一次。
public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0)throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) {this(parties, null); } public class CyclicBarrierExample {public static void main(String[] args) {final int totalThread = 10;CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < totalThread; i++) {executorService.execute(() -> {System.out.print("before..");try {cyclicBarrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();} System.out.print("after..");});} executorService.shutdown();} }運行結果:
1.3?Semaphore
Semaphore 就是操作系統中的信號量,可以控制對互斥資源的訪問線程數。
以下代碼模擬了對某個服務的并發請求,每次只能有 3 個客戶端同時訪問,請求總數為 10。
public class SemaphoreExample {public static void main(String[] args) {final int clientCount = 3;final int totalRequestCount = 10;Semaphore semaphore = new Semaphore(clientCount);ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < totalRequestCount; i++) {executorService.execute(()->{try {semaphore.acquire();System.out.print(semaphore.availablePermits() + " ");} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();}});} executorService.shutdown();} }運行結果:
2. J.U.C - 其它組件
2.1?FutureTask
在介紹 Callable 時我們知道它可以有返回值,返回值通過 Future 進行封裝。
FutureTask 實現了 RunnableFuture 接口,該接口繼承自 Runnable 和 Future 接口,這使得 FutureTask 既可以當做一個任務執行,也可以有返回值。
public class FutureTask<V> implements RunnableFuture<V>public interface RunnableFuture<V> extends Runnable, Future<V>FutureTask 可用于異步獲取執行結果或取消執行任務的場景。
當一個計算任務需要執行很長時間,那么就可以用 FutureTask 來封裝這個任務,主線程在完成自己的任務之后再去獲取結果。
public class FutureTaskExample {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {int result = 0;for (int i = 0; i < 100; i++) {Thread.sleep(10);result += i;} return result;}});Thread computeThread = new Thread(futureTask);computeThread.start();Thread otherThread = new Thread(() -> {System.out.println("other task is running...");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}});otherThread.start();System.out.println(futureTask.get());} }運行結果:
2.2?BlockingQueue
java.util.concurrent.BlockingQueue 接口有以下阻塞隊列的實現:
- FIFO 隊列 :LinkedBlockingQueue、ArrayBlockingQueue(固定長度)
- 優先級隊列 :PriorityBlockingQueue提供了阻塞的 take() 和 put() 方法:
- 如果隊列為空 take() 將阻塞,直到隊列中有內容;
- 如果隊列為滿 put() 將阻塞,直到隊列有空閑位置。
使用 BlockingQueue 實現生產者消費者問題:
public class ProducerConsumer {private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);private static class Producer extends Thread {@Overridepublic void run() {try {queue.put("product");} catch (InterruptedException e) {e.printStackTrace();} System.out.print("produce..");}}private static class Consumer extends Thread {@Overridepublic void run() {try {String product = queue.take();} catch (InterruptedException e) {e.printStackTrace();} System.out.print("consume..");}} }public static void main(String[] args) {for (int i = 0; i < 2; i++) {Producer producer = new Producer();producer.start();} for (int i = 0; i < 5; i++) {Consumer consumer = new Consumer();consumer.start();} for (int i = 0; i < 3; i++) {Producer producer = new Producer();producer.start();} }運行結果:
2.3?ForkJoin
主要用于并行計算中,和 MapReduce 原理類似,都是把大的計算任務拆分成多個小任務并行計算。
public class ForkJoinExample extends RecursiveTask<Integer> {private final int threshold = 5;private int first;private int last;public ForkJoinExample(int first, int last) {this.first = first;this.last = last;}@Overrideprotected Integer compute() {int result = 0;if (last - first <= threshold) {// 任務足夠小則直接計算for (int i = first; i <= last; i++) {result += i;}} else {// 拆分成小任務int middle = first + (last - first) / 2;ForkJoinExample leftTask = new ForkJoinExample(first, middle);ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);leftTask.fork();rightTask.fork();result = leftTask.join() + rightTask.join();}return result;} }public static void main(String[] args) throws ExecutionException, InterruptedException {ForkJoinExample example = new ForkJoinExample(1, 10000);ForkJoinPool forkJoinPool = new ForkJoinPool();Future result = forkJoinPool.submit(example);System.out.println(result.get()); }ForkJoin 使用 ForkJoinPool 來啟動,它是一個特殊的線程池,線程數量取決于CPU 核數。
public class ForkJoinPool extends AbstractExecutorServiceForkJoinPool 實現了工作竊取算法來提高 CPU 的利用率。
- 每個線程都維護了一個雙端隊列,用來存儲需要執行的任務。
- 工作竊取算法允許空閑的線程從其它線程的雙端隊列中竊取一個任務來執行。
- 竊取的任務必須是最晚的任務,避免和隊列所屬線程發生競爭。但是如果隊列中只有一個任務時還是會發生競爭。
?
?
總結
以上是生活随笔為你收集整理的AQS(CountdownLatch、CyclicBarrier、Semaphore)、FutureTask、BlockingQueue、ForkJoin的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 互斥同步(synchronized、Lo
- 下一篇: 线程安全的实现方法