日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

并发队列、线程池、锁

發布時間:2025/3/15 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 并发队列、线程池、锁 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1、CountDownLatch(計數器)
? ? ?CountDownLatch 類位于java.util.concurrent包下,利用它可以實現類似計數器的功能。比如有一個任務A,它要等待其他任務執行完畢之后才能執行,此時就可以利用CountDownLatch來實現這種功能了。CountDownLatch是通過一個計數器來實現的,計數器的初始值為線程的數量。每當一個線程完成了自己的任務后,計數器的值就會減1。當計數器值到達0時,它表示所有的線程已經完成了任務,然后在await()鎖上等待的線程就可以恢復執行任務。

package com.zhang;import java.util.concurrent.CountDownLatch;public class TestCountDownLatch {public static void main(String[] args) throws InterruptedException {final CountDownLatch countDownLatch = new CountDownLatch(2);new Thread(new Runnable() {public void run() {System.out.println(Thread.currentThread().getName() + ",子線程開始執行...");countDownLatch.countDown();System.out.println(Thread.currentThread().getName() + ",子線程結束執行...");}}).start();new Thread(new Runnable() {public void run() {System.out.println(Thread.currentThread().getName() + ",子線程開始執行...");countDownLatch.countDown();//計數器值每次減去1System.out.println(Thread.currentThread().getName() + ",子線程結束執行...");}}).start();countDownLatch.await();// 減去為0,恢復任務繼續執行System.out.println("兩個子線程執行完畢....");System.out.println("主線程繼續執行.....");for (int i = 0; i < 10; i++) {System.out.println("main,i:" + i);}}}

2、CyclicBarrier(屏障)

? ? ? CyclicBarrier初始化時規定一個數目,然后計算調用了CyclicBarrier.await()進入等待的線程數。當線程數達到了這個數目時,所有進入等待狀態的線程被喚醒并繼續CyclicBarrier就象它名字的意思一樣,可看成是個障礙,所有的線程必須到齊后才能一起通過這個障礙。

? ? ? CyclicBarrier初始時還可帶一個Runnable的參數,此Runnable任務在CyclicBarrier的數目達到后,所有其它線程被喚醒前被執行。

import java.util.concurrent.CyclicBarrier;class Writer extends Thread {private CyclicBarrier cyclicBarrier;public Writer(CyclicBarrier cyclicBarrier) {this.cyclicBarrier = cyclicBarrier;}@Overridepublic void run() {try {System.out.println("線程" + Thread.currentThread().getName() + ",正在寫入數據");Thread.sleep(3000);System.out.println("線程" + Thread.currentThread().getName() + ",寫入數據成功.....");cyclicBarrier.await();System.out.println("所有線程執行完畢..........");} catch (Exception e) {}} }public class Test001 {public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {public void run() {System.out.println("全部線程喚醒前被執行");}});for (int i = 0; i < 5; i++) {Writer writer = new Writer(cyclicBarrier);writer.start();}} }

3、Semaphore(計數信號量)

? ? ?Semaphore是一種基于計數的信號量。它可以設定一個閾值,多個線程競爭獲取許可信號,做自己的申請后歸還,超過閾值后線程申請許可信號將會被阻塞。

需求: 一個廁所只有3個坑位,但是有10個人來上廁所,那怎么辦?假設10的人的編號分別為1-10,并且1號先到廁所,10號最后到廁所。那么1-3號來的時候必然有可用坑位,順利如廁,4號來的時候需要看看前面3人是否有人出來了,如果有人出來就進去,否則等待。同樣的道理,4-10號也需要等待正在上廁所的人出來后才能進去,并且誰先進去這得看等待的人是否有素質,是否能遵守先來先上的規則。(公平鎖與非公平鎖:排隊和競爭)

package com.zhang.test;import java.util.concurrent.Semaphore;class ThradDemo001 extends Thread {private String name;private Semaphore wc;public ThradDemo001(String name, Semaphore wc) {this.name = name;this.wc = wc;}@Overridepublic void run() {try {// 剩下的資源int availablePermits = wc.availablePermits();if (availablePermits > 0) {System.out.println(name + "天助我也,終于有茅坑了.....");} else {System.out.println(name + "怎么沒有茅坑了...");}// 申請資源 wc.acquire();System.out.println(name + "終于上廁所啦.爽啊" + ",剩下廁所:" + wc.availablePermits());Thread.sleep(1000);System.out.println(name + "廁所上完啦!");// 釋放資源 wc.release();} catch (Exception e) {}} }public class TestSemaphore {public static void main(String[] args) {Semaphore semaphore = new Semaphore(3);for (int i = 1; i <= 10; i++) {ThradDemo001 thradDemo001 = new ThradDemo001("第" + i + "個人", semaphore);thradDemo001.start();}}}

? ? ? ? ?在很多情況下,可能有多個線程需要訪問數目很少的資源。假想在服務器上運行著若干個回答客戶端請求的線程。這些線程需要連接到同一數據庫,但任一時刻只能獲得一定數目的數據庫連接。你要怎樣才能夠有效地將這些固定數目的數據庫連接分配給大量的線程??? ??

? ? ? ?答:1.給方法加同步鎖,保證同一時刻只能有一個人去調用此方法,其他所有線程排隊等待,但是此種情況下即使你的數據庫鏈接有10個,也始終只有一個處于使用狀態。這樣將會大大的浪費系統資源,而且系統的運行效率非常的低下。

? ? ? ? ? ? 2.另外一種方法當然是使用信號量,通過信號量許可與數據庫可用連接數相同的數目,將大大的提高效率和性能。

3、并發隊列

? ? ?在并發隊列上JDK提供了兩套實現,一個是以ConcurrentLinkedQueue為代表的高性能非阻塞隊列,一個是以BlockingQueue接口為代表的阻塞隊列,無論哪種都繼承自Queue。

4、阻塞隊列與非阻塞隊

? ? ? 阻塞隊列與普通隊列的區別在于,當隊列是空的時,從隊列中獲取元素的操作將會被阻塞,或者當隊列是滿時,往隊列里添加元素的操作會被阻塞。試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其他的線程往空的隊列插入新的元素。試圖往已滿的阻塞隊列中添加新元素的線程同樣也會被阻塞,直到其他的線程使隊列重新變得不滿

1.ArrayDeque, (數組雙端隊列)?
2.PriorityQueue, (優先級隊列)?
3.ConcurrentLinkedQueue, (基于鏈表的并發隊列)?
4.DelayQueue, (延期阻塞隊列)(阻塞隊列實現了BlockingQueue接口)?
5.ArrayBlockingQueue, (基于數組的并發阻塞隊列)?
6.LinkedBlockingQueue, (基于鏈表的FIFO阻塞隊列)?
7.LinkedBlockingDeque, (基于鏈表的FIFO雙端阻塞隊列)?
8.PriorityBlockingQueue, (帶優先級的無界阻塞隊列)?
9.SynchronousQueue (并發同步阻塞隊列)

5、ConcurrentLinkedQueue(非阻塞)

? ? ?ConcurrentLinkedQueue?:?是一個適用于高并發場景下的隊列,通過無鎖的方式,實現了高并發狀態下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue.它是一個基于鏈接節點的無界線程安全隊列。該隊列的元素遵循先進先出的原則。該隊列不允許null元素。

? ? ?ConcurrentLinkedQueue重要方法:
? ? ? ?add?和offer()?都是加入元素的方法(在ConcurrentLinkedQueue中這倆個方法沒有任何區別)
? ? ? ?poll()?和peek()?都是取頭元素節點,區別在于前者會刪除元素,后者不會。

package com.zhang.test;import java.util.concurrent.ConcurrentLinkedQueue;public class TestConcurrentLinkedQueue {public static void main(String[] args) {ConcurrentLinkedQueue q = new ConcurrentLinkedQueue();q.offer("張三");q.add("add");q.offer("李四");q.offer("哈哈哈");System.out.println(q.poll());//從頭獲取元素,刪除該元素System.out.println(q.peek()); //從頭獲取元素,不刪除該元素System.out.println(q.size()); //獲取總長度 } }

6、BlockingQueue

? ? ?阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。因此當一個線程試圖對一個已經滿了的隊列進行入隊列操作時,它將會被阻塞,除非有另一個線程做了出隊列操作;同樣,當一個線程試圖對一個空隊列進行出隊列操作時,它將會被阻塞,除非有另一個線程進行了入隊列操作。阻塞隊列常用于生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。阻塞隊列是線程安全的。

7、ArrayBlockingQueue

? ? ?ArrayBlockingQueue是一個有邊界的阻塞隊列,它的內部實現是一個數組。有邊界的意思是它的容量是有限的,我們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可改變。ArrayBlockingQueue是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。

package com.zhang.test;import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit;public class TestArrayBlockingQueue {public static void main(String[] args) throws Exception{ArrayBlockingQueue<String> arrays = new ArrayBlockingQueue<String>(3);arrays.add("李四");arrays.add("張軍");arrays.add("張軍");// 添加阻塞隊列arrays.offer("張三", 1, TimeUnit.SECONDS);System.out.println(arrays.poll());System.out.println(arrays.poll());System.out.println(arrays.poll());System.out.println(arrays.poll());//null } }

8、LinkedBlockingQueue

? ? ?LinkedBlockingQueue阻塞隊列大小的配置是可選的,如果我們初始化時指定一個大小,它就是有邊界的,如果不指定,它就是無邊界的。說是無邊界,其實是采用了默認大小為Integer.MAX_VALUE的容量 。它的內部實現是一個鏈表。和ArrayBlockingQueue一樣,LinkedBlockingQueue 也是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。

package com.zhang.test;import java.util.concurrent.LinkedBlockingQueue;public class TestLinkedBlockingQueue {public static void main(String[] args) {LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3);linkedBlockingQueue.add("張三");linkedBlockingQueue.add("李四");linkedBlockingQueue.add("李四");// linkedBlockingQueue.add("王五"); //拋異常 System.out.println(linkedBlockingQueue.size());} }

9、使用BlockingQueue模擬生產者與消費者

package com.zhang.test;import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;class ProducerThread implements Runnable {private BlockingQueue<String> blockingQueue;private AtomicInteger count = new AtomicInteger();private volatile boolean FLAG = true;public ProducerThread(BlockingQueue<String> blockingQueue) {this.blockingQueue = blockingQueue;}public void run() {System.out.println(Thread.currentThread().getName() + "生產者開始啟動....");while (FLAG) {String data = count.incrementAndGet() + "";try {boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS);if (offer) {System.out.println(Thread.currentThread().getName() + ",生產隊列" + data + "成功..");} else {System.out.println(Thread.currentThread().getName() + ",生產隊列" + data + "失敗..");}Thread.sleep(1000);} catch (Exception e) {}}System.out.println(Thread.currentThread().getName() + ",生產者線程停止...");}public void stop() {this.FLAG = false;}}class ConsumerThread implements Runnable {private volatile boolean FLAG = true;private BlockingQueue<String> blockingQueue;public ConsumerThread(BlockingQueue<String> blockingQueue) {this.blockingQueue = blockingQueue;}public void run() {System.out.println(Thread.currentThread().getName() + "消費者開始啟動....");while (FLAG) {try {String data = blockingQueue.poll(2, TimeUnit.SECONDS);if (data == null) {FLAG = false;System.out.println("消費者超過2秒時間未獲取到消息.");return;}System.out.println("消費者獲取到隊列信息成功,data:" + data);} catch (Exception e) {// TODO: handle exception }}}}public class Test0008 {public static void main(String[] args) {LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>(3);ProducerThread producerThread = new ProducerThread(blockingQueue);ConsumerThread consumerThread = new ConsumerThread(blockingQueue);Thread t1 = new Thread(producerThread);Thread t2 = new Thread(consumerThread);t1.start();t2.start();//10秒后 停止線程..try {Thread.sleep(10*1000);producerThread.stop();} catch (Exception e) {// TODO: handle exception }}}

10、線程池的作用

合理地使用線程池能夠帶來3個好處:

  • 降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。
  • 提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。
  • 提高線程的可管理性。線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一分配、調優和監控
  • 11、ThreadPoolExecutor機制?

    corePoolSize

    核心線程池大小

    maximumPoolSize

    最大線程池大小

    keepAliveTime

    線程池中超過corePoolSize數目的空閑線程最大存活時間;可以allowCoreThreadTimeOut(true)使得核心線程有效時間

    TimeUnit

    keepAliveTime時間單位

    workQueue

    阻塞任務隊列

    threadFactory

    新建線程工廠

    RejectedExecutionHandler

    當提交任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理

  • 當線程池小于corePoolSize時,新提交任務將創建一個新線程執行任務,即使此時線程池中存在空閑線程。?
  • 當線程池達到corePoolSize時,新提交任務將被放入workQueue中,等待線程池中任務調度執行?
  • 當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會創建新線程執行任務?
  • 當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理?
  • 當線程池中超過corePoolSize線程,空閑時間達到keepAliveTime時,關閉空閑線程?
  • 當設置allowCoreThreadTimeOut(true)時,線程池中corePoolSize線程空閑時間達到keepAliveTime也將關閉?
  • 12、線程池原理剖析

    ? ? ? ?提交一個任務到線程池中,線程池的處理流程如下:

  • 判斷線程池里的核心線程是否都在執行任務,如果不是(核心線程空閑或者還有核心線程沒有被創建)則創建一個新的工作線程來執行任務。如果核心線程都在執行任務,則進入下個流程。
  • 線程池判斷工作隊列是否已滿,如果工作隊列沒有滿,則將新提交的任務存儲在這個工作隊列里。如果工作隊列滿了,則進入下個流程。
  • 判斷線程池里的線程是否都處于工作狀態,如果沒有,則創建一個新的工作線程來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。
  • 13、線程池四種創建方式

    ? ? ? ?Java通過Executors(jdk1.5并發包)提供四種線程池:
    newCachedThreadPool:創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。

    // 無限大小線程池 jvm自動回收ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();for (int i = 0; i < 10; i++) {final int temp = i;newCachedThreadPool.execute(new Runnable() {public void run() {System.out.println(Thread.currentThread().getName() + ",i:" + temp);}});}

    newFixedThreadPool 創建一個定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待。

    ExecutorService newFixedThreadPool= Executors.newFixedThreadPool(10);for (int i=0;i<100;i++) {final int temp = i;newFixedThreadPool.execute(new Runnable() {public void run() {System.out.println(Thread.currentThread().getName()+",i"+temp);}});}

    ?

    newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。

    ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(5);for (int i = 0; i < 10; i++) {final int temp = i;newScheduledThreadPool.schedule(new Runnable() {public void run() {System.out.println(Thread.currentThread().getName()+",i:" + temp);}}, 3, TimeUnit.SECONDS);//表示延遲3秒執行}

    ?

    newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。

    //結果依次輸出,相當于順序執行各個任務。 ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();for (int i = 0; i < 10; i++) {final int index = i;newSingleThreadExecutor.execute(new Runnable() {public void run() {System.out.println(Thread.currentThread().getName()+",index:" + index);try {Thread.sleep(200);} catch (Exception e) {}}});}

    14、自定義線程線程池

    package com.zhang.test;import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;/*** //提交一個任務到線程池中,線程池的處理流程如下:* //1、判斷線程池里的核心線程是否都在執行任務,如果不是(核心線程空閑或者還有核心線程沒有被創建)則創建一個新的工作線程來執行任務。如果核心線程都在執行任務,則進入下個流程。* //2、線程池判斷工作隊列是否已滿,如果工作隊列沒有滿,則將新提交的任務存儲在這個工作隊列里。如果工作隊列滿了,則進入下個流程。* //3、判斷線程池里的線程是否都處于工作狀態,如果沒有,則創建一個新的工作線程來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。*/ public class Test0007 {public static void main(String[] args) {/*** int corePoolSize 核心線程數* int maximumPoolSize 最大能創建多少個線程* long keepAliveTime 存活時間* TimeUnit unit,* BlockingQueue<Runnable> workQueue 線程隊列*/ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3));for (int i = 1; i <= 5; i++) {final int temp=i;threadPoolExecutor.execute(new Runnable() {public void run() {System.out.println(Thread.currentThread().getName()+"任務"+temp);}});}threadPoolExecutor.shutdown();} }

    15、合理配置線程池

    ? ? ? ?CPU密集:是該任務需要大量的運算,而沒有阻塞,CPU一直全速運行。CPU密集型時,任務可以少配置線程數,大概和機器的cpu核數相當,這樣可以使得每個線程都在執行任務

    ? ? ? ?IO密集:即該任務需要大量的IO,即大量的阻塞。在單線程上運行IO密集型的任務會導致浪費大量的CPU運算能力浪費在等待。IO密集型時,大部分線程都阻塞,故需要多配置線程數,2*cpu核數

    16、Callable與Future

    ? ? ? ? 在Java中創建線程一般有兩種方式,一種是繼承Thread類,一種是實現Runnable接口。這兩種方式的缺點是在線程任務執行結束后,無法獲取執行結果。我們一般只能采用共享變量或共享存儲區以及線程通信的方式實現獲得任務結果的目的。Java中也提供了使用Callable和Future來實現獲取任務結果的操作。Callable用來執行任務產生結果,而Future用來獲得結果。Future常用方法:

  • V get() :獲取異步執行的結果,如果沒有結果可用,此方法會阻塞直到異步計算完成。
  • V get(Long timeout , TimeUnit unit) :獲取異步執行結果,如果沒有結果可用,此方法會阻塞,但是會有時間限制,如果阻塞時間超過設定的timeout時間,該方法將拋出異常。
  • boolean isDone() :如果任務執行結束,無論是正常結束或是中途取消還是發生異常,都返回true。
  • boolean isCanceller() :如果任務完成前被取消,則返回true。
  • boolean cancel(boolean mayInterruptRunning) :如果任務還沒開始,執行cancel(...)方法將返回false;如果任務已經啟動,執行cancel(true)方法將以中斷執行此任務線程的方式來試圖停止任務,如果停止成功,返回true;當任務已經啟動,執行cancel(false)方法將不會對正在執行的任務線程產生影響(讓線程正常執行到完成),此時返回false;當任務已經完成,執行cancel(...)方法將返回false。mayInterruptRunning參數表示是否中斷執行中的線程。
  • ? ? ? ?通過方法分析我們也知道實際上Future提供了3種功能:

  • 能夠中斷執行中的任務
  • 判斷任務是否執行完成
  • 獲取任務執行完成后的結果
  • package com.zhang.test;import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;public class TestMain {public static void main(String[] args) throws Exception {ExecutorService executor = Executors.newCachedThreadPool();Future<Integer> future = executor.submit(new Callable<Integer>() {public Integer call() throws Exception {System.out.println("子線程執行call()");Thread.sleep(5000);return 200;}});System.out.println("主線程執行其他任務");Integer integer = future.get();//等待返回結果,這里會阻塞 System.out.println(integer);// 關閉線程池if (executor != null)executor.shutdown();} }

    17、Futrure模式

     ? ?在多線程中經常舉的一個例子就是:網絡圖片的下載,剛開始是通過模糊的圖片來代替最后的圖片,等下載圖片的線程下載完圖片后在替換。而在這個過程中可以做一些其他的事情。首先客戶端向服務器請求RealSubject,但是這個資源的創建是非常耗時的。這種情況下,首先返回Client一個FutureSubject,以滿足客戶端的需求,于此同時,Future會通過另外一個Thread 去構造一個真正的資源,資源準備完畢之后,在給future一個通知。如果客戶端急于獲取這個真正的資源,那么就會阻塞客戶端的其他所有線程,等待資源準備完畢

    package com.zhang.future;/*** 公共數據接口,FutureData和RealData都要實現*/ public interface Data {String getRequest(); } package com.zhang.future;public class RealData implements Data {private String result;public RealData(String data) {System.out.println("正在使用data:" + data + "網絡請求數據,耗時操作需要等待.");try {Thread.sleep(3000);} catch (Exception e) {}System.out.println("操作完畢,獲取結果...");result = "哈哈哈";}public String getRequest() {return result;} } package com.zhang.future;/*** FutureData,當有線程想要獲取RealData的時候,程序會被阻塞。等到RealData被注入才會使用getReal()方法*/ public class FurureData implements Data {private volatile static boolean ISFLAG = false;private RealData realData;public synchronized void setRealData(RealData realData) {// 如果已經獲取到結果,直接返回if (ISFLAG) {return;}// 如果沒有獲取到數據,傳遞真是對象this.realData = realData;ISFLAG = true;// 進行通知 notify();}public synchronized String getRequest() {while (!ISFLAG) {try {wait();} catch (Exception e) {}}// 獲取到數據,直接返回return realData.getRequest();}} package com.zhang.future;public class FutureClient {public Data request( final String queryStr) {final FurureData furureData = new FurureData();new Thread(new Runnable() {public void run() {RealData realData = new RealData(queryStr);furureData.setRealData(realData);}}).start();return furureData;}} package com.zhang.future;public class Main {public static void main(String[] args) {FutureClient futureClient = new FutureClient();Data request = futureClient.request("請求參數.");System.out.println("請求發送成功!");System.out.println("執行其他任務...");String result = request.getRequest();System.out.println("獲取到結果..." + result);}}

    18、悲觀鎖與樂觀鎖

    ? ? ? ??悲觀鎖:悲觀鎖悲觀的認為每一次操作都會造成更新丟失問題,在每次查詢時加上排他鎖。每次去拿數據的時候都認為別人會修改,所以每次在拿數據的時候都會上鎖,這樣別人想拿這個數據就會block直到它拿到鎖。傳統的關系型數據庫里邊就用到了很多這種鎖機制,比如行鎖,表鎖等,讀鎖,寫鎖等,都是在做操作之前先上鎖。synchronized的思想也是悲觀鎖。

    Select * from xxx for update;

    ? ? ? ?樂觀鎖:樂觀鎖會樂觀的認為每次查詢都不會造成更新丟失,利用版本字段控制。一般是在數據表中加上一個數據版本號version字段,表示數據被修改的次數,當數據被修改時,version值會加一。當線程A要更新數據值時,在讀取數據的同時也會讀取version值,在提交更新時,若剛才讀取到的version值為當前數據庫中的version值相等時才更新,否則重試更新操作,直到更新成功。

    update table set x=x+1, version=version+1 where id=#{id} and version=#{version};

    19、重入鎖

    ? ? ? ?重入鎖,也叫做遞歸鎖,指的是同一線程外層函數獲得鎖之后 ,內層遞歸函數仍然有獲取該鎖的代碼,但不受影響。ReentrantLock和synchronized 都是可重入鎖

    package com.zhang.cache;public class Test implements Runnable {private synchronized void get() {System.out.println("name:" + Thread.currentThread().getName() + " get();");set();}private synchronized void set() {System.out.println("name:" + Thread.currentThread().getName() + " set();");}public void run() {get();}public static void main(String[] args) {Test ss = new Test();new Thread(ss).start();new Thread(ss).start();new Thread(ss).start();new Thread(ss).start();} } package com.zhang.cache;import java.util.concurrent.locks.ReentrantLock;public class Test02 extends Thread {ReentrantLock lock = new ReentrantLock();private void get() {lock.lock();System.out.println(Thread.currentThread().getId()+",get()");set();lock.unlock();}private void set() {lock.lock();System.out.println(Thread.currentThread().getId()+",set()");lock.unlock();}public void run() {get();}public static void main(String[] args) {Test02 ss = new Test02();new Thread(ss).start();new Thread(ss).start();new Thread(ss).start();}}

    20、讀寫鎖

    假設你的程序中涉及到對一些共享資源的讀和寫操作,且寫操作沒有讀操作那么頻繁。在沒有寫操作的時候,兩個線程同時讀一個資源沒有任何問題,所以應該允許多個線程能在同時讀取共享資源。但是如果有一個線程想去寫這些共享資源,就不應該再有其它線程對該資源進行讀或寫。這就需要一個讀/寫鎖來解決這個問題。Java5在java.util.concurrent包中已經包含了讀寫鎖。

    package com.zhang.cache;import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock;public class Cache {static Map<String, Object> map = new HashMap<String, Object>();static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();static Lock r = rwl.readLock();static Lock w = rwl.writeLock();// 獲取一個key對應的valueprivate static Object get(String key) {r.lock();try {System.out.println("正在做讀的操作,key:" + key + " 開始");Thread.sleep(100);Object object = map.get(key);System.out.println("正在做讀的操作,key:" + key + ",value:" + object + "結束.");return object;} catch (InterruptedException e) {} finally {r.unlock();}return key;}// 設置key對應的valueprivate static Object put(String key, Object value) {w.lock();try {System.out.println("正在做寫的操作,key:" + key + ",value:" + value + "開始.");Thread.sleep(100);Object object = map.put(key, value);System.out.println("正在做寫的操作,key:" + key + ",value:" + value + "結束.");return object;} catch (InterruptedException e) {} finally {w.unlock();}return value;}public static void main(String[] args) {new Thread(new Runnable() {public void run() {for (int i = 0; i < 10; i++) {Cache.put(i + "", i + "");}}}).start();new Thread(new Runnable() {public void run() {for (int i = 0; i < 10; i++) {Cache.get(i + "");}}}).start();} }

    21、CAS無鎖機制

    無鎖的好處:

  • 由于其非阻塞性,它對死鎖問題天生免疫,并且線程間的相互影響也遠遠比基于鎖的方式要小。
  • 使用無鎖的方式完全沒有鎖競爭帶來的系統開銷,也沒有線程間頻繁調度帶來的開銷,因此它要比基于鎖的方式擁有更優越的性能
  • CAS算法理解:

  • 它包含三個參數CAS(V,E,N): V表示要更新的變量,E表示預期值,N表示新值。僅當V值等于E值時,才會將V的值設為N,如果V值和E值不同,則說明已經有其他線程做了更新,則當前線程什么都不做。最后,CAS返回當前V的真實值。
  • CAS操作是抱著樂觀的態度進行的,它總是認為自己可以成功完成操作。當多個線程同時使用CAS操作一個變量時,只有一個會勝出,并成功更新,其余均會失敗。失敗的線程不會被掛起,僅是被告知失敗,并且允許再次嘗試,當然也允許失敗的線程放棄操作。基于這樣的原理,CAS操作即使沒有鎖,也可以發現其他線程對當前線程的干擾,并進行恰當的處理。
  • 簡單地說,CAS需要你額外給出一個期望值,也就是你認為這個變量現在應該是什么樣子的。如果變量不是你想象的那樣,那說明它已經被別人修改過了。你就重新讀取,再次嘗試修改就好了。
  • CAS缺點:

  • CAS存在一個很明顯的問題,即ABA問題:如果變量V初次讀取的時候是A,并且在準備賦值的時候檢查到它仍然是A,如果在這段期間曾經被改成B,然后又改回A,那CAS操作就會誤認為它從來沒有被修改過。針對這種情況,java并發包中提供了一個帶有標記的原子引用類AtomicStampedReference,它可以通過控制變量值的版本來保證CAS的正確性
  • CPU開銷較大:在并發量比較高的情況下,如果許多線程反復嘗試更新某一個變量,卻又一直更新不成功,循環往復,會給CPU帶來很大的壓力
  • 不能保證代碼塊的原子性:CAS機制所保證的只是一個變量的原子性操作,而不能保證整個代碼塊的原子性。比如需要保證3個變量共同進行原子性的更新,就不得不使用Synchronized了。
  • 22、自旋鎖

    ? ? ? ?自旋鎖原理非常簡單,如果持有鎖的線程能在很短時間內釋放鎖資源,那么那些等待競爭鎖的線程就不需要做內核態和用戶態之間的切換進入阻塞掛起狀態,它們只需要等一等(自旋:死循環),等持有鎖的線程釋放鎖后即可立即獲取鎖,這樣就避免用戶線程和內核的切換的消耗。但是線程自旋是需要消耗cup的,說白了就是讓cup在做無用功,線程不能一直占用cup自旋做無用功,所以需要設定一個自旋等待的最大時間。如果持有鎖的線程執行的時間超過自旋等待的最大時間扔沒有釋放鎖,就會導致其它爭用鎖的線程在最大等待時間內還是獲取不到鎖,這時爭用線程會停止自旋進入阻塞狀態。

    23、常用原子類

    ? ? ? ??Java中的原子操作類大致可以分為4類:原子更新基本類型、原子更新數組類型、原子更新引用類型、原子更新屬性類型。這些原子類中都是用了無鎖的概念,有的地方直接使用CAS操作的線程安全的類型。

    AtomicBoolean

    AtomicInteger

    AtomicLong

    AtomicReference

    ?

    package com.zhang.future;import java.util.concurrent.atomic.AtomicInteger;public class Test0001 implements Runnable {private static AtomicInteger atomic = new AtomicInteger();public void run() {while (true) {int count = getCountAtomic();System.out.println(count);if (count >= 150) {break;}}}private Integer getCountAtomic() {try {Thread.sleep(50);} catch (Exception e) {}return atomic.incrementAndGet();}public static void main(String[] args) {Test0001 test0001 = new Test0001();Thread t1 = new Thread(test0001);Thread t2 = new Thread(test0001);t1.start();t2.start();}}

    ?

    24、并發框架Disruptor

    轉載于:https://www.cnblogs.com/zhangjinru123/p/10429472.html

    總結

    以上是生活随笔為你收集整理的并发队列、线程池、锁的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。