多线程JUC学习
補充之前的學習筆記
1JUC是什么
1.1 Java.util.concurrent =JUC
1.2?進程:系統里運行的多個程序QQ.exe ? ??線程:一個進程中有多個線程
1.3 線程的多種狀態。
.start()--就緒狀態
State:
new創建 ?runnable啟動 ?blocked阻塞 waiting等待(不見不散) timed_waiting等待(過時不候)?terminated終結
1.4 ?wait交鎖 ?sleep不交鎖
1.5 ?并發:同一時間點多個線程訪問同一個資源 ? ? ? ? ? 并行:同時執行多個資源
?
2、lambda表達式
①寫法:拷貝中括號+寫死右箭頭+落地大括號
②只有函數接口(接口里只有一個方法時)才能用lambda表達式
③接口上標記@FunctionalInterface
Foo foo = () -> {業務邏輯代碼,實現方法}
④default方法的實現
用@FunctionalInterface的接口只能有個一方法,但是可以又多個default方法
⑤靜態方法實現
用@FunctionalInterface的接口只能有個一方法,但是可以又多個default方法,可以有多個靜態方法
?
3、線程間的通信
3.1生產者+消費者
3.2通知等待喚醒機制
判斷 ? 干活 ? ?通知
3.3 2個線程變4個線程,禁止出現虛假喚醒,判斷條件用while
3.4 ?
----------------lock----------------- ??
Lock lock=new ? ReentrantLock();
Condition condition = lock.newCondition();
condition.awati(); ? ?codittion.signalAll(); ? ? ? ? ??
-------------------syncronised----------------- ? ?
? ? ? ? ? | ? ? ? ? ? ? ? ? ? ? ? ? ? ? |
? ? ? wait(); ? ? ? ? ? ? ? ? ? ? ? notifiyAll();
?
4、傳值和傳引用
基本類型傳復印件
引用類型傳引用
String類型比較特殊,因為有個池的概念,所以相當于指向變了,但是原來的指針還是指向原來的引用
?
5、Callable
callable:有異常、有返回值、call
runnable:無異常、無返回值、run
FutureTask作用:異步調用
自頂向下,逐步求精
一個futuretask被多個線程調用,結果可以復用
futureTask.get()只允許放到最后,get方法只計算一次?
-----------------原理,底層------------------
①在主線程中需要執行比較耗時的操作時,但又不想阻塞主線程時,可以把這些作業交給Future對象在后臺完成,當主線程將來需要時,就可以通過future對象獲得后臺作業的計算結果或者執行狀態。
②一般FutureTask多用于耗時的計算,主線程可以在完成自己的任務后,再去獲取結果
③僅再計算完成時才能檢索結果,如果計算尚未完成,則阻塞get方法,一旦計算完成,就不能再重新開始或者取消計算,get方法獲取結果只有再計算完成時獲取,否則會一直阻塞直到任務轉入完成狀態,然后會返回結果或者拋出異常。
?
6、ReadWriteLock
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock ();
rwLock .writeLock().lock();
rwLock .readLock().lock();
讀寫鎖案例:讀可共享,寫排他
?
| class ReadWrite{ ??????private Object obj; ??????private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); ?????? ??????public void writeLock(Object obj){ ???????????? rwLock.writeLock().lock(); ???????????? try { ?????????????????? this. obj = obj; ??????????????????System. out.println(Thread. currentThread().getName()+"寫線程正在執行\t"+obj); ????????????} catch (Exception e) { ?????????????????? // TODO: handle exception ????????????} finally{ ?????????????????? rwLock.writeLock().unlock(); ????????????} ??????} ?????? ??????public void readLock(){ ???????????? rwLock.readLock().lock(); ???????????? try { ??????????????????System. out.println(Thread. currentThread().getName()+"讀線程正在執行\t"+obj); ????????????} catch (Exception e) { ?????????????????? // TODO Auto-generated catch block ?????????????????? e.printStackTrace(); ????????????} finally{ ?????????????????? rwLock.readLock().unlock(); ????????????} ??????} ?????? } |
?
?
7、線程池
ExecutorService service=Executors.newFixedThreadPool(5);//一池5線程(核心線程=最大線程=5)
-
阻塞隊列采用了LinkedBlockingQueue,它是一個無界隊列;
-
由于阻塞隊列是一個無界隊列,因此永遠不可能拒絕任務;
-
由于采用了無界隊列,實際線程數量將永遠維持在nThreads,因此maximumPoolSize和keepAliveTime將無效。
?
| ? ??public static ExecutorService newFixedThreadPool( int nThreads) { ??????? return new ThreadPoolExecutor( nThreads, nThreads , ????????????????????????????????????? 0L, TimeUnit. MILLISECONDS, ????????????????????????????????????? new LinkedBlockingQueue<Runnable>()); ??? } |
ExecutorService service=Executors.newSingleThreadExecutor();//一池1線程(核心線程=最大線程=1)
?
| ?public static ExecutorService newSingleThreadExecutor() { ??????? return new FinalizableDelegatedExecutorService ??????????? ( new ThreadPoolExecutor(1, 1, ??????????????????????????????????? 0L, TimeUnit. MILLISECONDS, ??????????????????????????????????? new LinkedBlockingQueue<Runnable>())); ??? } |
?
ExecutorService service=Executors.newCachedThreadPool();//一池N線程(核心線程0,最大線程int的最大值相當于沒有上限)
-
它是一個可以無限擴大的線程池;
-
它比較適合處理執行時間比較小的任務;
-
corePoolSize為0,maximumPoolSize為無限大,意味著線程數量可以無限大;
-
keepAliveTime為60S,意味著線程空閑時間超過60S就會被殺死;
-
采用SynchronousQueue裝等待的任務,這個阻塞隊列沒有存儲空間,這意味著只要有請求到來,就必須要找到一條工作線程處理他,如果當前沒有空閑的線程,那么就會再創建一條新的線程。
?
| public?static?ExecutorService?newCachedThreadPool?() { ??return?new?ThreadPoolExecutor(0, Integer.?MAX_VALUE, ????????????????????????????????????? 60L, TimeUnit.?SECONDS, ??????????????????????????????????????new?SynchronousQueue<Runnable>()); } |
?
service.submit(Runnable);
service.shutdown();
---------------------------------
ScheduledExecutorService service=Executors.newScheduledThreadPool(5);//時間輪詢,每隔多少時間執行一個任務,如果線程忙不過來會自動新加線程(核心線程5,最大線程int最大值相當于沒有上限)
-
它采用DelayQueue存儲等待的任務
-
DelayQueue內部封裝了一個PriorityQueue,它會根據time的先后時間排序,若time相同則根據sequenceNumber排序;
-
DelayQueue也是一個無界隊列;
-
工作線程的執行過程:
-
工作線程會從DelayQueue取已經到期的任務去執行;
-
執行結束后重新設置任務的到期時間,再次放回DelayQueue
?
| ? ???public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { ??????? return new ScheduledThreadPoolExecutor( corePoolSize); ? ? ?} ? ??public ScheduledThreadPoolExecutor( int corePoolSize) { ??????? super(corePoolSize, Integer. MAX_VALUE, 0, TimeUnit. NANOSECONDS, ????????????? new DelayedWorkQueue()); ? ? ?} ? ? ?public ThreadPoolExecutor( int corePoolSize, ????????????????????????????? int maximumPoolSize, ????????????????????????????? long keepAliveTime, ????????????????????????????? TimeUnit unit, ????????????????????????????? BlockingQueue<Runnable> workQueue) { ??????? this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, ???????????? Executors. defaultThreadFactory(), defaultHandler); ? ? ?} |
service.schedule(callable,delay,unit);//線程,延遲,時間單位 ? 每隔2s提交一次請求
?
| public class ReadWriteLock { ??????public static <E> void main(String[] args) { ???????????? ????????????ExecutorService service = Executors. newCachedThreadPool(); ???????????? ???????????? final ReadWrite readWrite = new ReadWrite(); ???????????? ???????????? try { ??????????????????Thread thread1 = new Thread( new Runnable() { ???????????????????????? @Override ???????????????????????? public void run() { ?????????????????????????????? readWrite.writeLock( "this is write"); ?????????????????????????????? ????????????????????????} ??????????????????}, "AA"); ?????????????????? service.execute( thread1);????? ?????????????????? ?????????????????? for( int i=1; i<=100; i++){ ????????????????????????Thread thread2 = new Thread( new Runnable() { ?????????????????????????????? @Override ?????????????????????????????? public void run() { ???????????????????????????????????? readWrite.readLock(); ??????????????????????????????} ????????????????????????}, String. valueOf(i)); ???????????????????????? service.execute( thread2); ??????????????????} ????????????} catch (Exception e) { ?????????????????? // TODO Auto-generated catch block ?????????????????? e.printStackTrace(); ????????????} finally{ ?????????????????? service.shutdown(); ????????????} ??????} } |
?
?
8、常用工具類
CountDownLatch----所有線程執行完才執行的任務(秦滅六國,一統華夏)
final CountDownLatch countDownLatch = new CountDownLatch(6);
countDownLatch .countDown();//沒執行一條就-1,直到6條都執行完
countDownLatch .await();//阻塞最后一個要執行的主線程
?
| public class CountDownLatchDemo { ?????? ??????public static void main(String[] args) throws InterruptedException { ???????????? //CountDownLatch ???????????? final CountDownLatch countDownLatch = new CountDownLatch(6); ???????????? ???????????? for( int i=1; i<=6; i++){ ?????????????????? new Thread( new Runnable() { ???????????????????????? ???????????????????????? @Override ???????????????????????? public void run() { ??????????????????????????????System. out.println(Thread. currentThread().getName()+"\t 國家被滅"); ?????????????????????????????? countDownLatch.countDown(); ????????????????????????} ??????????????????}, CountryEnums.forEachCountryEnums(i).getMsg()).start(); ????????????} ???????????? ???????????? countDownLatch.await(); ????????????System. out.println(Thread. currentThread().getName()+"\t 秦滅六國,一統華夏" ); ??????} } |
?
?
CyclicBarrier---集齊7顆龍珠,可以召喚神龍(其他線程執行完了只能等待)
?
| public class CyclicBarrierDemo { ?????? ??????private final static int number=7; ?????? ??????public static void main(String[] args) { ???????????? final CyclicBarrier cyclicBarrier = new CyclicBarrier(number, new Runnable() { ?????????????????? @Override ?????????????????? public void run() { ????????????????????????System. out.println( "集齊7顆龍珠,可以召喚神龍" ); ??????????????????} ????????????}); ???????????? for( int i=1; i<=7; i++){ ?????????????????? final int temp= i; ?????????????????? new Thread( new Runnable() { ???????????????????????? @Override ???????????????????????? public void run() { ?????????????????????????????? try { ????????????????????????????????????System. out.println(Thread. currentThread().getName()+"\t 收集第"+ temp+ "顆龍珠"); ???????????????????????????????????? cyclicBarrier.await(); ??????????????????????????????} catch (InterruptedException | BrokenBarrierException e) { ???????????????????????????????????? e.printStackTrace(); ??????????????????????????????} ????????????????????????} ??????????????????}, String. valueOf(i)).start(); ????????????} ???????????? ??????} } |
?
Semaphore----信號燈(爭車位)
?
| public class SemaphoreDemo { ??????public static void main(String[] args) { ???????????? final Semaphore semaphore = new Semaphore(3); //模擬3個停車位 ???????????? for( int i=1; i<=6; i++){ //模擬6個汽車 ?????????????????? new Thread( new Runnable() { ???????????????????????? ???????????????????????? @Override ???????????????????????? public void run() { ?????????????????????????????? try { ???????????????????????????????????? semaphore.acquire(); ????????????????????????????????????System. out.println(Thread. currentThread().getName()+ "\t 搶占到停車位" ); ????????????????????????????????????TimeUnit. SECONDS.sleep( new Random().nextInt(5)); ????????????????????????????????????System. out.println(Thread. currentThread().getName()+ "\t---- 離開了停車位" ); ??????????????????????????????} catch (InterruptedException e) { ???????????????????????????????????? e.printStackTrace(); ??????????????????????????????} finally{ ???????????????????????????????????? semaphore.release(); ??????????????????????????????} ?????????????????????????????? ????????????????????????} ??????????????????}, String. valueOf(i)).start(); ????????????} ??????} } |
?
9、集合不安全類
ArrayList、HashMap、HashSet
java.util.ConcurrentModificationException
CopyOnWriteArrayList();//寫時復制
往元素中添加元素時,先復制一份新的數組(Arrays.copyOf),長度+1,把要添加的元素添加到新的數組中。最后把引用指向新的數組。(整個過程添加了reentrainLock)
CopyOnWriteArraySet<String>();
ConcurrentHashMap<>();
sss---->Arrays ?Collections
三者對比:
?
| 1)CountDownLatch和CyclicBarrier都能夠實現線程之間的等待,只不過它們側重點不同: CountDownLatch一般用于某個線程A等待若干個其他線程執行完任務之后,它才執行; 而CyclicBarrier一般用于一組線程互相等待至某個狀態,然后這一組線程再同時執行; 另外,CountDownLatch是不能夠重用的,而CyclicBarrier是可以重用的。 2)Semaphore其實和鎖有點類似,它一般用于控制對某組資源的訪問權限。 |
?
?
10、volatile
內存可見性?
?
| public class SafeSingletonDemo { ??????private static volatile SafeSingletonDemo safeSingletonDemo = null; ??????private SafeSingletonDemo(){ ????????????System. out.println( "********"+Thread. currentThread().getName()); ??????} ?????? ??????//double check lock ??????public static SafeSingletonDemo getInstance(){ ???????????? if( null == safeSingletonDemo){ ?????????????????? synchronized (SafeSingletonDemo. class) { ???????????????????????? if( null == safeSingletonDemo){ ?????????????????????????????? safeSingletonDemo= new SafeSingletonDemo(); ????????????????????????} ??????????????????} ????????????} ???????????? return safeSingletonDemo; ??????} ?????? ??????public static void main(String[] args) { ???????????? new Thread( new Runnable() { ?????????????????? ?????????????????? @Override ?????????????????? public void run() { ????????????????????????SafeSingletonDemo. getInstance(); ??????????????????} ????????????}, "AA").start(); ???????????? new Thread( new Runnable() { ?????????????????? ?????????????????? @Override ?????????????????? public void run() { ????????????????????????SafeSingletonDemo. getInstance(); ??????????????????} ????????????}, "BB").start(); ???????????? new Thread( new Runnable() { ?????????????????? ?????????????????? @Override ?????????????????? public void run() { ????????????????????????SafeSingletonDemo. getInstance(); ??????????????????} ????????????}, "CC").start(); ??????} } |
彩蛋:某次公開課中記錄的筆記
擊穿緩存的方法:
口訣:讀多寫少用緩存,寫多讀少用隊列,限流、分流
方法1:化并發為同步 ??
lock:等待鎖:粗粒度的鎖
1個線程拿到鎖,重建緩存,
其他1999個線程等待,從redis取
方法2:互斥鎖ConcurrentHashMap<> map ? ? 細粒度的鎖
車次號1-->是否有鎖
車次號2-->是否有鎖
boolean lock = false;
lock = map.putIfAbsent(key,value)==null;//代表當前沒有數據,不為null,當前有數據
if(lock){//拿到鎖
? ? ?//重建緩存
? ? ?//再查一次
}else{//沒拿到鎖
? ? ?//緩存降級
? ? ?1:提示:當前人數太多,請耐心等待
}
?
總結
- 上一篇: python变量/分支/循环/数组/列表
- 下一篇: 分布式架构演进过程