java线程同步的实现_Java并发编程(三) - 实战:线程同步的实现
synchronized關鍵字
首先,來看一個多線程競爭臨界資源導致的同步不安全問題。
package com.example.weishj.mytester.concurrency.sync;
/**
* 同步安全測試
*
* 在無任何同步措施時,并發會導致錯誤的結果
*/
public class SyncTest1 implements Runnable {
// 共享資源(臨界資源)
private static int race = 0;
private static final int THREADS_COUNT = 10;
public void increase() {
race++;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
increase();
}
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
SyncTest1 runnable = new SyncTest1();
Thread[] threads = new Thread[THREADS_COUNT];
for (int i = 0; i < THREADS_COUNT; i++) {
threads[i] = new Thread(runnable);
threads[i].start();
}
// 等待所有累加線程都結束
while (Thread.activeCount() > 1) {
Thread.yield();
}
// 期待的結果應該是(THREADS_COUNT * 10000)= 100000
System.out.println("race = " + race + ", time: " + (System.currentTimeMillis() - start));
}
}
運行結果:
race = 69309, time: 4
synchronized實例方法
鎖定實例對象(this)
以開頭的代碼為例,對 increase() 做同步安全控制:
// synchronized實例方法,安全訪問臨界資源
public synchronized void increase() {
race++;
}
運行結果:
race = 100000, time: 29
既然鎖定的是this對象,那么任何同步安全就必須建立在當前對象鎖的前提之上,脫離了當前對象,就不再有同步安全可言。仍然以開頭的代碼為例:
package com.example.weishj.mytester.concurrency.sync;
/**
* 同步安全測試
*
* 脫離了"同一個對象"的前提,synchronized實例方法將不再具有同步安全性
*/
public class SyncTest3 implements Runnable {
// 共享資源(臨界資源)
private static int race = 0;
private static final int THREADS_COUNT = 10;
// synchronized實例方法,安全訪問臨界資源
public synchronized void increase() {
race++;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
increase();
}
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
// SyncTest3 runnable = new SyncTest3();
Thread[] threads = new Thread[THREADS_COUNT];
for (int i = 0; i < THREADS_COUNT; i++) {
// 不同的對象鎖,將導致臨界資源不再安全
threads[i] = new Thread(new SyncTest3());
threads[i].start();
}
// 等待所有累加線程都結束
while (Thread.activeCount() > 1) {
Thread.yield();
}
// 期待的結果應該是(THREADS_COUNT * 10000)= 100000
System.out.println("race = " + race + ", time: " + (System.currentTimeMillis() - start));
}
}
運行結果:
race = 72446, time: 5
因此,使用synchronized實例方法時,需要格外注意實例對象是不是同一個:
單例:安全
非單例:同一個實例對象上才存在同步安全
另外,既然是針對對象加鎖,那么同一個對象中的多個同步實例方法之間,也是互斥的。
package com.example.weishj.mytester.concurrency.sync;
/**
* 同步安全測試
*
* 同一個對象的不同synchronized實例方法之間,也是互斥的
*/
public class SyncTest4 {
private static final int THREADS_COUNT = 2;
public synchronized void a() {
int i = 5;
while (i-- > 0) {
System.out.println("Thread: " + Thread.currentThread().getName() + ", method: a, running...");
}
}
public synchronized void b() {
int i = 5;
while (i-- > 0) {
System.out.println("Thread: " + Thread.currentThread().getName() + ", method: b, running...");
}
}
public static void main(String[] args) {
final SyncTest4 instance = new SyncTest4();
Thread[] threads = new Thread[THREADS_COUNT];
for (int i = 0; i < THREADS_COUNT; i++) {
final int finalI = i;
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
if (finalI % 2 == 0) {
// 若通過不同對象調用方法ab,則ab之間不存在互斥關系
// new SyncTest4().a();
// 在同一個對象上調用方法ab,則ab之間是互斥的
instance.a();
} else {
// 若通過不同對象調用方法ab,則ab之間不存在互斥關系
// new SyncTest4().b();
// 在同一個對象上調用方法ab,則ab之間是互斥的
instance.b();
}
}
});
threads[i].start();
}
}
}
運行結果:
Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
若兩個線程分別通過不同的對象調用方法ab(上述示例中被注釋的代碼),則ab之間就不存在互斥關系。可以通過上述示例中被注釋的代碼來驗證,運行結果:
Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...
綜上分析,synchronized實例方法 有以下關鍵點需要記住:
鎖定實例對象(this)
每個實例都有獨立的對象鎖,因此只有針對同一個實例,才具備互斥性
同一個實例中的多個synchronized實例方法之間,也是互斥的
synchronized靜態方法
鎖定類對象(class)
package com.example.weishj.mytester.concurrency.sync.synchronizedtest;
/**
* 同步安全測試
*
* 同步靜態方法,實現線程安全
*/
public class SyncStaticTest1 implements Runnable {
// 共享資源(臨界資源)
private static int race = 0;
private static final int THREADS_COUNT = 10;
public static synchronized void increase() {
race++;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
// 這里加this只是為了顯式地表明是通過對象來調用increase方法
this.increase();
}
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
Thread[] threads = new Thread[THREADS_COUNT];
for (int i = 0; i < THREADS_COUNT; i++) {
// 每次都創建新的SyncStaticTest1實例
threads[i] = new Thread(new SyncStaticTest1());
threads[i].start();
}
// 等待所有累加線程都結束
while (Thread.activeCount() > 1) {
Thread.yield();
}
// 期待的結果應該是(THREADS_COUNT * 10000)= 100000
System.out.println("race = " + race + ", time: " + (System.currentTimeMillis() - start));
}
}
運行結果:
race = 100000, time: 25
可見,就算是10個線程分別通過不同的SyncStaticTest1實例調用increase方法,仍然是線程安全的。同樣地,不同線程分別通過實例對象和類對象調用同步靜態方法,也是線程安全的,這里不再做演示。
但是,同一個類的 同步靜態方法 和 同步實例方法 之間,則不存在互斥性,因為他們的同步鎖不同。如下示例:
package com.example.weishj.mytester.concurrency.sync.synchronizedtest;
/**
* 同步安全測試
*
* 同步靜態方法和同步實例方法之間,不存在互斥性
*/
public class SyncStaticTest2 {
private static final int THREADS_COUNT = 2;
public synchronized static void a() {
int i = 5;
while (i-- > 0) {
System.out.println("Thread: " + Thread.currentThread().getName() + ", method: a, running...");
}
}
public synchronized void b() {
int i = 5;
while (i-- > 0) {
System.out.println("Thread: " + Thread.currentThread().getName() + ", method: b, running...");
}
}
public static void main(String[] args) {
final SyncStaticTest2 instance = new SyncStaticTest2();
Thread[] threads = new Thread[THREADS_COUNT];
for (int i = 0; i < THREADS_COUNT; i++) {
final int finalI = i;
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
if (finalI % 2 == 0) {
// 靜態方法即可以通過實例調用,也可以通過類調用
instance.a();
} else {
// 實例方法則只能通過實例調用
instance.b();
}
}
});
threads[i].start();
}
}
}
運行結果:
Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...
Thread: Thread-0, method: a, running...
Thread: Thread-1, method: b, running...
Thread: Thread-0, method: a, running...
Thread: Thread-1, method: b, running...
Thread: Thread-0, method: a, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
Thread: Thread-1, method: b, running...
綜上分析,synchronized靜態方法 有以下關鍵點需要記住:
鎖定類對象(class)
同步靜態方法在任意實例對象之間,也是互斥的
同個類的同步靜態方法和同步實例方法之間,不具備互斥性
synchronized代碼塊
從之前的演示示例中,我們可以發現,方法同步后,其耗時(time)一般都在20ms以上,而不同步時,time則只有3ms左右,這印證了synchronized關鍵字其實是非常低效的,不應該隨意使用,如果必須使用,也應該考慮盡量減少同步的范圍,尤其當方法體比較大時,應該盡量避免使用同步方法,此時可以考慮用同步代碼塊來代替。
synchronized(obj) {...}
鎖住指定的對象(可以是任意實例對象,類對象)
package com.example.weishj.mytester.concurrency.sync.synchronizedtest;
/**
* 同步安全測試
*
* 同步代碼塊,實現線程安全
*/
public class SyncBlockTest1 implements Runnable {
// 共享資源(臨界資源)
private static int race = 0;
private static final int THREADS_COUNT = 10;
public void increase() {
race++;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
// 要注意這里鎖定的對象是誰
synchronized (this) {
increase();
}
}
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
SyncBlockTest1 runnable = new SyncBlockTest1();
Thread[] threads = new Thread[THREADS_COUNT];
for (int i = 0; i < THREADS_COUNT; i++) {
// 必須使用同一個實例,才能達到同步效果
threads[i] = new Thread(runnable);
threads[i].start();
}
// 等待所有累加線程都結束
while (Thread.activeCount() > 1) {
Thread.yield();
}
// 期待的結果應該是(THREADS_COUNT * 10000)= 100000
System.out.println("race = " + race + ", time: " + (System.currentTimeMillis() - start));
}
}
運行結果:
race = 100000, time: 29
上例中,我們鎖定了當前對象 this ,如果類的使用情況比較復雜,無法用this做對象鎖,也可以自行創建任意對象充當對象鎖,此時建議使用長度為0的byte數組,因為在所有對象中,它的創建是最經濟的(查看編譯后的字節碼:byte[] lock = new byte[0] 只需3條操作碼,而Object lock = new Object() 則需要7行操作碼)。
// 使用一個長度為0的byte數組作為對象鎖
private byte[] lock = new byte[0];
synchronized (lock) {
increase();
}
使用同步代碼塊時,同樣必須明確你的對象鎖是誰,這樣才能寫出正確的使用邏輯。以上例來說,無論是 this 還是 lock ,他們都是與當前對象相關的,所以,為了達到同步效果,必須如下使用:
SyncBlockTest1 runnable = new SyncBlockTest1();
Thread[] threads = new Thread[THREADS_COUNT];
for (int i = 0; i < THREADS_COUNT; i++) {
// 必須使用同一個實例,才能達到同步效果
threads[i] = new Thread(runnable);
threads[i].start();
}
可如果你的使用方法如下,就失去了線程安全性:
Thread[] threads = new Thread[THREADS_COUNT];
for (int i = 0; i < THREADS_COUNT; i++) {
// 每次都創建新的SyncStaticTest1實例,就會失去線程安全性
threads[i] = new Thread(new SyncBlockTest1());
threads[i].start();
}
此時,運行結果為:
race = 62629, time: 7
但如果你鎖定的是類對象 SyncStaticTest1.class ,那10個線程無論使用同一個實例還是各自使用不同的實例,都是安全的。
// 鎖定類對象
synchronized (SyncStaticTest1.class) {
increase();
}
Thread[] threads = new Thread[THREADS_COUNT];
for (int i = 0; i < THREADS_COUNT; i++) {
// 每次都創建新的SyncStaticTest1實例,仍然是線程安全的
threads[i] = new Thread(new SyncBlockTest1());
threads[i].start();
}
運行結果:
race = 100000, time: 25
綜上分析,synchronized代碼塊 有以下關鍵點需要記住:
鎖住指定的對象(可以是任意實例對象,類對象)
需要創建對象鎖時,建議使用 new byte[0] ,因為在所有對象中,它的創建是最經濟的
必須時刻明確對象鎖是誰,只有配合正確的使用方法,才能得到正確的同步效果
至此,synchronized的三種用法就說完了,可見,使用synchronized時,明確對象鎖是非常重要的。另外,搞清楚了對象鎖的相關知識后,就不難推斷出以下2個等式:
synchronized void method() {
// method logic
}
等價于:
void method() {
synchronized(this) {
// method logic
}
}
static synchronized void method() {
// method logic
}
等價于:
static void method() {
synchronized(TestClass.class) {
// method logic
}
}
Lock接口
除了synchronized關鍵字,JDK1.5中還新增了另外一種線程同步機制:Lock接口。來看看其接口定義:
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
lock()
獲取普通鎖,若鎖已被獲取,則只能等待,效果與synchronized相同。只不過lock后需要unlock。
lockInterruptibly()
獲取可中斷鎖,當兩個線程同時通過 lockInterruptibly() 想獲取某個鎖時,假設A獲取到了,那么B只能等待,此時如果對B調用 interrupt() 方法,就可以中斷B的等待狀態。但是注意,A是不會被 interrupt() 中斷的,也就是說,只有處于等待狀態的線程,才可以響應中斷。
tryLock()
嘗試獲取鎖,如果獲取成功返回true,反之立即返回false。此方法不會阻塞等待獲取鎖。
tryLock(long time, TimeUnit unit)
等待time時間,如果在time時間內獲取到鎖返回true,如果阻塞等待time時間內沒有獲取到鎖返回false。
unlock()
業務處理完畢,釋放鎖。
newCondition()
創建一個Condition。Condition與Lock結合使用,可以達到synchronized與wait/notify/notifyAll結合使用時同樣的線程等待與喚醒的效果,而且功能更強大。
Lock接口與synchronized關鍵字的區別
synchronized加解鎖是自動的;而Lock需要手動加解鎖,操作復雜,但更加靈活
lock與unlock需要成對使用,否則可能造成線程長期占有鎖,其他線程長期等待
unlock應該放在 finally 中,以防發生異常時未能及時釋放鎖
synchronized不可響應中斷,一個線程獲取不到鎖就一直等待;而Lock可以響應中斷
當兩個線程同時通過 Lock.lockInterruptibly() 想獲取某個鎖時,假設A獲取到了,那么B只能等待,此時如果對B調用 interrupt() 方法,就可以中斷B的等待狀態。但是注意,A是不會被 interrupt() 中斷的,也就是說,只有處于等待狀態的線程,才可以響應中斷。
synchronized無法實現公平鎖;而Lock可以實現公平鎖
公平鎖與非公平鎖的概念稍后再說
ReentrantLock可重入鎖
ReentrantLock是Lock的實現類。首先,看一個簡單的售票程序:
package com.example.weishj.mytester.concurrency.sync.synchronizedtest;
/**
* 同步安全測試
*
* 一個簡單的售票程序,多線程同時售票時,會出現線程安全問題
*/
public class ReentrantLockTest1 {
private static final int THREADS_COUNT = 3; // 線程數
private static final int TICKETS_PER_THREAD = 5; // 每個線程分配到的票數
// 共享資源(臨界資源)
private int ticket = THREADS_COUNT * TICKETS_PER_THREAD; // 總票數
public void buyTicket() {
try {
if (ticket > 0) {
System.out.println("Thread: " + Thread.currentThread().getName() + ", bought ticket-" + ticket--);
// 為了更容易出現安全問題,這里加一個短暫睡眠
Thread.sleep(2);
}
} catch (Throwable t) {
t.printStackTrace();
}
}
public void readTicket() {
System.out.println("Thread: " + Thread.currentThread().getName() + ", tickets left: " + ticket);
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
final ReentrantLockTest1 instance = new ReentrantLockTest1();
// 啟動 THREADS_COUNT 個線程
Thread[] threads = new Thread[THREADS_COUNT];
for (int i = 0; i < THREADS_COUNT; i++) {
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
// 每個線程可以賣 TICKETS_PER_THREAD 張票
for (int j = 0; j < TICKETS_PER_THREAD; j++) {
instance.buyTicket();
}
}
});
threads[i].start();
}
// 等待所有累加線程都結束
while (Thread.activeCount() > 1) {
Thread.yield();
}
// 讀取剩余票數
instance.readTicket();
// 耗時
System.out.println("time: " + (System.currentTimeMillis() - start));
}
}
庫存有15張票,同時啟動3個線程出售,每個線程分配5張,線程安全時,結果應該是所有票正好都被賣掉,不多不少。然而,在沒有任何同步措施的情況下,運行結果如下:
Thread: Thread-0, bought ticket-15
Thread: Thread-2, bought ticket-13
Thread: Thread-1, bought ticket-14
Thread: Thread-1, bought ticket-12
Thread: Thread-2, bought ticket-11
Thread: Thread-0, bought ticket-12
Thread: Thread-2, bought ticket-10
Thread: Thread-1, bought ticket-10
Thread: Thread-0, bought ticket-9
Thread: Thread-2, bought ticket-8
Thread: Thread-1, bought ticket-7
Thread: Thread-0, bought ticket-6
Thread: Thread-0, bought ticket-5
Thread: Thread-2, bought ticket-5
Thread: Thread-1, bought ticket-4
Thread: main, tickets left: 3
time: 14
可見,ticket-12、ticket-10、ticket-5均被售出了2次,而Ticket-1、Ticket-2、Ticket-3沒有售出。
下面是使用Lock的實現類 ReentrantLock 對上例做的改造:
package com.example.weishj.mytester.concurrency.sync.synchronizedtest;
import java.util.concurrent.locks.ReentrantLock;
/**
* 同步安全測試
*
* 演示ReentrantLock實現同步,以及公平鎖與非公平鎖
*/
public class ReentrantLockTest2 {
private static final int THREADS_COUNT = 3; // 線程數
private static final int TICKETS_PER_THREAD = 5; // 每個線程分配到的票數
// 共享資源(臨界資源)
private int ticket = THREADS_COUNT * TICKETS_PER_THREAD; // 總票數
private static final ReentrantLock lock;
static {
// 創建一個公平鎖/非公平鎖
lock = new ReentrantLock(false); // 修改參數,看看公平鎖與非公平鎖的差別
}
public void buyTicket() {
try {
lock.lock();
if (ticket > 0) {
System.out.println("Thread: " + Thread.currentThread().getName() + ", bought ticket-" + ticket--);
// 為了演示出公平鎖與非公平鎖的效果,這里加一個短暫睡眠,讓其他線程獲得一個等待時間
Thread.sleep(2);
}
} catch (Throwable t) {
t.printStackTrace();
} finally {
// unlock應該放在finally中,防止發生異常時來不及解鎖
lock.unlock();
}
}
public void readTicket() {
System.out.println("Thread: " + Thread.currentThread().getName() + ", tickets left: " + ticket);
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
final ReentrantLockTest2 instance = new ReentrantLockTest2();
// 啟動 THREADS_COUNT 個線程
Thread[] threads = new Thread[THREADS_COUNT];
for (int i = 0; i < THREADS_COUNT; i++) {
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
// 每個線程可以賣 TICKETS_PER_THREAD 張票
for (int j = 0; j < TICKETS_PER_THREAD; j++) {
instance.buyTicket();
}
}
});
threads[i].start();
}
// 等待所有累加線程都結束
while (Thread.activeCount() > 1) {
Thread.yield();
}
// 讀取剩余票數
instance.readTicket();
// 耗時
System.out.println("time: " + (System.currentTimeMillis() - start));
}
}
運行結果:
Thread: Thread-0, bought ticket-15
Thread: Thread-0, bought ticket-14
Thread: Thread-0, bought ticket-13
Thread: Thread-1, bought ticket-12
Thread: Thread-1, bought ticket-11
Thread: Thread-1, bought ticket-10
Thread: Thread-1, bought ticket-9
Thread: Thread-1, bought ticket-8
Thread: Thread-2, bought ticket-7
Thread: Thread-2, bought ticket-6
Thread: Thread-2, bought ticket-5
Thread: Thread-2, bought ticket-4
Thread: Thread-2, bought ticket-3
Thread: Thread-0, bought ticket-2
Thread: Thread-0, bought ticket-1
Thread: main, tickets left: 0
time: 36
可見,從 ticket-15 到 ticket-1 都被按順序售出了,只不過每張票由哪條線程售出則存在不確定性。上述運行結果是使用 非公平鎖 得到的,我們再通過修改代碼 lock = new ReentrantLock(true) ,看看公平鎖的運行效果:
Thread: Thread-0, bought ticket-15
Thread: Thread-1, bought ticket-14
Thread: Thread-2, bought ticket-13
Thread: Thread-0, bought ticket-12
Thread: Thread-1, bought ticket-11
Thread: Thread-2, bought ticket-10
Thread: Thread-0, bought ticket-9
Thread: Thread-1, bought ticket-8
Thread: Thread-2, bought ticket-7
Thread: Thread-0, bought ticket-6
Thread: Thread-1, bought ticket-5
Thread: Thread-2, bought ticket-4
Thread: Thread-0, bought ticket-3
Thread: Thread-1, bought ticket-2
Thread: Thread-2, bought ticket-1
Thread: main, tickets left: 0
time: 47
我們看到,在公平鎖環境下,不僅ticket安全性得到保證,就連線程獲得鎖的順序也得到了保證,以“Thread-0、1、2”的順序循環執行。這里的“公平性”體現在哪里呢?通俗點說,就是先排隊等待(也就是等待時間越長)的線程先得到鎖,顯然,這種”先到先得“的效果,用隊列”先進先出“的特性實現最為合適。
Java也確實是通過”等待隊列“來實現”公平鎖“的。所有等待鎖的線程都會被掛起并且進入等待隊列,當鎖被釋放后,系統只允許等待隊列的頭部線程被喚醒并獲得鎖。而”非公平鎖“其實同樣有這樣一個隊列,只不過當鎖被釋放后,系統并不會只從等待隊列中獲取頭部線程,而是如果發現此時正好有一個還沒進入等待隊列的線程想要獲取鎖(此時該線程還未被掛起)時,則直接將鎖給了它(公平性被打破),這條線程就可以直接執行,而不用進行狀態切換,于是就省去了切換的開銷,這也就是非公平鎖效率高于公平鎖的原因所在。
有了上述理解,我們就可以推斷
若在釋放鎖時,總是沒有新的線程來打擾,則每次都必定從等待隊列中取頭部線程喚醒,此時非公平鎖等于公平鎖。
對于非公平鎖來說,只要線程進入了等待隊列,隊列里面仍然是FIFO的原則,跟公平鎖的順序是一樣的。有人認為,”非公平鎖環境下,哪條線程獲得鎖完全是隨機的“,這種說法明顯是不對的,已經進入等待隊列中的那些線程就不是隨機獲得鎖的。
Condition條件
在Lock接口定義中,還定義了一個 newCondition() 方法,用于返回一個Condition。
Condition與Lock結合起來使用,可以達到Object監視器方法(wait/notify/notifyAll)與synchronized結合起來使用時同樣甚至更加強大的線程等待與喚醒效果。其中,Lock替代synchronized,Condition替代Object監視器方法。
在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll()。傳統的線程間通信方式,Condition都能實現,需要注意的是,Condition是綁定在Lock上的,必須通過Lock對象的 newCondition() 方法獲得。
Condition的強大之處,在于它可以針對同一個lock對象,創建多個不同的Condition條件,以處理復雜的線程等待與喚醒場景。典型的例子就是“生產者-消費者”問題。生產者與消費者共用同一個固定大小的緩沖區,當緩沖區滿了,生產者還想向其中添加數據時,就必須休眠,等待消費者取走一個或多個數據后再喚醒。同樣,當緩沖區空了,消費者還想從中取走數據時,也要休眠,等待生產者向其中添加一個或多個數據后再喚醒。可見,Condition可以指定哪條線程被喚醒,而notify/notifyAll則不行。
package com.example.weishj.mytester.concurrency.sync.synchronizedtest;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Condition測試
*
* 生產者-消費者問題
*/
public class ConditionTest {
private static final int REPOSITORY_SIZE = 3;
private static final int PRODUCT_COUNT = 10;
public static void main(String[] args) {
// 創建一個容量為REPOSITORY_SIZE的倉庫
final Repository repository = new Repository(REPOSITORY_SIZE);
Thread producer = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < PRODUCT_COUNT; i++) {
try {
repository.put(Integer.valueOf(i));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}) ;
Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < PRODUCT_COUNT; i++) {
try {
Object val = repository.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}) ;
producer.start();
consumer.start();
}
/**
* Repository 是一個定長集合,當集合為空時,take方法需要等待,直到有元素時才返回元素
* 當其中的元素數達到最大值時,put方法需要等待,直到元素被take之后才能繼續put
*/
static class Repository {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items;
int putIndex, takeIndex, count;
public Repository(int size) {
items = new Object[size];
}
public void put(Object x) throws InterruptedException {
try {
lock.lock();
while (count == items.length) {
System.out.println("Buffer full, please wait");
// 開始等待庫存不為滿
notFull.await();
}
// 生產一個產品
items[putIndex] = x;
// 增加當前庫存量
count++;
System.out.println("Produce: " + x);
if (++putIndex == items.length) {
putIndex = 0;
}
// 通知消費者線程庫存已經不為空了
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
try {
lock.lock();
while (count == 0) {
System.out.println("No element, please wait");
// 開始等待庫存不為空
notEmpty.await();
}
// 消費一個產品
Object x = items[takeIndex];
// 減少當前庫存量
count--;
System.out.println("Consume: " + x);
if (++takeIndex == items.length) {
takeIndex = 0;
}
// 通知生產者線程庫存已經不為滿了
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
}
運行結果:
Produce: 0
Produce: 1
Produce: 2
Buffer full, please wait
Consume: 0
Consume: 1
Produce: 3
Produce: 4
Buffer full, please wait
Consume: 2
Consume: 3
Consume: 4
No element, please wait
Produce: 5
Produce: 6
Produce: 7
Buffer full, please wait
Consume: 5
Consume: 6
Consume: 7
No element, please wait
Produce: 8
Produce: 9
Consume: 8
Consume: 9
ReadWriteLock讀寫鎖
ReadWriteLock也是一個接口,其優勢是允許”讀并發“,也就是”讀寫互斥,寫寫互斥,讀讀不互斥“。在多線程讀的場景下,能極大的提高運算效率,提升服務器吞吐量。其接口定義很簡單:
package java.util.concurrent.locks;
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
ReentrantReadWriteLock可重入讀寫鎖
ReentrantReadWriteLock是讀寫鎖的實現類。我們將售票程序做個簡單的改造:
package com.example.weishj.mytester.concurrency.sync.synchronizedtest;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 同步安全測試
*
* 演示ReentrantReadWriteLock實現同步,它的特點是"讀并發"、"寫互斥"、"讀寫互斥"
*/
public class ReentrantReadWriteLockTest1 {
private static final int THREADS_COUNT = 3; // 線程數
private static final int TICKETS_PER_THREAD = 4; // 每個線程分配到的票數
// 共享資源(臨界資源)
private int ticket = THREADS_COUNT * TICKETS_PER_THREAD; // 總票數
private static final ReadWriteLock lock;
static {
// 為了通過一個示例同時演示"讀并發"、"寫互斥"、"讀寫互斥"的效果,創建一個公平鎖
lock = new ReentrantReadWriteLock(false); // 此處也說明讀鎖與寫鎖之間同樣遵守公平性原則
}
public void buyTicket() {
try {
lock.writeLock().lock();
if (ticket > 0) {
System.out.println("Thread: " + Thread.currentThread().getName() + ", bought ticket-" + ticket--);
Thread.sleep(2);
}
} catch (Throwable t) {
t.printStackTrace();
} finally {
System.out.println("Thread: " + Thread.currentThread().getName() + ", unlocked write");
lock.writeLock().unlock();
}
}
public void readTicket() {
try {
lock.readLock().lock();
System.out.println("Thread: " + Thread.currentThread().getName() + ", tickets left: " + ticket);
Thread.sleep(5);
} catch (Throwable t) {
t.printStackTrace();
} finally {
System.out.println("Thread: " + Thread.currentThread().getName() + ", unlocked read");
lock.readLock().unlock();
}
}
public static void main(String[] args) {
final ReentrantReadWriteLockTest1 instance = new ReentrantReadWriteLockTest1();
// 啟動 THREADS_COUNT 個線程
Thread[] writeThreads = new Thread[THREADS_COUNT];
for (int i = 0; i < THREADS_COUNT; i++) {
writeThreads[i] = new Thread(new Runnable() {
@Override
public void run() {
// 每個線程可以賣 TICKETS_PER_THREAD 張票
for (int j = 0; j < TICKETS_PER_THREAD; j++) {
instance.buyTicket();
}
}
});
writeThreads[i].start();
}
// 讀取此時的剩余票數
Thread[] readThreads = new Thread[2];
for (int i = 0; i < 2; i++) {
readThreads[i] = new Thread(new Runnable() {
@Override
public void run() {
// 每個線程可以讀 2 次剩余票數
for (int j = 0; j < 2; j++) {
instance.readTicket();
}
}
});
readThreads[i].start();
}
}
}
運行結果:
Thread: Thread-0, bought ticket-12
Thread: Thread-0, unlocked write
Thread: Thread-0, bought ticket-11
Thread: Thread-0, unlocked write
Thread: Thread-0, bought ticket-10
Thread: Thread-0, unlocked write
Thread: Thread-0, bought ticket-9
Thread: Thread-0, unlocked write
Thread: Thread-1, bought ticket-8
Thread: Thread-1, unlocked write
Thread: Thread-1, bought ticket-7
Thread: Thread-1, unlocked write
Thread: Thread-1, bought ticket-6
Thread: Thread-1, unlocked write
Thread: Thread-1, bought ticket-5
Thread: Thread-1, unlocked write
Thread: Thread-2, bought ticket-4
Thread: Thread-2, unlocked write
Thread: Thread-2, bought ticket-3
Thread: Thread-2, unlocked write
Thread: Thread-2, bought ticket-2
Thread: Thread-2, unlocked write
Thread: Thread-2, bought ticket-1
Thread: Thread-2, unlocked write
Thread: Thread-3, tickets left: 0
Thread: Thread-4, tickets left: 0
Thread: Thread-3, unlocked read
Thread: Thread-3, tickets left: 0
Thread: Thread-4, unlocked read
Thread: Thread-4, tickets left: 0
Thread: Thread-3, unlocked read
Thread: Thread-4, unlocked read
上述結果是在”非公平鎖“的環境下得到的,無論嘗試運行多少次,2條讀線程都是被放在3條寫線程執行完畢后才開始執行,為了一次性驗證所有結論,我們再換”公平鎖“重新執行一次,結果如下:
Thread: Thread-0, bought ticket-12
Thread: Thread-0, unlocked write
Thread: Thread-1, bought ticket-11
Thread: Thread-1, unlocked write
Thread: Thread-2, bought ticket-10
Thread: Thread-2, unlocked write
Thread: Thread-3, tickets left: 9
Thread: Thread-4, tickets left: 9
Thread: Thread-4, unlocked read
Thread: Thread-3, unlocked read
Thread: Thread-0, bought ticket-9
Thread: Thread-0, unlocked write
Thread: Thread-1, bought ticket-8
Thread: Thread-1, unlocked write
Thread: Thread-2, bought ticket-7
Thread: Thread-2, unlocked write
Thread: Thread-4, tickets left: 6
Thread: Thread-3, tickets left: 6
Thread: Thread-3, unlocked read
Thread: Thread-4, unlocked read
Thread: Thread-0, bought ticket-6
Thread: Thread-0, unlocked write
Thread: Thread-1, bought ticket-5
Thread: Thread-1, unlocked write
Thread: Thread-2, bought ticket-4
Thread: Thread-2, unlocked write
Thread: Thread-0, bought ticket-3
Thread: Thread-0, unlocked write
Thread: Thread-1, bought ticket-2
Thread: Thread-1, unlocked write
Thread: Thread-2, bought ticket-1
Thread: Thread-2, unlocked write
這次讀線程就被穿插到寫線程中間了,從上述結果中可以看到:
當任意線程寫的時候,其他線程既不能讀也不能寫
Thread-3讀的時候,Thread-4同樣可以讀,但是不能有任何寫線程
3條寫線程永遠按照”0-1-2“的順序執行,他們遵守”公平性“原則
2條讀線程之間非互斥,所以也談不上什么”公平性”原則
3條寫線程”Thread-0、1、2“各獲得過一次鎖之后,必定輪到2條讀線程”Thread-3、4“獲得鎖,而不是如”非公平鎖“的結果那樣,讀線程總是等到寫線程全部執行結束后才開始執行,也就是說讀線程與寫線程之間遵守同一個”公平性“原則
使用場景分析
synchronized
不需要“中斷”與“公平鎖”的業務場景
較為簡單的“等待與喚醒”業務(與Object監視器方法結合使用)
ReentrantLock可重入鎖
需要“響應中斷”的業務場景:處于等待狀態的線程可以中斷
需要“公平鎖”的業務場景:線程有序獲得鎖,亦即“有序執行”
與Condition結合,可以滿足更為復雜的“等待與喚醒”業務(可以指定哪個線程被喚醒)
ReentrantReadWriteLock可重入讀寫鎖
允許“讀讀并發”的業務場景,可以大幅提高吞吐量
總結
synchronized實例方法
鎖定實例對象(this)
每個實例都有獨立的對象鎖,因此只有針對同一個實例,才具備互斥性
同一個實例中的多個synchronized實例方法之間,也是互斥的
synchronized靜態方法
鎖定類對象(class)
同步靜態方法在任意實例對象之間,也是互斥的
同個類的同步靜態方法和同步實例方法之間,不具備互斥性
synchronized代碼塊
鎖住指定的對象(可以是任意實例對象,類對象)
需要創建對象鎖時,建議使用 new byte[0] ,因為在所有對象中,它的創建是最經濟的
必須時刻明確對象鎖是誰,只有配合正確的使用方法,才能得到正確的同步效果
ReentrantLock可重入鎖
ReentrantLock是Lock接口的一種實現
需要手動加解鎖,操作復雜,但更加靈活
lock與unlock需要成對使用,且unlock應該放在 finally 中
可以響應中斷
可以實現“公平鎖”:先排隊等待(也就是等待時間越長)的線程先得到鎖
非公平鎖環境下,哪條線程獲得鎖并非是完全隨機的,已經進入等待隊列中的那些線程就仍然是根據FIFO原則獲得鎖的
非公平鎖效率高于公平鎖
ReentrantLock與Condition結合使用,類似synchronized與Object監視器方法結合使用
在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll()
Condition的強大之處,在于它可以針對同一個lock對象,創建多個不同的Condition條件,以處理復雜的線程等待與喚醒場景
Condition可以指定哪條線程被喚醒,而notify/notifyAll則不行
ReentrantReadWriteLock可重入讀寫鎖
ReentrantReadWriteLock是ReadWriteLock接口(讀寫鎖)的一個實現類,而ReadWriteLock內部則是由Lock實現的
ReentrantReadWriteLock具有ReentrantLock的一切特性,同時還具有自己的獨立特性:"讀讀并發"、"寫寫互斥"、"讀寫互斥"
ReentrantReadWriteLock可以有效提高并發,增加吞吐量
在“公平鎖”環境下,讀線程之間沒有”公平性“可言,而寫線程之間,以及讀線程與寫線程之間,則遵守同一個“公平性”原則
總結
以上是生活随笔為你收集整理的java线程同步的实现_Java并发编程(三) - 实战:线程同步的实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java图片预览上传_Java实现图片上
- 下一篇: TCP文件上传Java_java 基于T