java类同步,Java同步工具類(一)
同步工具類可以是任意一個(gè)對象,只要它可以根據(jù)自身的狀態(tài)來協(xié)調(diào)線程的控制流。阻塞隊(duì)列可以作為同步工具類,其他類型的同步工具類還包括信號(hào)量(Semaphore)、柵欄(Barrier)以及閉鎖。在平臺(tái)類庫中還包含一些其他同步工具類,如果還是不能滿足需要,我們可以創(chuàng)建自己的同步工具類。
一、閉鎖
閉鎖可以延遲線程的進(jìn)度直到其達(dá)到終止?fàn)顟B(tài)。閉鎖可以用來確保某些活動(dòng)直到其他活動(dòng)都完成后才繼續(xù)執(zhí)行。例如:
某個(gè)計(jì)算在其需要的資源都初始化完成之后執(zhí)行;
某個(gè)服務(wù)在其所有依賴的服務(wù)都啟動(dòng)之后才啟動(dòng);
游戲中所有的玩家都就緒才繼續(xù)執(zhí)行。
CountDownLatch是一種靈活的閉鎖實(shí)現(xiàn),它可以使一個(gè)或多個(gè)線程等待一組時(shí)間發(fā)生。閉鎖狀態(tài)包括一個(gè)計(jì)數(shù)器,該計(jì)數(shù)器初始化為一個(gè)正數(shù),表示需要等待的事件數(shù)量,countDown方法表示遞減計(jì)數(shù)器,表示一個(gè)事件發(fā)生了,而await方法等待直到計(jì)數(shù)器為0,表示所有事件都已經(jīng)發(fā)生。如果計(jì)數(shù)器的值非零,那么就會(huì)一直等待下去,或者等待中被打斷,或者超時(shí)。
例子(計(jì)算所有線程運(yùn)行時(shí)間):package thread.semaphore;
import java.util.concurrent.CountDownLatch;
public class TestHarness {
public static long timeTask(int nThreads, final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i=0; i
Thread thread = new Thread() {
@Override
public void run() {
try {
startGate.await();
task.run();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
endGate.countDown();
}
}
};
thread.start();
}
long start = System.currentTimeMillis();
startGate.countDown();
endGate.await();
long end = System.currentTimeMillis();
return end-start;
}
public static void main(String[] args) throws InterruptedException {
long time = timeTask(5, new Runnable() {
@Override
public void run() {
try {
Thread.sleep((int)(500 * Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
System.out.println("all task use " + time + "ms");
}
}
上例中使用了兩個(gè)閉鎖,一個(gè)起始門(startGate),一個(gè)結(jié)束門(endGate)。起始門的計(jì)數(shù)器值初始化為1,結(jié)束門是線程數(shù),每個(gè)線程首先要做的就是在起始門上等待多有的線程都就緒后才開始執(zhí)行。而每個(gè)線程最后要做的事就是調(diào)用結(jié)束門countDown方法減1,這能高效的等待所有的線程都工作完成,這樣可以統(tǒng)計(jì)消耗的時(shí)間。
其他方法
如果有某個(gè)線程處理的比較慢,我們不可能讓主線程一直等待,所以我們可以使用另外一個(gè)帶指定時(shí)間的await方法,await(long time, TimeUnit unit): 這個(gè)方法等待特定時(shí)間后,就會(huì)不再阻塞當(dāng)前線程。join也有類似的方法。
注意:計(jì)數(shù)器必須大於等於0,只是等於0時(shí)候,計(jì)數(shù)器就是零,調(diào)用await方法時(shí)不會(huì)阻塞當(dāng)前線程。CountDownLatch不可能重新初始化或者修改CountDownLatch對象的內(nèi)部計(jì)數(shù)器的值。
二、信號(hào)量
計(jì)數(shù)信號(hào)量(Counting Semaphore)用來控制同時(shí)訪問某個(gè)特定資源的操作數(shù)量。計(jì)數(shù)信號(hào)量還可以用來實(shí)現(xiàn)某種資源池(如:數(shù)據(jù)庫連接池),或者對容器施加邊界。
Semaphore中管理著一組虛擬的許可(permit),許可的數(shù)量可以通過構(gòu)造器來指定。在執(zhí)行操作時(shí)可以先獲取許可(只要還有剩余的許可),並在使用之后釋放許可。如果沒有許可,那么aquire將阻塞指定獲取許可(或者直到被中斷或者操作超時(shí))。release將返回一個(gè)許可給信號(hào)量。計(jì)算信號(hào)量的一種簡化形式就是二值信號(hào)量,即初始值為1的Semaphore。二值信號(hào)量可以用作互斥體(mutex),並具備不可重入的語義:誰擁有這個(gè)唯一的許可,誰就擁有了互斥鎖。
例子(流量控制):
要讀取幾萬個(gè)文件的數(shù)據(jù),因?yàn)槎际荌O密集型任務(wù),我們可以啟動(dòng)幾十個(gè)線程並發(fā)的讀取,但是如果讀到內(nèi)存后,還需要存儲(chǔ)到數(shù)據(jù)庫中,而數(shù)據(jù)庫的連接數(shù)只有10個(gè),這時(shí)我們必須控制只有十個(gè)線程同時(shí)獲取數(shù)據(jù)庫連接保存數(shù)據(jù),否則會(huì)報(bào)錯(cuò)無法獲取數(shù)據(jù)庫連接。public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
}
}
});
}
threadPool.shutdown();
}
}
其他方法:
int availablePermits() :返回此信號(hào)量中當(dāng)前可用的許可證數(shù)。
int getQueueLength():返回正在等待獲取許可證的線程數(shù)。
boolean hasQueuedThreads() :是否有線程正在等待獲取許可證。
void reducePermits(int reduction) :減少reduction個(gè)許可證。是個(gè)protected方法。
Collection getQueuedThreads() :返回所有等待獲取許可證的線程集合。是個(gè)protected方法。
三、柵欄(同步屏障)
1. CyclicBarrier
柵欄(Barrier)類似於閉鎖,它能阻塞一組線程直到某個(gè)時(shí)間發(fā)生。柵欄和閉鎖的區(qū)別在於,所有線程必須都到達(dá)柵欄位置之后才能繼續(xù)執(zhí)行。閉鎖用於等待事件,二柵欄用於等待其他線程。
閉鎖是一次性操作,一旦進(jìn)入終止?fàn)顟B(tài)就不能重置。CyclicBarrier可以使一定數(shù)量的線程反復(fù)在柵欄位置匯集,它在並行迭代算法中非常有用:這種算法通常將一個(gè)問題拆分成多個(gè)互不相關(guān)的子問題。當(dāng)線程執(zhí)行到柵欄位置時(shí)將調(diào)用await方法等待其他線程,這個(gè)方法阻塞直到所有線程都到達(dá)柵欄位置。當(dāng)所有線程都到達(dá)柵欄位置,那么柵欄打開,所有線程都被釋放。而柵欄將被重置以便下次使用。如果await被中斷或者超時(shí),那么柵欄被認(rèn)為是打破了,所有線程的await都將被終止並拋出BrokenBarrierException。如果成功的通過柵欄,那么await將為每個(gè)線程返回一個(gè)唯一的到達(dá)索引號(hào),我們可以利用這些索引來“選舉”產(chǎn)生一個(gè)領(lǐng)導(dǎo)線程,並在下一次迭代中由該領(lǐng)導(dǎo)線程執(zhí)行一些特殊的工作。
CyclicBarrier還可以利用構(gòu)造函數(shù)傳遞一個(gè)Runnable,當(dāng)成功通過柵欄時(shí)會(huì)執(zhí)行它,但在阻塞線程被釋放前不會(huì)執(zhí)行。
例子(10個(gè)人去春游,規(guī)定達(dá)到一個(gè)地點(diǎn)后才能繼續(xù)前行)package thread.semaphore;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierWorker implements Runnable {
private int id;
private CyclicBarrier cyclicBarrier;
public CyclicBarrierWorker(int id, CyclicBarrier cyclicBarrier) {
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.println(id + " th people wait, waitings " + cyclicBarrier.getNumberWaiting());
int returnIndex = cyclicBarrier.await();// 大家等待最后一個(gè)線程到達(dá)
System.out.println(id + " th people go, returnIndex:" + returnIndex);
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
int num = 10;
CyclicBarrier cyclicBarrier = new CyclicBarrier(num, new Runnable() {
@Override
public void run() {
System.out.println("go on together!");
}
});
for (int i=1; i<=num; i++) {
new Thread(new CyclicBarrierWorker(i, cyclicBarrier)).start();
}
}
}
運(yùn)行結(jié)果:2 th people wait, waitings 1
4 th people wait, waitings 1
3 th people wait, waitings 3
5 th people wait, waitings 4
6 th people wait, waitings 5
7 th people wait, waitings 6
8 th people wait, waitings 6
9 th people wait, waitings 7
10 th people wait, waitings 9
go on together!
10 th people go, returnIndex:0
2 th people go, returnIndex:8
4 th people go, returnIndex:7
3 th people go, returnIndex:6
1 th people go, returnIndex:9
7 th people go, returnIndex:3
9 th people go, returnIndex:1
6 th people go, returnIndex:4
5 th people go, returnIndex:5
8 th people go, returnIndex:2
2.Exchanger(兩個(gè)線程進(jìn)行數(shù)據(jù)交換)
另一種柵欄是Exchanger,它是一種兩方(two-party)柵欄,各方在柵欄位置互換數(shù)據(jù)。當(dāng)兩方執(zhí)行不對稱操作時(shí)Exchanger會(huì)非常有用,例如一個(gè)線程向緩存中寫數(shù)據(jù),另一線程讀數(shù)據(jù),這兩個(gè)線程可以使用Exchanger匯合,並將滿的緩沖區(qū)和空的緩沖區(qū)互換。它提供一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn)兩個(gè)線程可以交換彼此的數(shù)據(jù)。這兩個(gè)線程通過exchange方法交換數(shù)據(jù), 如果第一個(gè)線程先執(zhí)行exchange方法,它會(huì)一直等待第二個(gè)線程也執(zhí)行exchange,當(dāng)兩個(gè)線程都到達(dá)同步點(diǎn)時(shí),這兩個(gè)線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對方。
例子(校對工作):public class ExchangerTest {
private static final Exchanger exgr = new Exchanger();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String A = "銀行流水A";// A錄入銀行流水?dāng)?shù)據(jù)
exgr.exchange(A);
} catch (InterruptedException e) {
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String B = "銀行流水B";// B錄入銀行流水?dāng)?shù)據(jù)
String A = exgr.exchange("B");
System.out.println("A和B數(shù)據(jù)是否一致:" + A.equals(B) + ",A錄入的是:"
+ A + ",B錄入是:" + B);
} catch (InterruptedException e) {
}
}
});
threadPool.shutdown();
}
}
運(yùn)行結(jié)果:A和B數(shù)據(jù)是否一致:false,A錄入的是:銀行流水A,B錄入是:銀行流水B
其他方法
如果兩個(gè)線程有一個(gè)沒有到達(dá)exchange方法,則會(huì)一直等待,如果擔(dān)心有特殊情況發(fā)生,避免一直等待,可以使用exchange(V x, long timeout, TimeUnit unit)設(shè)置最大等待時(shí)長。
參考:《java並發(fā)編程實(shí)戰(zhàn)》
並發(fā)編程網(wǎng)
總結(jié)
以上是生活随笔為你收集整理的java类同步,Java同步工具類(一)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 中科院的matlab课件,中科院的mat
- 下一篇: mysql表一对多关系,mysql表与表