多线程JUC学习
補(bǔ)充之前的學(xué)習(xí)筆記
1JUC是什么
1.1 Java.util.concurrent =JUC
1.2?進(jìn)程:系統(tǒng)里運(yùn)行的多個(gè)程序QQ.exe ? ??線(xiàn)程:一個(gè)進(jìn)程中有多個(gè)線(xiàn)程
1.3 線(xiàn)程的多種狀態(tài)。
.start()--就緒狀態(tài)
State:
new創(chuàng)建 ?runnable啟動(dòng) ?blocked阻塞 waiting等待(不見(jiàn)不散) timed_waiting等待(過(guò)時(shí)不候)?terminated終結(jié)
1.4 ?wait交鎖 ?sleep不交鎖
1.5 ?并發(fā):同一時(shí)間點(diǎn)多個(gè)線(xiàn)程訪問(wèn)同一個(gè)資源 ? ? ? ? ? 并行:同時(shí)執(zhí)行多個(gè)資源
?
2、lambda表達(dá)式
①寫(xiě)法:拷貝中括號(hào)+寫(xiě)死右箭頭+落地大括號(hào)
②只有函數(shù)接口(接口里只有一個(gè)方法時(shí))才能用lambda表達(dá)式
③接口上標(biāo)記@FunctionalInterface
Foo foo = () -> {業(yè)務(wù)邏輯代碼,實(shí)現(xiàn)方法}
④default方法的實(shí)現(xiàn)
用@FunctionalInterface的接口只能有個(gè)一方法,但是可以又多個(gè)default方法
⑤靜態(tài)方法實(shí)現(xiàn)
用@FunctionalInterface的接口只能有個(gè)一方法,但是可以又多個(gè)default方法,可以有多個(gè)靜態(tài)方法
?
3、線(xiàn)程間的通信
3.1生產(chǎn)者+消費(fèi)者
3.2通知等待喚醒機(jī)制
判斷 ? 干活 ? ?通知
3.3 2個(gè)線(xiàn)程變4個(gè)線(xiàn)程,禁止出現(xiàn)虛假喚醒,判斷條件用while
3.4 ?
----------------lock----------------- ??
Lock lock=new ? ReentrantLock();
Condition condition = lock.newCondition();
condition.awati(); ? ?codittion.signalAll(); ? ? ? ? ??
-------------------syncronised----------------- ? ?
? ? ? ? ? | ? ? ? ? ? ? ? ? ? ? ? ? ? ? |
? ? ? wait(); ? ? ? ? ? ? ? ? ? ? ? notifiyAll();
?
4、傳值和傳引用
基本類(lèi)型傳復(fù)印件
引用類(lèi)型傳引用
String類(lèi)型比較特殊,因?yàn)橛袀€(gè)池的概念,所以相當(dāng)于指向變了,但是原來(lái)的指針還是指向原來(lái)的引用
?
5、Callable
callable:有異常、有返回值、call
runnable:無(wú)異常、無(wú)返回值、run
FutureTask作用:異步調(diào)用
自頂向下,逐步求精
一個(gè)futuretask被多個(gè)線(xiàn)程調(diào)用,結(jié)果可以復(fù)用
futureTask.get()只允許放到最后,get方法只計(jì)算一次?
-----------------原理,底層------------------
①在主線(xiàn)程中需要執(zhí)行比較耗時(shí)的操作時(shí),但又不想阻塞主線(xiàn)程時(shí),可以把這些作業(yè)交給Future對(duì)象在后臺(tái)完成,當(dāng)主線(xiàn)程將來(lái)需要時(shí),就可以通過(guò)future對(duì)象獲得后臺(tái)作業(yè)的計(jì)算結(jié)果或者執(zhí)行狀態(tài)。
②一般FutureTask多用于耗時(shí)的計(jì)算,主線(xiàn)程可以在完成自己的任務(wù)后,再去獲取結(jié)果
③僅再計(jì)算完成時(shí)才能檢索結(jié)果,如果計(jì)算尚未完成,則阻塞get方法,一旦計(jì)算完成,就不能再重新開(kāi)始或者取消計(jì)算,get方法獲取結(jié)果只有再計(jì)算完成時(shí)獲取,否則會(huì)一直阻塞直到任務(wù)轉(zhuǎn)入完成狀態(tài),然后會(huì)返回結(jié)果或者拋出異常。
?
6、ReadWriteLock
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock ();
rwLock .writeLock().lock();
rwLock .readLock().lock();
讀寫(xiě)鎖案例:讀可共享,寫(xiě)排他
?
| class ReadWrite{ ??????private Object obj; ??????private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); ?????? ??????public void writeLock(Object obj){ ???????????? rwLock.writeLock().lock(); ???????????? try { ?????????????????? this. obj = obj; ??????????????????System. out.println(Thread. currentThread().getName()+"寫(xiě)線(xiàn)程正在執(zhí)行\(zhòng)t"+obj); ????????????} catch (Exception e) { ?????????????????? // TODO: handle exception ????????????} finally{ ?????????????????? rwLock.writeLock().unlock(); ????????????} ??????} ?????? ??????public void readLock(){ ???????????? rwLock.readLock().lock(); ???????????? try { ??????????????????System. out.println(Thread. currentThread().getName()+"讀線(xiàn)程正在執(zhí)行\(zhòng)t"+obj); ????????????} catch (Exception e) { ?????????????????? // TODO Auto-generated catch block ?????????????????? e.printStackTrace(); ????????????} finally{ ?????????????????? rwLock.readLock().unlock(); ????????????} ??????} ?????? } |
?
?
7、線(xiàn)程池
ExecutorService service=Executors.newFixedThreadPool(5);//一池5線(xiàn)程(核心線(xiàn)程=最大線(xiàn)程=5)
-
阻塞隊(duì)列采用了LinkedBlockingQueue,它是一個(gè)無(wú)界隊(duì)列;
-
由于阻塞隊(duì)列是一個(gè)無(wú)界隊(duì)列,因此永遠(yuǎn)不可能拒絕任務(wù);
-
由于采用了無(wú)界隊(duì)列,實(shí)際線(xiàn)程數(shù)量將永遠(yuǎn)維持在nThreads,因此maximumPoolSize和keepAliveTime將無(wú)效。
?
| ? ??public static ExecutorService newFixedThreadPool( int nThreads) { ??????? return new ThreadPoolExecutor( nThreads, nThreads , ????????????????????????????????????? 0L, TimeUnit. MILLISECONDS, ????????????????????????????????????? new LinkedBlockingQueue<Runnable>()); ??? } |
ExecutorService service=Executors.newSingleThreadExecutor();//一池1線(xiàn)程(核心線(xiàn)程=最大線(xiàn)程=1)
?
| ?public static ExecutorService newSingleThreadExecutor() { ??????? return new FinalizableDelegatedExecutorService ??????????? ( new ThreadPoolExecutor(1, 1, ??????????????????????????????????? 0L, TimeUnit. MILLISECONDS, ??????????????????????????????????? new LinkedBlockingQueue<Runnable>())); ??? } |
?
ExecutorService service=Executors.newCachedThreadPool();//一池N線(xiàn)程(核心線(xiàn)程0,最大線(xiàn)程int的最大值相當(dāng)于沒(méi)有上限)
-
它是一個(gè)可以無(wú)限擴(kuò)大的線(xiàn)程池;
-
它比較適合處理執(zhí)行時(shí)間比較小的任務(wù);
-
corePoolSize為0,maximumPoolSize為無(wú)限大,意味著線(xiàn)程數(shù)量可以無(wú)限大;
-
keepAliveTime為60S,意味著線(xiàn)程空閑時(shí)間超過(guò)60S就會(huì)被殺死;
-
采用SynchronousQueue裝等待的任務(wù),這個(gè)阻塞隊(duì)列沒(méi)有存儲(chǔ)空間,這意味著只要有請(qǐng)求到來(lái),就必須要找到一條工作線(xiàn)程處理他,如果當(dāng)前沒(méi)有空閑的線(xiàn)程,那么就會(huì)再創(chuàng)建一條新的線(xiàn)程。
?
| public?static?ExecutorService?newCachedThreadPool?() { ??return?new?ThreadPoolExecutor(0, Integer.?MAX_VALUE, ????????????????????????????????????? 60L, TimeUnit.?SECONDS, ??????????????????????????????????????new?SynchronousQueue<Runnable>()); } |
?
service.submit(Runnable);
service.shutdown();
---------------------------------
ScheduledExecutorService service=Executors.newScheduledThreadPool(5);//時(shí)間輪詢(xún),每隔多少時(shí)間執(zhí)行一個(gè)任務(wù),如果線(xiàn)程忙不過(guò)來(lái)會(huì)自動(dòng)新加線(xiàn)程(核心線(xiàn)程5,最大線(xiàn)程int最大值相當(dāng)于沒(méi)有上限)
-
它采用DelayQueue存儲(chǔ)等待的任務(wù)
-
DelayQueue內(nèi)部封裝了一個(gè)PriorityQueue,它會(huì)根據(jù)time的先后時(shí)間排序,若time相同則根據(jù)sequenceNumber排序;
-
DelayQueue也是一個(gè)無(wú)界隊(duì)列;
-
工作線(xiàn)程的執(zhí)行過(guò)程:
-
工作線(xiàn)程會(huì)從DelayQueue取已經(jīng)到期的任務(wù)去執(zhí)行;
-
執(zhí)行結(jié)束后重新設(shè)置任務(wù)的到期時(shí)間,再次放回DelayQueue
?
| ? ???public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { ??????? return new ScheduledThreadPoolExecutor( corePoolSize); ? ? ?} ? ??public ScheduledThreadPoolExecutor( int corePoolSize) { ??????? super(corePoolSize, Integer. MAX_VALUE, 0, TimeUnit. NANOSECONDS, ????????????? new DelayedWorkQueue()); ? ? ?} ? ? ?public ThreadPoolExecutor( int corePoolSize, ????????????????????????????? int maximumPoolSize, ????????????????????????????? long keepAliveTime, ????????????????????????????? TimeUnit unit, ????????????????????????????? BlockingQueue<Runnable> workQueue) { ??????? this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, ???????????? Executors. defaultThreadFactory(), defaultHandler); ? ? ?} |
service.schedule(callable,delay,unit);//線(xiàn)程,延遲,時(shí)間單位 ? 每隔2s提交一次請(qǐng)求
?
| public class ReadWriteLock { ??????public static <E> void main(String[] args) { ???????????? ????????????ExecutorService service = Executors. newCachedThreadPool(); ???????????? ???????????? final ReadWrite readWrite = new ReadWrite(); ???????????? ???????????? try { ??????????????????Thread thread1 = new Thread( new Runnable() { ???????????????????????? @Override ???????????????????????? public void run() { ?????????????????????????????? readWrite.writeLock( "this is write"); ?????????????????????????????? ????????????????????????} ??????????????????}, "AA"); ?????????????????? service.execute( thread1);????? ?????????????????? ?????????????????? for( int i=1; i<=100; i++){ ????????????????????????Thread thread2 = new Thread( new Runnable() { ?????????????????????????????? @Override ?????????????????????????????? public void run() { ???????????????????????????????????? readWrite.readLock(); ??????????????????????????????} ????????????????????????}, String. valueOf(i)); ???????????????????????? service.execute( thread2); ??????????????????} ????????????} catch (Exception e) { ?????????????????? // TODO Auto-generated catch block ?????????????????? e.printStackTrace(); ????????????} finally{ ?????????????????? service.shutdown(); ????????????} ??????} } |
?
?
8、常用工具類(lèi)
CountDownLatch----所有線(xiàn)程執(zhí)行完才執(zhí)行的任務(wù)(秦滅六國(guó),一統(tǒng)華夏)
final CountDownLatch countDownLatch = new CountDownLatch(6);
countDownLatch .countDown();//沒(méi)執(zhí)行一條就-1,直到6條都執(zhí)行完
countDownLatch .await();//阻塞最后一個(gè)要執(zhí)行的主線(xiàn)程
?
| public class CountDownLatchDemo { ?????? ??????public static void main(String[] args) throws InterruptedException { ???????????? //CountDownLatch ???????????? final CountDownLatch countDownLatch = new CountDownLatch(6); ???????????? ???????????? for( int i=1; i<=6; i++){ ?????????????????? new Thread( new Runnable() { ???????????????????????? ???????????????????????? @Override ???????????????????????? public void run() { ??????????????????????????????System. out.println(Thread. currentThread().getName()+"\t 國(guó)家被滅"); ?????????????????????????????? countDownLatch.countDown(); ????????????????????????} ??????????????????}, CountryEnums.forEachCountryEnums(i).getMsg()).start(); ????????????} ???????????? ???????????? countDownLatch.await(); ????????????System. out.println(Thread. currentThread().getName()+"\t 秦滅六國(guó),一統(tǒng)華夏" ); ??????} } |
?
?
CyclicBarrier---集齊7顆龍珠,可以召喚神龍(其他線(xiàn)程執(zhí)行完了只能等待)
?
| public class CyclicBarrierDemo { ?????? ??????private final static int number=7; ?????? ??????public static void main(String[] args) { ???????????? final CyclicBarrier cyclicBarrier = new CyclicBarrier(number, new Runnable() { ?????????????????? @Override ?????????????????? public void run() { ????????????????????????System. out.println( "集齊7顆龍珠,可以召喚神龍" ); ??????????????????} ????????????}); ???????????? for( int i=1; i<=7; i++){ ?????????????????? final int temp= i; ?????????????????? new Thread( new Runnable() { ???????????????????????? @Override ???????????????????????? public void run() { ?????????????????????????????? try { ????????????????????????????????????System. out.println(Thread. currentThread().getName()+"\t 收集第"+ temp+ "顆龍珠"); ???????????????????????????????????? cyclicBarrier.await(); ??????????????????????????????} catch (InterruptedException | BrokenBarrierException e) { ???????????????????????????????????? e.printStackTrace(); ??????????????????????????????} ????????????????????????} ??????????????????}, String. valueOf(i)).start(); ????????????} ???????????? ??????} } |
?
Semaphore----信號(hào)燈(爭(zhēng)車(chē)位)
?
| public class SemaphoreDemo { ??????public static void main(String[] args) { ???????????? final Semaphore semaphore = new Semaphore(3); //模擬3個(gè)停車(chē)位 ???????????? for( int i=1; i<=6; i++){ //模擬6個(gè)汽車(chē) ?????????????????? new Thread( new Runnable() { ???????????????????????? ???????????????????????? @Override ???????????????????????? public void run() { ?????????????????????????????? try { ???????????????????????????????????? semaphore.acquire(); ????????????????????????????????????System. out.println(Thread. currentThread().getName()+ "\t 搶占到停車(chē)位" ); ????????????????????????????????????TimeUnit. SECONDS.sleep( new Random().nextInt(5)); ????????????????????????????????????System. out.println(Thread. currentThread().getName()+ "\t---- 離開(kāi)了停車(chē)位" ); ??????????????????????????????} catch (InterruptedException e) { ???????????????????????????????????? e.printStackTrace(); ??????????????????????????????} finally{ ???????????????????????????????????? semaphore.release(); ??????????????????????????????} ?????????????????????????????? ????????????????????????} ??????????????????}, String. valueOf(i)).start(); ????????????} ??????} } |
?
9、集合不安全類(lèi)
ArrayList、HashMap、HashSet
java.util.ConcurrentModificationException
CopyOnWriteArrayList();//寫(xiě)時(shí)復(fù)制
往元素中添加元素時(shí),先復(fù)制一份新的數(shù)組(Arrays.copyOf),長(zhǎng)度+1,把要添加的元素添加到新的數(shù)組中。最后把引用指向新的數(shù)組。(整個(gè)過(guò)程添加了reentrainLock)
CopyOnWriteArraySet<String>();
ConcurrentHashMap<>();
sss---->Arrays ?Collections
三者對(duì)比:
?
| 1)CountDownLatch和CyclicBarrier都能夠?qū)崿F(xiàn)線(xiàn)程之間的等待,只不過(guò)它們側(cè)重點(diǎn)不同: CountDownLatch一般用于某個(gè)線(xiàn)程A等待若干個(gè)其他線(xiàn)程執(zhí)行完任務(wù)之后,它才執(zhí)行; 而CyclicBarrier一般用于一組線(xiàn)程互相等待至某個(gè)狀態(tài),然后這一組線(xiàn)程再同時(shí)執(zhí)行; 另外,CountDownLatch是不能夠重用的,而CyclicBarrier是可以重用的。 2)Semaphore其實(shí)和鎖有點(diǎn)類(lèi)似,它一般用于控制對(duì)某組資源的訪問(wèn)權(quán)限。 |
?
?
10、volatile
內(nèi)存可見(jiàn)性?
?
| public class SafeSingletonDemo { ??????private static volatile SafeSingletonDemo safeSingletonDemo = null; ??????private SafeSingletonDemo(){ ????????????System. out.println( "********"+Thread. currentThread().getName()); ??????} ?????? ??????//double check lock ??????public static SafeSingletonDemo getInstance(){ ???????????? if( null == safeSingletonDemo){ ?????????????????? synchronized (SafeSingletonDemo. class) { ???????????????????????? if( null == safeSingletonDemo){ ?????????????????????????????? safeSingletonDemo= new SafeSingletonDemo(); ????????????????????????} ??????????????????} ????????????} ???????????? return safeSingletonDemo; ??????} ?????? ??????public static void main(String[] args) { ???????????? new Thread( new Runnable() { ?????????????????? ?????????????????? @Override ?????????????????? public void run() { ????????????????????????SafeSingletonDemo. getInstance(); ??????????????????} ????????????}, "AA").start(); ???????????? new Thread( new Runnable() { ?????????????????? ?????????????????? @Override ?????????????????? public void run() { ????????????????????????SafeSingletonDemo. getInstance(); ??????????????????} ????????????}, "BB").start(); ???????????? new Thread( new Runnable() { ?????????????????? ?????????????????? @Override ?????????????????? public void run() { ????????????????????????SafeSingletonDemo. getInstance(); ??????????????????} ????????????}, "CC").start(); ??????} } |
彩蛋:某次公開(kāi)課中記錄的筆記
擊穿緩存的方法:
口訣:讀多寫(xiě)少用緩存,寫(xiě)多讀少用隊(duì)列,限流、分流
方法1:化并發(fā)為同步 ??
lock:等待鎖:粗粒度的鎖
1個(gè)線(xiàn)程拿到鎖,重建緩存,
其他1999個(gè)線(xiàn)程等待,從redis取
方法2:互斥鎖ConcurrentHashMap<> map ? ? 細(xì)粒度的鎖
車(chē)次號(hào)1-->是否有鎖
車(chē)次號(hào)2-->是否有鎖
boolean lock = false;
lock = map.putIfAbsent(key,value)==null;//代表當(dāng)前沒(méi)有數(shù)據(jù),不為null,當(dāng)前有數(shù)據(jù)
if(lock){//拿到鎖
? ? ?//重建緩存
? ? ?//再查一次
}else{//沒(méi)拿到鎖
? ? ?//緩存降級(jí)
? ? ?1:提示:當(dāng)前人數(shù)太多,請(qǐng)耐心等待
}
?
總結(jié)
- 上一篇: python变量/分支/循环/数组/列表
- 下一篇: 分布式架构演进过程