聊聊高并发(六)实现几种自旋锁
這一篇我們通過兩種實現自旋鎖的方式來看一下不同的編程方式帶來的程序性能的變化。
?
先理解一下什么是自旋,所謂自旋就是線程在不滿足某種條件的情況下,一直循環做某個動作。所以對于自旋鎖來鎖,當線程在沒有獲取鎖的情況下,一直循環嘗試獲取鎖,直到真正獲取鎖。
?
在聊聊高并發(三)鎖的一些基本概念?我們提到鎖的本質就是等待,那么如何等待呢,有兩種方式
1. 線程阻塞
2. 線程自旋
?
阻塞的缺點顯而易見,線程一旦進入阻塞(Block),再被喚醒的代價比較高,性能較差。自旋的優點是線程還是Runnable的,只是在執行空代碼。當然一直自旋也會白白消耗計算資源,所以常見的做法是先自旋一段時間,還沒拿到鎖就進入阻塞。JVM在處理synchrized實現時就是采用了這種折中的方案,并提供了調節自旋的參數。
?
這篇說一下兩種最基本的自旋鎖實現,并提供了一種優化的鎖,后續會有更多的自旋鎖的實現。
?
首先是TASLock (Test And Set Lock),測試-設置鎖,它的特點是自旋時,每次嘗試獲取鎖時,采用了CAS操作,不斷的設置鎖標志位,當鎖標志位可用時,一個線程拿到鎖,其他線程繼續自旋。
缺點是CAS操作一直在修改共享變量的值,會引發緩存一致性流量風暴
?
?package com.test.lock;
// 鎖接口
public interface Lock {
?? ?public void lock();
?? ?
?? ?public void unlock();
}
package com.test.lock;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 測試-設置自旋鎖,使用AtomicBoolean原子變量保存狀態
* 每次都使用getAndSet原子操作來判斷鎖狀態并嘗試獲取鎖
* 缺點是getAndSet底層使用CAS來實現,一直在修改共享變量的值,會引發緩存一致性流量風暴
* **/
public class TASLock implements Lock{
private AtomicBoolean mutex = new AtomicBoolean(false);
@Override
public void lock() {
// getAndSet方法會設置mutex變量為true,并返回mutex之前的值
// 當mutex之前是false時才返回,表示獲取鎖
// getAndSet方法是原子操作,mutex原子變量的改動對所有線程可見
while(mutex.getAndSet(true)){
}
}
@Override
public void unlock() {
mutex.set(false);
}
public String toString(){
return "TASLock";
}
}
一種改進的算法是TTASLock(Test Test And Set Lock)測試-測試-設置鎖,特點是在自旋嘗試獲取鎖時,分為兩步,第一步通過讀操作來獲取鎖狀態,當鎖可獲取時,第二步再通過CAS操作來嘗試獲取鎖,減少了CAS的操作次數。并且第一步的讀操作是處理器直接讀取自身高速緩存,不會產生緩存一致性流量,不占用總線資源。
?
缺點是在鎖高爭用的情況下,線程很難一次就獲取鎖,CAS的操作會大大增加。
?
?package com.test.lock;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 測試-測試-設置自旋鎖,使用AtomicBoolean原子變量保存狀態
* 分為兩步來獲取鎖
* 1. 先采用讀變量自旋的方式嘗試獲取鎖
* 2. 當有可能獲取鎖時,再使用getAndSet原子操作來嘗試獲取鎖
* 優點是第一步使用讀變量的方式來獲取鎖,在處理器內部高速緩存操作,不會產生緩存一致性流量
* 缺點是當鎖爭用激烈的時候,第一步一直獲取不到鎖,getAndSet底層使用CAS來實現,一直在修改共享變量的值,會引發緩存一致性流量風暴
* **/
public class TTASLock implements Lock{
private AtomicBoolean mutex = new AtomicBoolean(false);
@Override
public void lock() {
while(true){
// 第一步使用讀操作,嘗試獲取鎖,當mutex為false時退出循環,表示可以獲取鎖
while(mutex.get()){}
// 第二部使用getAndSet方法來嘗試獲取鎖
if(!mutex.getAndSet(true)){
return;
}
}
}
@Override
public void unlock() {
mutex.set(false);
}
public String toString(){
return "TTASLock";
}
}
?
針對鎖高爭用的問題,可以采取回退算法,即當線程沒有拿到鎖時,就等待一段時間再去嘗試獲取鎖,這樣可以減少鎖的爭用,提高程序的性能。
?
?
?package com.test.lock;
import java.util.Random;
/**
* 回退算法,降低鎖爭用的幾率
* **/
public class Backoff {
private final int minDelay, maxDelay;
private int limit;
final Random random;
public Backoff(int min, int max){
this.minDelay = min;
this.maxDelay = max;
limit = minDelay;
random = new Random();
}
// 回退,線程等待一段時間
public void backoff() throws InterruptedException{
int delay = random.nextInt(limit);
limit = Math.min(maxDelay, 2 * limit);
Thread.sleep(delay);
}
}
package com.test.lock;
import java.util.concurrent.atomic.AtomicBoolean;
/**
?* 回退自旋鎖,在測試-測試-設置自旋鎖的基礎上增加了線程回退,降低鎖的爭用
?* 優點是在鎖高爭用的情況下減少了鎖的爭用,提高了執行的性能
?* 缺點是回退的時間難以控制,需要不斷測試才能找到合適的值,而且依賴底層硬件的性能,擴展性差
?* **/
public class BackoffLock implements Lock{
?? ?private final int MIN_DELAY, MAX_DELAY;
?? ?
?? ?public BackoffLock(int min, int max){
?? ??? ?MIN_DELAY = min;
?? ??? ?MAX_DELAY = max;
?? ?}
?? ?
?? ?private AtomicBoolean mutex = new AtomicBoolean(false);
?? ?
?? ?@Override
?? ?public void lock() {
?? ??? ?// 增加回退對象
?? ??? ?Backoff backoff = new Backoff(MIN_DELAY, MAX_DELAY);
?? ??? ?while(true){
?? ??? ??? ?// 第一步使用讀操作,嘗試獲取鎖,當mutex為false時退出循環,表示可以獲取鎖
?? ??? ??? ?while(mutex.get()){}
?? ??? ??? ?// 第二部使用getAndSet方法來嘗試獲取鎖
?? ??? ??? ?if(!mutex.getAndSet(true)){
?? ??? ??? ??? ?return;
?? ??? ??? ?}else{
?? ??? ??? ??? ?//回退
?? ??? ??? ??? ?try {
?? ??? ??? ??? ??? ?backoff.backoff();
?? ??? ??? ??? ?} catch (InterruptedException e) {
?? ??? ??? ??? ?}
?? ??? ??? ?}?? ?
?? ??? ??? ?
?? ??? ?}
?? ?}
?? ?@Override
?? ?public void unlock() {
?? ??? ?mutex.set(false);
?? ?}
?? ?public String toString(){
?? ??? ?return "TTASLock";
?? ?}
}
?
回退自旋鎖的問題是回退的時間難以控制,需要不斷測試才能找到合適的值,而且依賴底層硬件的性能,擴展性差。后面會有更好的自旋鎖實現算法。
?
?
下面我們測試一下TASLock和TTASLock的性能。
首先寫一個計時的類
?
?package com.test.lock;
public class TimeCost implements Lock{
private final Lock lock;
public TimeCost(Lock lock){
this.lock = lock;
}
@Override
public void lock() {
long start = System.nanoTime();
lock.lock();
long duration = System.nanoTime() - start;
System.out.println(lock.toString() + " time cost is " + duration + " ns");
}
@Override
public void unlock() {
lock.unlock();
}
}
然后采用多個線程來模擬對同一把鎖的爭用
?
?
?
?package com.test.lock;
public class Main {
private static TimeCost timeCost = new TimeCost(new TASLock());
//private static TimeCost timeCost = new TimeCost(new TTASLock());
public static void method(){
timeCost.lock();
//int a = 10;
timeCost.unlock();
}
public static void main(String[] args) {
for(int i = 0; i < 100; i ++){
Thread t = new Thread(new Runnable(){
@Override
public void run() {
method();
}
});
t.start();
}
}
}
測試機器的性能如下:
?
CPU: 4? Intel(R) Core(TM) i3-2120 CPU @ 3.30GHz
內存: 8G
?
測試結果:
50個線程情況下:
TASLock平均獲取鎖的時間: 339715 ns
?
TTASLock平均獲取鎖的時間:?67106.2 ns
?
?
100個線程情況下:
TASLock平均獲取鎖的時間: 1198413 ns
TTASLock平均獲取鎖的時間:?1273588 ns
?
可以看到TTASLock的性能比TASLock的性能更好
?
對TTASLock的一種改進是BackoffLock,它會在鎖高爭用的情況下對線程進行回退,減少競爭,減少緩存一致性流量。但是BackoffLock有三個主要的問題:
1. 還是有大量的緩存一致性流量,因為所有線程在同一個共享變量上旋轉,每一次成功的獲取鎖都會產生緩存一致性流量
2. 因為回退的存在,不能及時獲取鎖釋放的信息,存在一個時間差,導致獲取鎖的時間變長
3. 不能保證無饑餓,有的線程可能一直無法獲取鎖
?
這篇會實現2種基于隊列的鎖,來解決上面提到的三個問題。主要的思路是將線程組織成一個隊列,有4個優點:
1. 每個線程只需要檢查它的前驅線程的狀態,將自旋的變量從一個分散到多個,減少緩存一致性流量
2. 可以即使獲取鎖釋放的通知
3. 隊列提供了先來先服務的公平性
4. 無饑餓,隊列中的每個線程都能保證被執行到
?
隊列鎖分為兩類,一類是基于有界隊列,一類是基于無界隊列。
?
先看一下基于有界隊列的隊列鎖。 ArrayLock有3個特點:
1. 基于一個volatile數組來組織線程
2. 通過一個原子變量tail來表示對尾線程
3. 通過一個ThreadLocal變量給每個線程一個索引號,表示它位于隊列的哪個位置。
?
?package com.test.lock;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 有界隊列鎖,使用一個volatile數組來組織線程
* 缺點是得預先知道線程的規模n,所有線程獲取同一個鎖的次數不能超過n
* 假設L把鎖,那么鎖的空間復雜度為O(Ln)
* **/
public class ArrayLock implements Lock{
// 使用volatile數組來存放鎖標志, flags[i] = true表示可以獲得鎖
private volatile boolean[] flags;
// 指向新加入的節點的后一個位置
private AtomicInteger tail;
// 總容量
private final int capacity;
private ThreadLocal<Integer> mySlotIndex = new ThreadLocal<Integer>(){
protected Integer initialValue() {
return 0;
}
};
public ArrayLock(int capacity){
this.capacity = capacity;
flags = new boolean[capacity];
tail = new AtomicInteger(0);
// 默認第一個位置可獲得鎖
flags[0] = true;
}
@Override
public void lock() {
int slot = tail.getAndIncrement() % capacity;
mySlotIndex.set(slot);
// flags[slot] == true 表示獲得了鎖, volatile變量保證鎖釋放及時通知
while(!flags[slot]){
}
}
@Override
public void unlock() {
int slot = mySlotIndex.get();
flags[slot] = false;
flags[(slot + 1) % capacity] = true;
}
<pre name="code" class="java">
public String toString(){
?? ??? ? return "ArrayLock";
?? ? }
?}
?
?
我們可以看到有界隊列鎖的缺點是:
1. 它必須知道線程的規模數,對于同一把鎖如果線程獲取的次數超過了n會出現線程狀態被覆蓋的問題
2. 空間復雜度是O(Ln)
3. 對于共享的volatile數組來保存線程獲取鎖的狀態,仍然可能存在緩存一致性。我們知道CPU讀取一次內存時,會讀滿數據總線的位長,比如64位總線,一次讀取64位長度的數據。那么對于boolean類型的數組,boolean長度是1個字節,那么一次讀取能讀到8個boolean變量,而高速緩存的一個緩存塊的長度也是64位,也就是說一個緩存塊上可以保存8個boolean變量,所以如果一次CAS操作修改了一個變量導致一個緩存塊無效,它實際上可能導致8個變量失效。
解決辦法是把變量以8個長度為單位分散,比如flag[0] = thread1? flag[8] = thread2。這樣的問題是消耗的空間更大。
?
無界隊列鎖可以克服有界隊列鎖的幾個問題。
1. 它使用鏈表來代替數組,實現無界隊列
2. 使用兩個ThreadLocal變量表示指針,一個指向自己的節點,一個指向前一個節點
3. 使用一個原子引用變量指向隊尾
4. 空間復雜度降低,如果有L把鎖,n個線程,每個線程只獲取一把鎖,那么空間復雜度為O(L + n)
5. 對同一個鎖,一個線程可以多次獲取而不增加空間復雜度
6. 當線程結束后,GC會自動回收內存
?
?package com.test.lock;
import java.util.concurrent.atomic.AtomicReference;
/**
* 無界隊列鎖,使用一個鏈表來組織線程
* 假設L把鎖,n個線程,那么鎖的空間復雜度為O(L+n)
* **/
public class CLHLock implements Lock{
// 原子變量指向隊尾
private AtomicReference<QNode> tail;
// 兩個指針,一個指向自己的Node,一個指向前一個Node
ThreadLocal<QNode> myNode;
ThreadLocal<QNode> myPreNode;
public CLHLock(){
tail = new AtomicReference<QNode>(new QNode());
myNode = new ThreadLocal<QNode>(){
protected QNode initialValue(){
return new QNode();
}
};
myPreNode = new ThreadLocal<QNode>(){
protected QNode initialValue(){
return null;
}
};
}
@Override
public void lock() {
QNode node = myNode.get();
node.lock = true;
// CAS原子操作,保證原子性
QNode preNode = tail.getAndSet(node);
myPreNode.set(preNode);
// volatile變量,能保證鎖釋放及時通知
// 只對前一個節點的狀態自旋,減少緩存一致性流量
while(preNode.lock){
}
}
@Override
public void unlock() {
QNode node = myNode.get();
node.lock = false;
// 把myNode指向preNode,目的是保證同一個線程下次還能使用這個鎖,因為myNode原來指向的節點有它的后一個節點的preNode引用
// 防止這個線程下次lock時myNode.get獲得原來的節點
myNode.set(myPreNode.get());
}
public static class QNode {
volatile boolean lock;
}
public String toString(){
?? ??? ? return "CLHLock";
?? ? }
?}
?
下面我們從正確性和平均獲取鎖的時間上來測試這兩種鎖。
我們設計一個測試用例來驗證正確性: 使用50個線程對一個volatile變量++操作,由于volatile變量++操作不是原子的,在不加鎖的情況下,可能同時有多個線程同時對voaltile變量++, 最終的結果是無法預測的。然后使用這兩種鎖,先獲取鎖再volatile變量++,由于volatile變量會防止重排序,并能保證可見性,我們可以確定如果鎖是正確獲取的,也就是說同一時刻只有一個線程對volatile變量++,那么結果肯定是順序的1到50。
先看不加鎖的情況
?
?package com.test.lock;
public class Main {
//private static Lock lock = new ArrayLock(150);
private static Lock lock = new CLHLock();
//private static TimeCost timeCost = new TimeCost(new TTASLock());
private static volatile int value = 0;
public static void method(){
//lock.lock();
System.out.println("Value: " + ++value);
//lock.unlock();
}
public static void main(String[] args) {
for(int i = 0; i < 50; i ++){
Thread t = new Thread(new Runnable(){
@Override
public void run() {
method();
}
});
t.start();
}
}
}
運行結果: 我們可以看到確實是發生的線程同時對volatile變量++的操作,結果是無法預料的
?
?Value: 1
Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 13
Value: 12
Value: 14
Value: 15
Value: 16
Value: 17
Value: 18
Value: 19
Value: 20
Value: 21
Value: 22
Value: 23
Value: 24
Value: 25
Value: 26
Value: 27
Value: 28
Value: 29
Value: 30
Value: 31
Value: 32
Value: 33
Value: 34
Value: 35
Value: 36
Value: 37
Value: 38
Value: 37
Value: 39
Value: 40
Value: 41
Value: 42
Value: 43
Value: 44
Value: 45
Value: 46
Value: 47
Value: 48
Value: 50
使用有界隊列鎖:
?
?package com.test.lock;
public class Main {
private static Lock lock = new ArrayLock(100);
//private static Lock lock = new CLHLock();
//private static TimeCost timeCost = new TimeCost(new TTASLock());
private static volatile int value = 0;
public static void method(){
lock.lock();
System.out.println("Value: " + ++value);
lock.unlock();
}
public static void main(String[] args) {
for(int i = 0; i < 50; i ++){
Thread t = new Thread(new Runnable(){
@Override
public void run() {
method();
}
});
t.start();
}
}
}
運行結果是1到50的順序自增,說明鎖保證了同一時刻只有一個線程在對volatile變量++,是正確的
?
?Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 12
Value: 13
Value: 14
Value: 15
Value: 16
Value: 17
Value: 18
Value: 19
Value: 20
Value: 21
Value: 22
Value: 23
Value: 24
Value: 25
Value: 26
Value: 27
Value: 28
Value: 29
Value: 30
Value: 31
Value: 32
Value: 33
Value: 34
Value: 35
Value: 36
Value: 37
Value: 38
Value: 39
Value: 40
Value: 41
Value: 42
Value: 43
Value: 44
Value: 45
Value: 46
Value: 47
Value: 48
Value: 49
Value: 50
使用無界隊列鎖的情況也是正確的,由于篇幅原因這里就不帖代碼了。
?
再看平均獲取鎖的時間。
?
?package com.test.lock;
public class Main {
private static Lock lock = new TimeCost(new CLHLock());
//private static Lock lock = new CLHLock();
//private static TimeCost timeCost = new TimeCost(new TTASLock());
private static volatile int value = 0;
public static void method(){
lock.lock();
//System.out.println("Value: " + ++value);
lock.unlock();
}
public static void main(String[] args) {
for(int i = 0; i < 100; i ++){
Thread t = new Thread(new Runnable(){
@Override
public void run() {
method();
}
});
t.start();
}
}
}
在100個線程并發的情況下,
ArrayLock獲取鎖的平均時間是: 719550 ns
CLHLock獲取鎖的平均時間是:? 488577 ns
?
可以看到,隊列鎖在使用多個共享變量自旋的情況下,減少了一致性流量,比TASLock和TTASLock 提高了程序的性能。而CLHLock比ArrayLock有更好的擴展性和性能,是一種很好的自旋鎖實現。
?
CLHLock是無饑餓的,保證先來先服務公平性,只有少量的緩存一致性流量,在SMP系統結構中,是一種比較完善的鎖。但是在沒有cache的NUMA系統架構中,由于在前一個節點的lock狀態上自旋,NUMA架構中處理器訪問本地內存的速度高于通過網絡訪問其他節點的內存,所以CLHLock在NUMA架構上不是最優的自旋鎖。
?
這篇介紹一種適合在無cache的NUMA系統架構中比較完善的隊列鎖MCSLock。它的特點是:
1. 使用1個ThreadLocal指針來做鏈表,由QNode自身維護下一個節點的指針
2. 線程在自身節點自旋,而不是CLHLock那樣在前一個節點自旋
3.?在釋放鎖時需要判斷是否是唯一節點,需要做一次CAS操作,如果不是唯一節點,要稍微等待鏈表關系的建立
?
?
?package com.zc.lock;
import java.util.concurrent.atomic.AtomicReference;
/**
* 無界隊列鎖,使用一個鏈表來組織線程
* 假設L把鎖,n個線程,那么鎖的空間復雜度為O(L+n)
* **/
public class MCSLock implements Lock{
// 原子變量指向隊尾
private AtomicReference<QNode> tail;
// 兩個指針,一個指向自己的Node,一個指向前一個Node
ThreadLocal<QNode> myNode;
public MCSLock(){
tail = new AtomicReference<QNode>(null);
myNode = new ThreadLocal<QNode>(){
protected QNode initialValue(){
return new QNode();
}
};
}
@Override
public void lock() {
QNode node = myNode.get();
// CAS原子操作,保證原子性
QNode preNode = tail.getAndSet(node);
// 如果preNode等于空,證明是第一個獲取鎖的
if(preNode != null){
node.lock = true;
preNode.next = node;
// 對線程自己的node進行自旋,對無cache的NUMA系統架構來說,訪問本地內存速度優于其他節點的內存
while(node.lock){
}
}
}
@Override
public void unlock() {
QNode node = myNode.get();
if(node.next == null){
// CAS操作,判斷是否沒有新加入的節點
if(tail.compareAndSet(node, null)){
// 沒有新加入的節點,直接返回
return;
}
// 有新加入的節點,等待設置鏈關系
while(node.next == null){
}
}
// 通知下一個節點獲取鎖
node.next.lock = false;
// 設置next節點為空,為下次獲取鎖清理狀態
node.next = null;
}
public static class QNode {
volatile boolean lock;
volatile QNode next;
}
public String toString(){
return "MCSLock";
}
}
下面采用和上一篇同樣的測試用例來測試MCSLock的正確性
?
?
?package com.zc.lock;
public class Main {
//private static Lock lock = new TimeCost(new ArrayLock(150));
private static Lock lock = new MCSLock();
//private static TimeCost timeCost = new TimeCost(new TTASLock());
private static volatile int value = 0;
public static void method(){
lock.lock();
System.out.println("Value: " + ++value);
lock.unlock();
}
public static void main(String[] args) {
for(int i = 0; i < 50; i ++){
Thread t = new Thread(new Runnable(){
@Override
public void run() {
method();
}
});
t.start();
}
}
}
測試結果:順序地打印出volatile變量++的結果,證明同一時刻只有一個線程在做volatile++操作,證明加鎖成功。
?
?
?Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 12
Value: 13
Value: 14
Value: 15
Value: 16
Value: 17
Value: 18
Value: 19
Value: 20
Value: 21
Value: 22
Value: 23
Value: 24
Value: 25
Value: 26
Value: 27
Value: 28
Value: 29
Value: 30
Value: 31
Value: 32
Value: 33
Value: 34
Value: 35
Value: 36
Value: 37
Value: 38
Value: 39
Value: 40
Value: 41
Value: 42
Value: 43
Value: 44
Value: 45
Value: 46
Value: 47
Value: 48
Value: 49
Value: 50
?
java并發包中的Lock定義包含了時限鎖的接口:
?
?public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
?boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
tryLock就是實現鎖的接口,它支持限時操作,支持中斷操作。這兩個特性很重要,可以防止死鎖,也可以在死鎖的情況下取消鎖。
?
因為這兩個特性的需要,隊列鎖的節點需要支持“退出隊列”的機制,也就是說當發生超時或者線程中斷的情況下,線程能從隊列中出隊列,不影響其他節點繼續等待。之前實現的幾種隊列鎖都不支持退出機制,一旦發生隊列中的線程長時間阻塞,那么后續所有的線程都會被動阻塞。
?
我們看一種限時隊列鎖的實現,它有幾個要點:
1. 定義一個共享的AVAILABLE節點,當一個節點的preNode指向AVAILABLE時,表示這個節點獲得鎖
2. QNode節點維護一個preNode引用,這個引用只有當獲得鎖時,會指向AVAILABLE,或者超時了會指向它的前一個節點,其他等待鎖的時候都是Null,因為一旦一個節點超時了,需要讓它的后續節點指向它的前驅節點,所以只有超時的時候會給preNode設置值(指向AVAILABLE節點除外)。
3. 使用一個AtomicReference原子變量tail來形成一個虛擬的單向鏈表結構。tail的getAndSet操作會返回之前的節點的引用,相當于獲得了前驅節點。當獲得鎖后,前驅節點引用就釋放了,前驅節點就可以被GC回收
4. 支持中斷操作,Thread.isInterrupted()可以獲得線程中斷的信息,一旦獲取中斷信息,就拋出中斷異常。需要注意的時,線程中斷信息發出時,并不是要求線程馬上中斷,而是告知了線程要中斷的信息,程序自己控制中斷的地點。
5. 由于線程只有一個ThreadLocal的myNode變量指向自己的節點,所以獲取鎖時,使用了每次new一個新的Node,并設置給線程的方式,避免unlock時對node的操作影響后續節點的狀態,也可以使線程多次獲得鎖。這里可以考慮像CLHLock那樣,維護兩個ThreadLocal的引用,釋放鎖時把myNode的引用指向已經不使用的前驅節點,這樣避免無謂的new操作。
?
?
?package com.zc.lock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* 時限隊列鎖,支持tryLock超時操作
* QNode維護一個指針preNode指向前一個節點。當preNode == AVAILABLE表示已經釋放鎖。當preNode == null表示等待鎖
* tail維護一個虛擬鏈表,通過tail.getAndSet方法獲得前一個節點,并在前一個節點自旋,當釋放鎖時前一個節點的preNode == AVAIABLE,自動通知后一個節點獲取鎖
* 當一個節點超時或者被中斷,那么它的前驅節點不為空。后續節點看到它的前驅節點不為空,并且不是AVAILABLE時,知道這個節點退出了,就會跳過它
* 當節點獲得鎖,進入臨界區后,它的前驅節點可以被回收
* **/
public class TimeoutLock implements TryLock{
// 聲明為靜態變量,防止被臨時回收
private static final QNode AVAILABLE = new QNode();
// 原子變量指向隊尾
private AtomicReference<QNode> tail;
ThreadLocal<QNode> myNode;
public TimeoutLock(){
tail = new AtomicReference<QNode>(null);
myNode = new ThreadLocal<QNode>(){
protected QNode initialValue(){
return new QNode();
}
};
}
@Override
public void lock() {
// 和CLHLock不同,每次新建一個Node,并設置給線程,目的是支持同一個線程可以多次獲得鎖,而不影響鏈中其他節點的狀態
// CLHLock不需要每次新建Node是因為它使用了兩個指針,一個指向前驅節點。而前驅節點釋放后就可以回收了。
// CLHLock每次釋放鎖時設置myNode為失效的前驅節點,也是為了支持同一個線程可以多次獲取鎖而不影響其他節點
QNode node = new QNode();
myNode.set(node);
QNode pre = tail.getAndSet(node);
if(pre != null){
// 在前一個節點自旋,當前一個節點是AVAILABLE時,表示它獲得鎖
while(pre.preNode != AVAILABLE){
}
}
}
@Override
public void unlock() {
QNode node = myNode.get();
// CAS操作,如果為true,表示是唯一節點,直接釋放就行;否則把preNode指向AVAILABLE
if(!tail.compareAndSet(node, null)){
node.preNode = AVAILABLE;
}
}
@Override
//TimeUnit只支持毫秒
public boolean trylock(long time, TimeUnit unit) throws InterruptedException {
if(Thread.interrupted()){
throw new InterruptedException();
}
boolean isInterrupted = false;
long startTime = System.currentTimeMillis();
long duration = TimeUnit.MILLISECONDS.convert(time, unit);
// 注意:每次tryLock都要new新的Node,為了同一個線程可以多次獲得鎖。如果每個線程都使用同一個節點,會影響鏈中其他的節點
QNode node = new QNode();
myNode.set(node);
// 嘗試一次獲取鎖
QNode pre = tail.getAndSet(node);
// 第一個節點或者之前的節點都是已經釋放了鎖的節點, pre==AVAILABLE表示獲得了鎖
if(pre == null || pre == AVAILABLE){
return true;
}
// 在給定時間內對preNode自旋
while((System.currentTimeMillis() - startTime < duration) && !isInterrupted){
QNode predPreNode = pre.preNode;
// 表示前一個節點已經釋放了鎖,設置了preNode域,否則preNode域為空
if(predPreNode == AVAILABLE){
return true;
}
// 當prePreNode != null時,只有兩種情況,就是它超時了,或者被中斷了。
// 跳過prePreNode不為空的節點,繼續自旋它的下一個節點
else if(predPreNode != null){
pre = predPreNode;
}
if(Thread.interrupted()){
isInterrupted = true;
}
}
// 超時或者interrupted,都要設置node的前驅節點不為空
node.preNode = pre;
if(isInterrupted){
throw new InterruptedException();
}
return false;
}
public static class QNode {
volatile QNode preNode;
}
public String toString(){
return "TimeoutLock";
}
}
TimeoutLock具備所有CLHLock的特性,比如無饑餓,先來先服務的公平性,在多個共享變量上自旋,從而控制合理的緩存一致性流量等等,并且支持了限時操作和中斷操作。
使用限時鎖時有固定的模板,防止鎖被錯誤使用。
?
?
?Lock lock = ...;
if (lock.tryLock()) {
try {
// manipulate protected state
} finally {
lock.unlock();
}
} else {
// perform alternative actions
}
?
?
這篇說說限時有界隊列鎖,它采用了有界隊列,并且和ArrayLock不同,它不限制線程的個數。它的特點主要有
1. 采用有界隊列,減小了空間復雜度,L把鎖的空間復雜度在最壞的情況下(有界隊列長度為1)是O(L)
2.?非公平,不保證先來先服務,這也是一個很常見的需求
3. 因為是有界隊列,所以在高并發下存在高爭用,需要結合回退鎖來降低爭用
?
它的實現思路是:
1. 采用了一個有界的等待隊列,等待隊列的每個節點都有多種狀態,每個節點是可復用的
2. 采用了一個工作隊列,Tail指針指向工作隊列的隊尾節點。獲取和是否鎖的操作是在工作隊列中的節點之間進行
3. 由于是限時隊列,并支持中斷,所以隊列中的節點都是可以退出隊列的
4. 算法分為三步,第一步是線程從有界的等待隊列中獲得一個節點,并設置為WAITING,如果沒有獲得,就自旋
??? 第二步是把這個節點加入工作隊列,并獲得前一個節點的指針
??? 第三步是在前一個節點的狀態上自旋,直到獲得鎖,并把前一個節點RELEASED狀態改為FREE
?
節點有4種狀態:
1. FREE:? 表示節點可以被獲得。當前一個節點釋放鎖,并設置狀態為RELEASED的時候,后一個節點需要把前一個節點設置為FREE。當節點在沒有進入工作隊列時超時,也被設置為FREE.
2. RELEASED:節點釋放鎖時設置為RELEASED,需要后續節點把它設置為FREE。如果是工作隊列的最后一個節點,那么RELEASED狀態的節點在第一步時可被獲得
3. WAITING:表示獲得了鎖或在工作隊列中等待鎖。是在第一步中被設置的,第一步的結果就是獲得一個狀態為WAITING的節點
4. ABORTED:工作隊列中的節點超時或者中斷的節點被設置為ABORTED。 隊尾的ABORTED節點可以被第一步獲得,隊中的ABORTED節點不能被第一步獲取,只能把它的preNode指針指向它的前一個節點,表示它自己不能被獲取了
?
理解節點這4種狀態的轉變是理解這個設計的關鍵。這個設計比較復雜,從篇幅考慮,這篇只介紹Lock和UnLock操作,下一篇說tryLock限時操作
1. 創建枚舉類型State來表示狀態
2. 創建QNode表示節點,使用一個AtomicReference原子變量指向它的State,以便于支持CAS操作。節點維護一個PreNode引用,只有節點被Aborted的時候才設置這個引用的值,表示跳過這個節點
3. 一個有界的QNode隊列,使用數組表示
4. MIN_BACKOFF和MAX_BACKOFF支持回退操作,單位是毫秒。這兩個值依賴于硬件性能,需要通過不斷測試來獲取最優值
5. 一個Random隨機數,來產生隨即的數組下標,非公平性需要
6.?一個AtomicStampedReference類型的原子變量作為隊尾指針tail。AtomicStampedReference采用了版本號來避免CAS操作的ABA問題。這很重要,因為有界等待隊列的節點會多次進出工作隊列,所以可能發生同一個節點被前一個線程準備CAS操作時,已經被后幾個線程進出了工作隊列,導致第一個線程拿到的QNode的狀態不正確。
7. lock實現分為三步,上文已經說過了
8. unlock操作就是兩步,第一修改狀態通知其他線程獲取鎖。第二是設置自己的節點引用,以便下次可再次獲得鎖而不影響其他線程的狀態。這里是把線程指向的節點狀態設置為RELEASED,同時設置線程的節點引用為空,這樣其他線程可以繼續使用這個節點。
?
?
?package com.zc.lock;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* 限時有界隊列鎖,并且直接不限數量的線程
* 由于是有界的隊列,所以爭用激烈,可以復合回退鎖的概念,減少高爭用
* 分為三步:
* 第一步是取得一個State為FREE的節點,設置為WAITING
* 第二步是把這個節點加入隊列,獲取前一個節點
* 第三步是在前一個節點上自旋
*
* 優點是L個鎖的空間復雜度是O(L),而限時無界隊列鎖的空間復雜度為O(Ln)
* **/
public class CompositeLock implements TryLock{
enum State {FREE, WAITING, RELEASED, ABORTED}
class QNode{
AtomicReference<State> state = new AtomicReference<CompositeLock.State>(State.FREE);
volatile QNode preNode;
}
private final int SIZE = 10;
private final int MIN_BACKOFF = 1;
private final int MAX_BACKOFF = 10;
private Random random = new Random();
// 有界的QNode數組,表示隊列總共可以使用的節點數
private QNode[] waitings = new QNode[10];
// 指向隊尾節點,使用AtomicStampedReference帶版本號的原子引用變量,可以防止ABA問題,因為這個算法實現需要對同一個Node多次進出隊列
private AtomicStampedReference<QNode> tail = new AtomicStampedReference<CompositeLock.QNode>(null, 0);
// 每個線程維護一個QNode引用
private ThreadLocal<QNode> myNode = new ThreadLocal<CompositeLock.QNode>(){
public QNode initialValue(){
return null;
}
};
public CompositeLock(){
for(int i = 0; i < SIZE; i ++){
waitings[i] = new QNode();
}
}
@Override
public void lock() {
Backoff backoff = new Backoff(MIN_BACKOFF, MAX_BACKOFF);
QNode node = waitings[random.nextInt(SIZE)];
// 第一步: 先獲得數組里的一個Node,并把它的狀態設置為WAITING,否則就自旋
GETNODE:
while(true){
while(node.state.get() != State.FREE){
// 因為釋放鎖時只是設置了State為RELEASED,由后繼的線程來設置RELEASED為FREE
// 如果該節點已經是隊尾節點了并且是RELEASED,那么可以直接可以被使用
// 獲取當前原子引用變量的版本號
int[] currentStamp = new int[1];
QNode tailNode = tail.get(currentStamp);
if(tailNode == node && tailNode.state.get() == State.RELEASED){
if(tail.compareAndSet(tailNode, null, currentStamp[0], currentStamp[0] + 1)){
node.state.set(State.WAITING);
break GETNODE;
}
}
}
if(node.state.compareAndSet(State.FREE, State.WAITING)){
break;
}
try {
backoff.backoff();
} catch (InterruptedException e) {
throw new RuntimeException("Thread interrupted, stop to get the lock");
}
}
// 第二步加入隊列
int[] currentStamp = new int[1];
QNode preTailNode = null;
do{
preTailNode = tail.get(currentStamp);
}
// 如果沒加入隊列,就一直自旋
while(!tail.compareAndSet(preTailNode, node, currentStamp[0], currentStamp[0] + 1));
// 第三步在前一個節點自旋,如果前一個節點為null,證明是第一個加入隊列的節點
if(preTailNode != null){
// 在前一個節點的狀態自旋
while(preTailNode.state.get() != State.RELEASED){}
// 設置前一個節點的狀態為FREE,可以被其他線程使用
preTailNode.state.set(State.FREE);
}
// 將線程的myNode指向獲得鎖的node
myNode.set(node);
return;
}
@Override
public void unlock() {
QNode node = myNode.get();
node.state.set(State.RELEASED);
myNode.set(null);
}
@Override
public boolean trylock(long time, TimeUnit unit)
throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
}
采用我們之前的驗證鎖正確性的測試用例來測試lock, unlock操作。
?
?package com.zc.lock;
public class Main {
//private static Lock lock = new TimeCost(new ArrayLock(150));
private static Lock lock = new CompositeLock();
//private static TimeCost timeCost = new TimeCost(new TTASLock());
private static volatile int value = 0;
public static void method(){
lock.lock();
System.out.println("Value: " + ++value);
lock.unlock();
}
public static void main(String[] args) {
for(int i = 0; i < 50; i ++){
Thread t = new Thread(new Runnable(){
@Override
public void run() {
method();
}
});
t.start();
}
}
}
結果是順序打印的,證明鎖是正確的,每次只有一個線程獲得了鎖
?
?Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 12
Value: 13
Value: 14
Value: 15
Value: 16
Value: 17
Value: 18
Value: 19
Value: 20
Value: 21
Value: 22
Value: 23
Value: 24
Value: 25
Value: 26
Value: 27
Value: 28
Value: 29
Value: 30
Value: 31
Value: 32
Value: 33
Value: 34
Value: 35
Value: 36
Value: 37
Value: 38
Value: 39
Value: 40
Value: 41
Value: 42
Value: 43
Value: 44
Value: 45
Value: 46
Value: 47
Value: 48
Value: 49
?
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的聊聊高并发(六)实现几种自旋锁的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 聊聊高并发(五)理解缓存一致性协议以及对
- 下一篇: 聊聊高并发(十七)解析java.util