分布式锁—3.Redisson的公平锁
大綱
1.Redisson公平鎖RedissonFairLock概述
2.公平鎖源碼之加鎖和排隊(duì)
3.公平鎖源碼之可重入加鎖
4.公平鎖源碼之新舊版本對(duì)比
5.公平鎖源碼之隊(duì)列重排
6.公平鎖源碼之釋放鎖
7.公平鎖源碼之按順序依次加鎖
1.Redisson公平鎖RedissonFairLock概述
(1)非公平和公平的可重入鎖
(2)Redisson公平鎖的簡(jiǎn)單使用
(3)Redisson公平鎖的初始化
(1)非公平和公平的可重入鎖
一.非公平可重入鎖
鎖被釋放后,排隊(duì)獲取鎖的線程會(huì)重新無序獲取鎖,沒有任何順序性可言。
二.公平可重入鎖
鎖被釋放后,排隊(duì)獲取鎖的線程會(huì)按照請(qǐng)求獲取鎖時(shí)候的順序去獲取鎖。公平鎖可以保證線程獲取鎖的順序,與其請(qǐng)求獲取鎖的順序是一樣的。也就是誰先申請(qǐng)獲取到這把鎖,誰就可以先獲取到這把鎖。公平可重入鎖會(huì)把各個(gè)線程的加鎖請(qǐng)求進(jìn)行排隊(duì)處理,保證先申請(qǐng)獲取鎖的線程,可以優(yōu)先獲取鎖,從而實(shí)現(xiàn)所謂的公平性。
三.可重入的非公平鎖和公平鎖不同點(diǎn)
可重入的非公平鎖和公平鎖,在整體的技術(shù)實(shí)現(xiàn)框架上都是一樣的。唯一的不同點(diǎn)就是加鎖和解鎖的邏輯不一樣。非公平鎖的加鎖邏輯,比較簡(jiǎn)單。公平鎖的加鎖邏輯,要加入排隊(duì)機(jī)制,保證各個(gè)線程排隊(duì)能按順序獲取鎖。
(2)Redisson公平鎖的簡(jiǎn)單使用
Redisson的可重入鎖RedissonLock指的是非公平可重入鎖,Redisson的公平鎖RedissonFairLock指的是公平可重入鎖。
Redisson的公平可重入鎖實(shí)現(xiàn)了java.util.concurrent.locks.Lock接口,保證了當(dāng)多個(gè)線程同時(shí)請(qǐng)求加鎖時(shí),優(yōu)先分配給先發(fā)出請(qǐng)求的線程。所有請(qǐng)求線程會(huì)在一個(gè)隊(duì)列中排隊(duì),當(dāng)某個(gè)線程出現(xiàn)宕機(jī)時(shí),Redisson會(huì)等待5秒之后才會(huì)繼續(xù)分配下一個(gè)線程。
RedissonFairLock是RedissonLock的子類。RedissonFairLock的鎖實(shí)現(xiàn)框架,和RedissonLock基本一樣。而在獲取鎖和釋放鎖的lua腳本中,RedissonFairLock的邏輯才有所區(qū)別。
//1.最常見的使用方法
RedissonClient redisson = Redisson.create(config);
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lock();
//2.10秒鐘以后自動(dòng)解鎖,無需調(diào)用unlock方法手動(dòng)解鎖
fairLock.lock(10, TimeUnit.SECONDS);
//3.嘗試加鎖,最多等待100秒,上鎖以后10秒自動(dòng)解鎖
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
fairLock.unlock();
//4.Redisson為公平的可重入鎖提供了異步執(zhí)行的相關(guān)方法
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lockAsync();
fairLock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);
(3)Redisson公平鎖的初始化
public class RedissonDemo {
public static void main(String[] args) throws Exception {
...
//創(chuàng)建RedissonClient實(shí)例
RedissonClient redisson = Redisson.create(config);
//獲取公平的可重入鎖
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lock();//加鎖
fairLock.unlock();//釋放鎖
}
}
public class Redisson implements RedissonClient {
//Redis的連接管理器,封裝了一個(gè)Config實(shí)例
protected final ConnectionManager connectionManager;
//Redis的命令執(zhí)行器,封裝了一個(gè)ConnectionManager實(shí)例
protected final CommandAsyncExecutor commandExecutor;
...
protected Redisson(Config config) {
this.config = config;
Config configCopy = new Config(config);
//初始化Redis的連接管理器
connectionManager = ConfigSupport.createConnectionManager(configCopy);
...
//初始化Redis的命令執(zhí)行器
commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
...
}
public RLock getFairLock(String name) {
return new RedissonFairLock(commandExecutor, name);
}
...
}
public class RedissonFairLock extends RedissonLock implements RLock {
private final long threadWaitTime;
private final CommandAsyncExecutor commandExecutor;
...
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
this(commandExecutor, name, 60000*5);
}
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.threadWaitTime = threadWaitTime;
...
}
...
}
public class RedissonLock extends RedissonBaseLock {
protected long internalLockLeaseTime;
final CommandAsyncExecutor commandExecutor;
...
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
//與WatchDog有關(guān)的internalLockLeaseTime
//通過命令執(zhí)行器CommandExecutor可以獲取連接管理器ConnectionManager
//通過連接管理器ConnectionManager可以獲取Redis的配置信息類Config
//通過Redis的配置信息類Config可以獲取lockWatchdogTimeout超時(shí)時(shí)間
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
...
}
...
}
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
...
protected long internalLockLeaseTime;
final String id;
final String entryName;
final CommandAsyncExecutor commandExecutor;
public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();//獲取UUID
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
}
...
}
abstract class RedissonExpirable extends RedissonObject implements RExpirable {
RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
...
}
public abstract class RedissonObject implements RObject {
protected final CommandAsyncExecutor commandExecutor;
protected String name;
protected final Codec codec;
public RedissonObject(CommandAsyncExecutor commandExecutor, String name) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}
public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
this.codec = codec;
this.commandExecutor = commandExecutor;
if (name == null) {
throw new NullPointerException("name can't be null");
}
setName(name);
}
...
}
public class ConfigSupport {
...
//創(chuàng)建Redis的連接管理器
public static ConnectionManager createConnectionManager(Config configCopy) {
//生成UUID
UUID id = UUID.randomUUID();
...
if (configCopy.getClusterServersConfig() != null) {
validate(configCopy.getClusterServersConfig());
//返回ClusterConnectionManager實(shí)例
return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
}
...
}
...
}
public class ClusterConnectionManager extends MasterSlaveConnectionManager {
public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
super(config, id);
...
}
...
}
public class MasterSlaveConnectionManager implements ConnectionManager {
protected final String id;//初始化時(shí)為UUID
private final Config cfg;
protected Codec codec;
...
protected MasterSlaveConnectionManager(Config cfg, UUID id) {
this.id = id.toString();//傳入的是UUID
...
this.cfg = cfg;
this.codec = cfg.getCodec();
...
}
public String getId() {
return id;
}
public Codec getCodec() {
return codec;
}
...
}
2.公平鎖源碼之加鎖和排隊(duì)
(1)加鎖時(shí)的執(zhí)行流程
(2)獲取公平鎖的lua腳本相關(guān)參數(shù)說明
(3)lua腳本步驟一:進(jìn)入while循環(huán)移除隊(duì)列和有序集合中等待超時(shí)的線程
(4)lua腳本步驟二:判斷當(dāng)前線程能否獲取鎖
(5)lua腳本步驟三:執(zhí)行獲取鎖的操作
(6)lua腳本步驟四:判斷鎖是否已經(jīng)被當(dāng)前線程持有(可重入鎖)
(7)lua腳本步驟五:判斷當(dāng)前獲取鎖失敗的線程是否已經(jīng)在隊(duì)列中排隊(duì)
(8)lua腳本步驟六:對(duì)獲取鎖失敗的線程進(jìn)行排隊(duì)
(9)獲取鎖失敗的第一個(gè)線程執(zhí)行l(wèi)ua腳本的流程
(10)獲取鎖失敗的第二個(gè)線程執(zhí)行l(wèi)ua腳本的流程
(1)加鎖時(shí)的執(zhí)行流程
使用Redisson的公平鎖RedissonFairLock進(jìn)行加鎖時(shí):首先調(diào)用的是RedissonLock的lock()方法,然后會(huì)調(diào)用RedissonLock的tryAcquire()方法,接著會(huì)調(diào)用RedissonLock的tryAcquireAsync()方法。
在RedissonLock的tryAcquireAsync()方法中,會(huì)調(diào)用一個(gè)可以被RedissonLock子類重載的tryLockInnerAsync()方法。對(duì)于非公平鎖,執(zhí)行到這會(huì)調(diào)用RedissonLock的tryLockInnerAsync()方法。對(duì)于公平鎖,執(zhí)行到這會(huì)調(diào)用RedissonFairLock的tryLockInnerAsync()方法。
在RedissonFairLock的tryLockInnerAsync()方法中,便執(zhí)行具體的lua腳本。
public class RedissonDemo {
public static void main(String[] args) throws Exception {
...
//創(chuàng)建RedissonClient實(shí)例
RedissonClient redisson = Redisson.create(config);
//獲取公平的可重入鎖
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lock();//加鎖
fairLock.unlock();//釋放鎖
}
}
public class RedissonLock extends RedissonBaseLock {
...
//不帶參數(shù)的加鎖
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
//帶參數(shù)的加鎖
public void lock(long leaseTime, TimeUnit unit) {
try {
lock(leaseTime, unit, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
//加鎖成功
if (ttl == null) {
return;
}
//加鎖失敗
...
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//非公平鎖,接下來調(diào)用的是RedissonLock.tryLockInnerAsync()方法
//公平鎖,接下來調(diào)用的是RedissonFairLock.tryLockInnerAsync()方法
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
//對(duì)RFuture<Long>類型的ttlRemainingFuture添加回調(diào)監(jiān)聽
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
//tryLockInnerAsync()里的加鎖lua腳本異步執(zhí)行完畢,會(huì)回調(diào)如下方法邏輯:
//加鎖成功
if (ttlRemaining == null) {
if (leaseTime != -1) {
//如果傳入的leaseTime不是-1,也就是指定鎖的過期時(shí)間,那么就不創(chuàng)建定時(shí)調(diào)度任務(wù)
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
//創(chuàng)建定時(shí)調(diào)度任務(wù)
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
...
}
public class RedissonFairLock extends RedissonLock implements RLock {
private final long threadWaitTime;//線程可以等待鎖的時(shí)間
private final CommandAsyncExecutor commandExecutor;
private final String threadsQueueName;
private final String timeoutSetName;
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
this(commandExecutor, name, 60000*5);//傳入60秒*5=5分鐘
}
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.threadWaitTime = threadWaitTime;
threadsQueueName = prefixName("redisson_lock_queue", name);
timeoutSetName = prefixName("redisson_lock_timeout", name);
}
...
@Override
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
long wait = threadWaitTime;
if (waitTime != -1) {
//將傳入的指定的獲取鎖等待時(shí)間賦值給wait變量
wait = unit.toMillis(waitTime);
}
...
if (command == RedisCommands.EVAL_LONG) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
//步驟一:remove stale threads,移除等待超時(shí)的線程
"while true do " +
//獲取隊(duì)列中的第一個(gè)元素
//KEYS[2]是一個(gè)用來對(duì)線程排隊(duì)的隊(duì)列的名字
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
//獲取隊(duì)列中第一個(gè)元素對(duì)應(yīng)的分?jǐn)?shù),也就是排第一的線程的過期時(shí)間
//KEYS[3]是一個(gè)用來對(duì)線程排序的有序集合的名字
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
//如果排第一的線程的過期時(shí)間小于當(dāng)前時(shí)間,說明該線程等待超時(shí)了都還沒獲取到鎖,所以要移除
//ARGV[4]是當(dāng)前時(shí)間
"if timeout <= tonumber(ARGV[4]) then " +
//remove the item from the queue and timeout set NOTE we do not alter any other timeout
//從有序集合 + 隊(duì)列中移除這個(gè)線程
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
//check if the lock can be acquired now
//步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進(jìn)行嘗試獲取鎖
//情況一:鎖不存在 + 隊(duì)列也不存在;KEYS[1]是鎖的名字;KEYS[2]是對(duì)線程排隊(duì)的隊(duì)列;
//情況二:鎖不存在 + 隊(duì)列存在 + 隊(duì)列的第一個(gè)元素就是當(dāng)前線程;ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
//步驟三:當(dāng)前線程執(zhí)行獲取鎖的操作
//remove this thread from the queue and timeout set
//彈出隊(duì)列的第一個(gè)元素 + 從有序集合中刪除UUID:ThreadID對(duì)應(yīng)的元素
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" +
//decrease timeouts for all waiting in the queue
//遞減有序集合中每個(gè)線程的分?jǐn)?shù),也就是遞減每個(gè)線程獲取鎖時(shí)的已經(jīng)等待時(shí)間
//zrange返回有序集合KEYS[3]中指定區(qū)間內(nèi)(0,-1)的成員,也就是全部成員
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
//對(duì)有序集合KEYS[3]的成員keys[i]的score減去:tonumber(ARGV[3])
//ARGV[3]就是線程獲取鎖時(shí)可以等待的時(shí)間,默認(rèn)是5分鐘
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +
//acquire the lock and set the TTL for the lease
//hset設(shè)置Hash值進(jìn)行加鎖操作 + pexpire設(shè)置鎖key的過期時(shí)間 + 最后返回nil表示加鎖成功
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
//check if the lock is already held, and this is a re-entry(可重入鎖)
//步驟四:判斷鎖是否已經(jīng)被當(dāng)前線程持有,KEYS[1]是鎖的名字,ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2],1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
//the lock cannot be acquired, check if the thread is already in the queue
//步驟五:判斷當(dāng)前獲取鎖失敗的線程是否已經(jīng)在隊(duì)列中排隊(duì)
//KEYS[3]是對(duì)線程排序的有序集合,ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
//the real timeout is the timeout of the prior thread in the queue,
//but this is approximately correct, and avoids having to traverse the queue
//如果當(dāng)前獲取鎖失敗的線程已經(jīng)在隊(duì)列中排隊(duì)
//那么就返回該線程等待獲取鎖時(shí),還剩多少時(shí)間就超時(shí)了,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
//ARGV[3]是當(dāng)前線程獲取鎖時(shí)可以等待的時(shí)間,ARGV[4]是當(dāng)前時(shí)間
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +
//add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of
//the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the threadWaitTime
//步驟六:對(duì)獲取鎖失敗的線程進(jìn)行排隊(duì)處理
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
//如果在隊(duì)列中排隊(duì)的最后一個(gè)元素不是當(dāng)前線程
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
//lastThreadId是在隊(duì)列中排最后的線程,ARGV[2]是當(dāng)前線程的UUID+線程ID,ARGV[4]是當(dāng)前時(shí)間
//因?yàn)閾碛凶畲筮^期時(shí)間的線程在隊(duì)列中是排最后的
//所以可通過隊(duì)列中的最后一個(gè)元素的過期時(shí)間,計(jì)算當(dāng)前線程的過期時(shí)間
//從而保證新加入隊(duì)列和有序集合的線程的過期時(shí)間是最大的
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,當(dāng)前隊(duì)列中排最后的線程就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
//這樣后一個(gè)加入隊(duì)列的線程,會(huì)阻塞等待前一個(gè)加入隊(duì)列的線程的過期時(shí)間
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,鎖就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
//計(jì)算當(dāng)前線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//把當(dāng)前線程作為一個(gè)元素插入有序集合,并設(shè)置元素分?jǐn)?shù)為該線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
//然后再把當(dāng)前線程作為一個(gè)元素插入隊(duì)列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
Arrays.asList(getRawName(), threadsQueueName, timeoutSetName),
unit.toMillis(leaseTime),
getLockName(threadId),
wait,
currentTime
);
}
...
}
...
}
(2)獲取公平鎖的lua腳本相關(guān)參數(shù)說明
KEYS[1]是getRawName(),它是一個(gè)Hash數(shù)據(jù)結(jié)構(gòu)的key,也就是鎖的名字,比如"myLock"。
KEYS[2]是threadsQueueName,它是一個(gè)用來對(duì)線程排隊(duì)的隊(duì)列的名字,多個(gè)客戶端線程申請(qǐng)獲取鎖時(shí),會(huì)到這個(gè)隊(duì)列里進(jìn)行排隊(duì)。比如"redisson_lock_queue:{myLock}"。
KEYS[3]是timeoutSetName,它是一個(gè)用來對(duì)線程排序的有序集合的名字,這個(gè)有序集合可以自動(dòng)按照每個(gè)數(shù)據(jù)指定的分?jǐn)?shù)進(jìn)行排序。比如"redisson_lock_timeout:{myLock}"。
ARGV[1]是leaseTime,代表鎖的過期時(shí)間。如果leaseTime沒有指定,默認(rèn)就是internalLockLeaseTime = 30秒。
ARGV[2]是getLockName(threadId),代表客戶端UUID + 線程ID。
ARGV[3]是threadWaitTime,代表線程可以等待的時(shí)間(默認(rèn)5分鐘)。
ARGV[4]是currentTime,代表當(dāng)前時(shí)間。
(3)lua腳本步驟一:進(jìn)入while循環(huán)移除隊(duì)列和有序集合中等待超時(shí)的線程
while循環(huán)中首先執(zhí)行命令:"lindex redisson_lock_queue:{myLock} 0",也就是獲取"redisson_lock_queue:{myLock}"這個(gè)隊(duì)列中的第一個(gè)元素。一開始該隊(duì)列是空的,所以什么都獲取不到,firstThreadId2為false。此時(shí)就會(huì)break掉,退出while循環(huán)。
如果獲取到隊(duì)列中的第一個(gè)元素,那么就會(huì)執(zhí)行zscore命令:從有序集合中獲取該元素對(duì)應(yīng)的分?jǐn)?shù),也就是該元素對(duì)應(yīng)線程的過期時(shí)間。如果過期時(shí)間比當(dāng)前時(shí)間小,那么就要從隊(duì)列和有序集合中移除該元素。否則,也會(huì)break掉,退出while循環(huán)。
//步驟一:remove stale threads,移除等待超時(shí)的線程
"while true do " +
//獲取隊(duì)列中的第一個(gè)元素
//KEYS[2]是一個(gè)用來對(duì)線程排隊(duì)的隊(duì)列的名字
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
//獲取隊(duì)列中第一個(gè)元素對(duì)應(yīng)的分?jǐn)?shù),也就是排第一的線程的過期時(shí)間
//KEYS[3]是一個(gè)用來對(duì)線程排序的有序集合的名字
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
//如果排第一的線程的過期時(shí)間小于當(dāng)前時(shí)間,說明該線程等待鎖超時(shí)了都還沒獲取到鎖,所以要移除
//ARGV[4]是當(dāng)前時(shí)間
"if timeout <= tonumber(ARGV[4]) then " +
//remove the item from the queue and timeout set NOTE we do not alter any other timeout
//從有序集合+隊(duì)列中移除這個(gè)線程
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
(4)lua腳本步驟二:判斷當(dāng)前線程能否獲取鎖
判斷條件一:
首先執(zhí)行命令"exists myLock",判斷鎖是否存在。一開始沒有線程加過鎖,所以判斷條件肯定是成立的,該條件為true。
判斷條件二:
接著執(zhí)行命令"exists redisson_lock_queue:{myLock}",看隊(duì)列是否存在。一開始也沒有這個(gè)隊(duì)列,所以這個(gè)條件也肯定成立,該條件為true。
判斷條件三:
如果有這個(gè)隊(duì)列,則判斷隊(duì)列存在的條件不成立,執(zhí)行"或"后面的判斷。也就是執(zhí)行命令"lindex redisson_lock_queue:{myLock} 0",判斷隊(duì)列的第一個(gè)元素是否是當(dāng)前線程的UUID + ThreadID。
//check if the lock can be acquired now
//步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進(jìn)行嘗試獲取鎖
//情況一:鎖不存在 + 隊(duì)列也不存在;KEYS[1]是鎖的名字;
//情況二:鎖不存在 + 隊(duì)列存在 + 隊(duì)列的第一個(gè)元素就是當(dāng)前線程;ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
...
"end;" +
總結(jié)當(dāng)前線程現(xiàn)在可以嘗試獲取鎖的情況如下:
情況一:鎖不存在 + 隊(duì)列也不存在
情況二:鎖不存在 + 隊(duì)列存在 + 隊(duì)列的第一個(gè)元素就是當(dāng)前線程
(5)lua腳本步驟三:執(zhí)行獲取鎖的操作
當(dāng)判斷現(xiàn)在能否嘗試獲取鎖的條件通過后,便會(huì)執(zhí)行如下操作:
步驟一:執(zhí)行命令"lpop redisson_lock_queue:{myLock}",彈出隊(duì)列第一個(gè)元素。一開始該隊(duì)列是空的,所以該命令不會(huì)進(jìn)行處理。接著執(zhí)行命令"zrem redisson_lock_timeout:{myLock} UUID1:ThreadID1",也就是從有序集合中刪除UUID1:ThreadID1對(duì)應(yīng)的元素。一開始該有序集合也是空的,所以該命令不會(huì)進(jìn)行處理。
步驟二:執(zhí)行命令"hset myLock UUID1:ThreadID1 1",進(jìn)行加鎖操作。在設(shè)置key為myLock的Hash值中,field為UUID1:ThreadID1的value值為1。接著執(zhí)行命令"pexpire myLock 30000",設(shè)置鎖key的過期時(shí)間為30秒。
最后返回nil,這樣在外層代碼中,就會(huì)認(rèn)為加鎖成功。于是就會(huì)創(chuàng)建一個(gè)WatchDog看門狗定時(shí)調(diào)度任務(wù),10秒后對(duì)鎖進(jìn)行檢查。如果檢查發(fā)現(xiàn)當(dāng)前線程還持有這個(gè)鎖,那么就重置鎖key的過期時(shí)間為30秒,并且重新創(chuàng)建一個(gè)WatchDog看門狗定時(shí)調(diào)度任務(wù)在10秒后繼續(xù)進(jìn)行檢查。
//check if the lock can be acquired now
//步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進(jìn)行嘗試獲取鎖
//情況一:鎖不存在 + 隊(duì)列也不存在;KEYS[1]是鎖的名字;KEYS[2]是對(duì)線程排隊(duì)的隊(duì)列;
//情況二:鎖不存在 + 隊(duì)列存在 + 隊(duì)列的第一個(gè)元素就是當(dāng)前線程;ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
//步驟三:當(dāng)前線程執(zhí)行獲取鎖的操作
//remove this thread from the queue and timeout set
//彈出隊(duì)列的第一個(gè)元素 + 從有序集合中刪除UUID:ThreadID對(duì)應(yīng)的元素
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" +
//decrease timeouts for all waiting in the queue
//遞減有序集合中每個(gè)線程的分?jǐn)?shù),也就是遞減每個(gè)線程獲取鎖時(shí)的已經(jīng)等待時(shí)間
//zrange返回有序集合KEYS[3]中指定區(qū)間內(nèi)(0,-1)的成員,也就是全部成員
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
//對(duì)有序集合KEYS[3]的成員keys[i]的score減去:tonumber(ARGV[3])
//ARGV[3]就是線程獲取鎖時(shí)可以等待的時(shí)間,默認(rèn)是5分鐘
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +
//acquire the lock and set the TTL for the lease
//hset設(shè)置Hash值進(jìn)行加鎖操作 + pexpire設(shè)置鎖key的過期時(shí)間 + 最后返回nil表示加鎖成功
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
(6)lua腳本步驟四:判斷鎖是否已經(jīng)被當(dāng)前線程持有(可重入鎖)
此時(shí)會(huì)執(zhí)行命令"hexists myLock UUID:ThreadID"。如果判斷條件通過,則說明是持有鎖的線程對(duì)鎖進(jìn)行了重入。于是會(huì)執(zhí)行命令"hincrby myLock UUID:ThreadID 1",對(duì)key為鎖名的Hash值中,field為UUID + 線程ID的value值累加1。并且執(zhí)行命令"pexpire myLock 300000"重置鎖key的過期時(shí)間。最后返回nil,表示重入加鎖成功。
//check if the lock is already held, and this is a re-entry(可重入鎖)
//步驟四:判斷鎖是否已經(jīng)被當(dāng)前線程持有,KEYS[1]是鎖的名字,ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
(7)lua腳本步驟五:判斷當(dāng)前獲取鎖失敗的線程是否已經(jīng)在隊(duì)列中排隊(duì)
通過執(zhí)行命令"zscore redisson_lock_timeout:{myLock} UUID:ThreadID",獲取當(dāng)前線程在有序集合中的對(duì)應(yīng)的分?jǐn)?shù),也就是過期時(shí)間。如果獲取成功則返回:當(dāng)前線程等待獲取鎖的超時(shí)時(shí)間還剩多少,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間。
//the lock cannot be acquired, check if the thread is already in the queue
//步驟五:判斷當(dāng)前獲取鎖失敗的線程是否已經(jīng)在隊(duì)列中排隊(duì)
//KEYS[3]是對(duì)線程排序的有序集合,ARGV[2]是當(dāng)前線程的UUID+ThreadID;
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
//the real timeout is the timeout of the prior thread in the queue,
//but this is approximately correct, and avoids having to traverse the queue
//如果當(dāng)前獲取鎖失敗的線程已經(jīng)在隊(duì)列中排隊(duì)
//那么就返回該線程等待獲取鎖時(shí),還剩多少時(shí)間就超時(shí)了,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
//ARGV[3]是當(dāng)前線程獲取鎖時(shí)可以等待的時(shí)間,ARGV[4]是當(dāng)前時(shí)間
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +
(8)lua腳本步驟六:對(duì)獲取鎖失敗的線程進(jìn)行排隊(duì)
首先獲取隊(duì)列中的最后一個(gè)元素。因?yàn)閾碛凶畲筮^期時(shí)間的線程在隊(duì)列中是排最后的,所以可通過隊(duì)列中的最后一個(gè)元素的過期時(shí)間,計(jì)算當(dāng)前線程的過期時(shí)間。從而保證新加入隊(duì)列和有序集合的線程的過期時(shí)間是最大的。然后獲取鎖或者隊(duì)列中排最后的線程剩余的存活時(shí)間,接著計(jì)算當(dāng)前線程在排隊(duì)等待鎖時(shí)的過期時(shí)間。
然后把當(dāng)前線程作為一個(gè)元素插入有序集合,并設(shè)置有序集合中該元素的分?jǐn)?shù)為該線程在排隊(duì)等待鎖時(shí)的過期時(shí)間,接著再把當(dāng)前線程作為一個(gè)元素插入隊(duì)列尾部。
最后返回鎖或者隊(duì)列中排第一的線程剩余的存活時(shí)間ttl給外層代碼。如果外層代碼拿到的返回值是非null,那么客戶端會(huì)進(jìn)入一個(gè)while循環(huán)。在while循環(huán)會(huì)每阻塞等待ttl時(shí)間再嘗試去進(jìn)行加鎖,重新執(zhí)行l(wèi)ua腳本。
如果隊(duì)列里沒有元素,那么第一個(gè)加入隊(duì)列的線程,會(huì)阻塞等待鎖的過期時(shí)間。如果隊(duì)列里有元素,那么后一個(gè)加入隊(duì)列的線程,會(huì)阻塞等待前一個(gè)加入隊(duì)列的線程的過期時(shí)間。
//步驟六:對(duì)獲取鎖失敗的線程進(jìn)行排隊(duì)處理
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
//如果在隊(duì)列中排隊(duì)的最后一個(gè)元素不是當(dāng)前線程
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
//lastThreadId是在隊(duì)列中排最后的線程,ARGV[2]是當(dāng)前線程的UUID + 線程ID,ARGV[4]是當(dāng)前時(shí)間
//因?yàn)閾碛凶畲筮^期時(shí)間的線程在隊(duì)列中是排最后的
//所以可通過隊(duì)列中的最后一個(gè)元素的過期時(shí)間,計(jì)算當(dāng)前線程的過期時(shí)間
//從而保證新加入隊(duì)列和有序集合的線程的過期時(shí)間是最大的
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,當(dāng)前隊(duì)列中排最后的線程就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
//這樣后一個(gè)加入隊(duì)列的線程,會(huì)阻塞等待前一個(gè)加入隊(duì)列的線程的過期時(shí)間
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,鎖就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
//計(jì)算當(dāng)前線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//把當(dāng)前線程作為一個(gè)元素插入有序集合,并設(shè)置元素分?jǐn)?shù)為該線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
//然后再把當(dāng)前線程作為一個(gè)元素插入隊(duì)列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
(9)獲取鎖失敗的第一個(gè)線程執(zhí)行l(wèi)ua腳本的流程
公平鎖的核心在于申請(qǐng)加鎖時(shí),加鎖失敗的各個(gè)客戶端會(huì)排隊(duì)。之后鎖被釋放時(shí),會(huì)依次獲取鎖,從而實(shí)現(xiàn)公平性。
假設(shè)此時(shí)第一個(gè)客戶端線程已加鎖成功,第二個(gè)客戶端線程也來嘗試加鎖,那么會(huì)進(jìn)行如下排隊(duì)處理。
步驟一:進(jìn)入while循環(huán),移除等待超時(shí)的線程。執(zhí)行命令"lindex redisson_lock_queue:{myLock} 0",獲取隊(duì)列排第一元素。由于此時(shí)隊(duì)列還是空的,所以獲取到的是false,于是退出while循環(huán)。
步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖。因?yàn)閳?zhí)行命令"exists myLock",發(fā)現(xiàn)鎖已經(jīng)存在了,于是判斷不通過。
步驟三:判斷鎖是否已經(jīng)被當(dāng)前線程持有,由于第二個(gè)客戶端線程的UUID + 線程ID必然不等于第一個(gè)客戶端線程。所以此時(shí)執(zhí)行命令"hexists myLock UUID2:ThreadID2",發(fā)現(xiàn)不存在。所以此處的可重入鎖的判斷條件也不成立。
步驟四:判斷當(dāng)前獲取鎖失敗的線程是否已經(jīng)在隊(duì)列中排隊(duì)。由于當(dāng)前線程是第一個(gè)獲取鎖失敗的線程,所以判斷不通過。
步驟五:接下來進(jìn)行排隊(duì)處理。
//對(duì)獲取鎖失敗的線程進(jìn)行排隊(duì)處理
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
//如果在隊(duì)列中排隊(duì)的最后一個(gè)元素不是當(dāng)前線程
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
//lastThreadId是在隊(duì)列中排最后的線程,ARGV[2]是當(dāng)前線程的UUID+線程ID,ARGV[4]是當(dāng)前時(shí)間
//因?yàn)閾碛凶畲筮^期時(shí)間的線程在隊(duì)列中是排最后的
//所以可通過隊(duì)列中的最后一個(gè)元素的過期時(shí)間,計(jì)算當(dāng)前線程的過期時(shí)間
//從而保證新加入隊(duì)列和有序集合的線程的過期時(shí)間是最大的
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,當(dāng)前隊(duì)列中排最后的線程就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
//這樣后一個(gè)加入隊(duì)列的線程,會(huì)阻塞等待前一個(gè)加入隊(duì)列的線程的過期時(shí)間
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,鎖就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
//計(jì)算當(dāng)前線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//把當(dāng)前線程作為一個(gè)元素插入有序集合,并設(shè)置元素分?jǐn)?shù)為該線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
//然后再把當(dāng)前線程作為一個(gè)元素插入隊(duì)列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;"
首先執(zhí)行命令"lindex redisson_lock_queue:{myLock} 0"。也就是從隊(duì)列中獲取最后一個(gè)元素,由于此時(shí)隊(duì)列是空,所以獲取不到元素。然后執(zhí)行命令"ttl = pttl myLock",獲取鎖剩余的存活時(shí)間。
接著計(jì)算當(dāng)前線程在排隊(duì)等待鎖時(shí)的過期時(shí)間。假設(shè)myLock剩余的存活時(shí)間ttl為20秒,那么timeout = ttl + 5分鐘 + 當(dāng)前時(shí)間 = 20秒 + 5分鐘 + 10:00:00 = 10:05:20;
然后執(zhí)行命令"zadd redisson_lock_timeout:{myLock} 10:05:20 UUID2:ThreadID2",這行命令的意思是,在有序集合中插入一個(gè)元素。元素值是UUID2:ThreadID2,元素對(duì)應(yīng)的分?jǐn)?shù)是10:05:20。分?jǐn)?shù)會(huì)用時(shí)間的Long型時(shí)間戳來表示,時(shí)間越靠后,時(shí)間戳就越大。有序集合Sorted Set會(huì)自動(dòng)根據(jù)插入的元素分?jǐn)?shù)從小到大進(jìn)行排序。
接著執(zhí)行命令"rpush redisson_lock_queue:{myLock} UUID2:TheadID2",這行命令的意思是,將UUID2:ThreadID2插入到隊(duì)列的尾部。
最后返回ttl給外層代碼,也就是返回myLock剩余的存活時(shí)間。如果外層代碼拿到的ttl是非null,那么客戶端會(huì)進(jìn)入一個(gè)while循環(huán)。在while循環(huán)會(huì)每阻塞等待ttl時(shí)間就嘗試進(jìn)行加鎖,重新執(zhí)行l(wèi)ua腳本。
(10)獲取鎖失敗的第二個(gè)線程執(zhí)行l(wèi)ua腳本的流程
如果此時(shí)有第三個(gè)客戶端線程也來嘗試加鎖,那么會(huì)進(jìn)行如下排隊(duì)處理。
步驟一:進(jìn)入while循環(huán),移除等待超時(shí)的線程。執(zhí)行命令"lindex redisson_lock_queue:{myLock} 0",獲取隊(duì)列排第一元素。此時(shí)獲取到UUID2:ThreadID2,代表著第二個(gè)客戶端線程正在隊(duì)列里排隊(duì)。
繼續(xù)執(zhí)行命令"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2",從有序集合中獲取UUID2:ThreadID2對(duì)應(yīng)的分?jǐn)?shù),timeout = 10:05:20。
假設(shè)當(dāng)前時(shí)間是10:00:25,那么timeout <= 10:00:25的這個(gè)條件不成立,于是退出while循環(huán)。
步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,發(fā)現(xiàn)不能通過。因?yàn)閳?zhí)行命令"exists myLock"時(shí),發(fā)現(xiàn)鎖已經(jīng)存在。
步驟三:判斷鎖是否已經(jīng)被當(dāng)前線程持有。由于第三個(gè)客戶端線程的UUID + 線程ID必然不等于第一個(gè)客戶端線程。所以此時(shí)執(zhí)行命令"hexists myLock UUID3:ThreadID3",發(fā)現(xiàn)不存在。所以此處的可重入鎖的判斷條件也不成立。
步驟四:判斷當(dāng)前獲取鎖失敗的線程是否已經(jīng)在隊(duì)列中排隊(duì)。由于當(dāng)前線程是第二個(gè)獲取鎖失敗的線程,所以判斷不通過。
步驟五:接下來進(jìn)行排隊(duì)處理。
//對(duì)獲取鎖失敗的線程進(jìn)行排隊(duì)處理
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
//如果在隊(duì)列中排隊(duì)的最后一個(gè)元素不是當(dāng)前線程
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
//lastThreadId是在隊(duì)列中排最后的線程,ARGV[2]是當(dāng)前線程的UUID + 線程ID,ARGV[4]是當(dāng)前時(shí)間
//因?yàn)閾碛凶畲筮^期時(shí)間的線程在隊(duì)列中是排最后的
//所以可通過隊(duì)列中的最后一個(gè)元素的過期時(shí)間,計(jì)算當(dāng)前線程的過期時(shí)間
//從而保證新加入隊(duì)列和有序集合的線程的過期時(shí)間是最大的
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,當(dāng)前隊(duì)列中排最后的線程就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
//這樣后一個(gè)加入隊(duì)列的線程,會(huì)阻塞等待前一個(gè)加入隊(duì)列的線程的過期時(shí)間
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,鎖就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
//計(jì)算當(dāng)前線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//把當(dāng)前線程作為一個(gè)元素插入有序集合,并設(shè)置元素分?jǐn)?shù)為該線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
//然后再把當(dāng)前線程作為一個(gè)元素插入隊(duì)列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;"
首先執(zhí)行命令"lindex redisson_lock_queue:{myLock} 0",獲取到隊(duì)列中的最后一個(gè)元素UUID2:ThreadID2。
然后判斷條件是否成立:lastThreadId不為false + lastThreadId不是自己。由于此時(shí)的ARGV[2] = UUID3:ThreadID3,所以判斷條件成立。即在隊(duì)列里排隊(duì)的最后一個(gè)元素并不是當(dāng)前嘗試獲取鎖的客戶端線程。
于是執(zhí)行:"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2" - 當(dāng)前時(shí)間,也就是獲取在隊(duì)列中排最后的線程還有多少時(shí)間就會(huì)過期,從而得到ttl。
接著根據(jù)ttl計(jì)算當(dāng)前線程在排隊(duì)等待鎖時(shí)的過期時(shí)間timeout,然后執(zhí)行zadd和rpush命令對(duì)當(dāng)前線程進(jìn)行入隊(duì)和排隊(duì),最后返回ttl。
3.公平鎖源碼之可重入加鎖
持有公平鎖的客戶端重復(fù)進(jìn)行l(wèi)ock.lock(),執(zhí)行加鎖lua腳本的流程如下:
步驟一:進(jìn)入while循環(huán),移除等待超時(shí)的線程。執(zhí)行命令"lindex redisson_lock_queue:{myLock} 0",獲取隊(duì)列排第一元素。此時(shí)獲取到UUID2:ThreadID2,代表著第二個(gè)客戶端線程正在隊(duì)列里排隊(duì)。
繼續(xù)執(zhí)行命令"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2",從有序集合中獲取UUID2:ThreadID2對(duì)應(yīng)的分?jǐn)?shù),timeout = 10:05:20。
假設(shè)當(dāng)前時(shí)間是10:00:25,那么timeout <= 10:00:25的這個(gè)條件不成立,于是退出while循環(huán)。
步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,發(fā)現(xiàn)不能通過。因?yàn)閳?zhí)行命令"exists myLock"時(shí),發(fā)現(xiàn)鎖已經(jīng)存在。
步驟三:判斷鎖是否已經(jīng)被當(dāng)前線程持有。由于當(dāng)前線程的UUID + 線程ID等于持有鎖的線程。即此時(shí)執(zhí)行命令"hexists myLock UUID:ThreadID"發(fā)現(xiàn)key是存在的,所以此處的可重入鎖的判斷條件成立。
于是會(huì)執(zhí)行命令"hincrby myLock UUID:ThreadID 1",對(duì)key為鎖名的Hash值中,key為UUID + 線程ID的Hash值累加1。并且執(zhí)行命令"pexpire myLock 300000"重置鎖key的過期時(shí)間。最后返回nil,表示重入加鎖成功。
//check if the lock is already held, and this is a re-entry(可重入鎖)
//步驟四:判斷鎖是否已經(jīng)被當(dāng)前線程持有,KEYS[1]是鎖的名字,ARGV[2]是當(dāng)前線程的UUID+ThreadID;
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
4.公平鎖源碼之新舊版本對(duì)比
(1)新版本再次加鎖失敗不會(huì)刷新排隊(duì)分?jǐn)?shù)(等待超時(shí)的時(shí)間點(diǎn)timeout)
(2)舊版本再次加鎖失敗會(huì)刷新排隊(duì)分?jǐn)?shù)(等待超時(shí)的時(shí)間點(diǎn)timeout)
當(dāng)客戶端線程嘗試加公平鎖失敗處于排隊(duì)狀態(tài)時(shí),會(huì)進(jìn)入while循環(huán)。在while循環(huán)中,每次都會(huì)等待一段時(shí)間,再重新進(jìn)行嘗試加公平鎖。
public class RedissonLock extends RedissonBaseLock {
...
//加鎖
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//線程ID,用來生成設(shè)置Hash的值
long threadId = Thread.currentThread().getId();
//嘗試加鎖,此時(shí)執(zhí)行RedissonLock.lock()方法默認(rèn)傳入的leaseTime=-1
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
//ttl為null說明加鎖成功
if (ttl == null) {
return;
}
//加鎖失敗時(shí)的處理
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
//再次嘗試獲取鎖
ttl = tryAcquire(-1, leaseTime, unit, threadId);
//返回的ttl為null,獲取到鎖,就退出while循環(huán)
if (ttl == null) {
break;
}
//返回的ttl不為null,則說明其他客戶端或線程還持有鎖
//那么就利用同步組件Semaphore進(jìn)行阻塞等待一段ttl的時(shí)間
if (ttl >= 0) {
try {
commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
commandExecutor.getNow(future).getLatch().acquire();
} else {
commandExecutor.getNow(future).getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(commandExecutor.getNow(future), threadId);
}
}
...
}
假設(shè)第二個(gè)客戶端線程第一次加鎖是在10:00:00,然后在10:00:15該客戶端線程再次發(fā)起請(qǐng)求嘗試進(jìn)行加鎖,但第一個(gè)客戶端線程在10:00:00~10:00:15之間一直持有這把鎖,此時(shí)第二個(gè)客戶端線程的再次加鎖流程如下:
(1)新版本再次加鎖失敗不會(huì)刷新排隊(duì)分?jǐn)?shù)(等待超時(shí)的時(shí)間點(diǎn)timeout)
步驟一:進(jìn)入while循環(huán),移除等待超時(shí)的線程。執(zhí)行命令"lindex redisson_lock_queue:{myLock} 0",獲取隊(duì)列排第一元素。此時(shí)獲取到UUID2:ThreadID2,代表著第二個(gè)客戶端線程正在隊(duì)列里排隊(duì)。
繼續(xù)執(zhí)行命令"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2",從有序集合中獲取UUID2:ThreadID2對(duì)應(yīng)的分?jǐn)?shù),比如獲取到的timeout = 10:05:20。根據(jù)當(dāng)前時(shí)間是10:00:15,那么timeout <= 10:00:15的這個(gè)條件不成立,于是退出while循環(huán)。
步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,發(fā)現(xiàn)不能通過。因?yàn)閳?zhí)行命令"exists myLock"時(shí),發(fā)現(xiàn)鎖已經(jīng)存在。
步驟三:判斷鎖是否已經(jīng)被當(dāng)前線程持有。由于第二個(gè)客戶端線程的UUID + 線程ID必然不等于第一個(gè)客戶端線程,所以此時(shí)執(zhí)行命令"hexists myLock UUID2:ThreadID2",發(fā)現(xiàn)不存在,所以此處的可重入鎖的判斷條件也不成立。
步驟四:判斷當(dāng)前獲取鎖失敗的線程是否已經(jīng)在隊(duì)列中排隊(duì)。由于當(dāng)前線程是第二次嘗試獲取鎖,所以判斷通過。然后返回第二個(gè)客戶端線程等待獲取鎖時(shí),還剩多少時(shí)間就超時(shí),不會(huì)刷新排隊(duì)分?jǐn)?shù)。
//Redisson的3.16.8版本
if (command == RedisCommands.EVAL_LONG) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
//步驟一:remove stale threads,移除等待超時(shí)的線程
"while true do " +
//獲取隊(duì)列中的第一個(gè)元素
//KEYS[2]是一個(gè)用來對(duì)線程排隊(duì)的隊(duì)列的名字
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
//獲取隊(duì)列中第一個(gè)元素對(duì)應(yīng)的分?jǐn)?shù),也就是排第一的線程的過期時(shí)間
//KEYS[3]是一個(gè)用來對(duì)線程排序的有序集合的名字
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
//如果排第一的線程的過期時(shí)間小于當(dāng)前時(shí)間,說明該線程等待超時(shí)了都還沒獲取到鎖,所以要移除
//ARGV[4]是當(dāng)前時(shí)間
"if timeout <= tonumber(ARGV[4]) then " +
//從有序集合 + 隊(duì)列中移除這個(gè)線程
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
//步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進(jìn)行嘗試獲取鎖
//情況一:鎖不存在 + 隊(duì)列也不存在;KEYS[1]是鎖的名字;KEYS[2]是對(duì)線程排隊(duì)的隊(duì)列;
//情況二:鎖不存在 + 隊(duì)列存在 + 隊(duì)列的第一個(gè)元素就是當(dāng)前線程;ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
//步驟三:當(dāng)前線程執(zhí)行獲取鎖的操作
//彈出隊(duì)列的第一個(gè)元素 + 從有序集合中刪除UUID:ThreadID對(duì)應(yīng)的元素
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" +
//遞減有序集合中每個(gè)線程的分?jǐn)?shù),也就是遞減每個(gè)線程獲取鎖時(shí)的已經(jīng)等待時(shí)間
//zrange返回有序集合KEYS[3]中指定區(qū)間內(nèi)(0,-1)的成員,也就是全部成員
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
//對(duì)有序集合KEYS[3]的成員keys[i]的score減去:tonumber(ARGV[3])
//ARGV[3]就是線程獲取鎖時(shí)可以等待的時(shí)間,默認(rèn)是5分鐘
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +
//hset設(shè)置Hash值進(jìn)行加鎖操作 + pexpire設(shè)置鎖key的過期時(shí)間 + 最后返回nil表示加鎖成功
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
//步驟四:判斷鎖是否已經(jīng)被當(dāng)前線程持有(可重入鎖),KEYS[1]是鎖的名字,ARGV[2]是當(dāng)前線程的UUID+ThreadID;
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2],1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
//步驟五:判斷當(dāng)前獲取鎖失敗的線程是否已經(jīng)在隊(duì)列中排隊(duì)
//KEYS[3]是對(duì)線程排序的有序集合,ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
//如果當(dāng)前獲取鎖失敗的線程已經(jīng)在隊(duì)列中排隊(duì)
//那么就返回該線程等待獲取鎖時(shí),還剩多少時(shí)間就超時(shí)了,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
//ARGV[3]是當(dāng)前線程獲取鎖時(shí)可以等待的時(shí)間,ARGV[4]是當(dāng)前時(shí)間
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +
//步驟六:對(duì)獲取鎖失敗的線程進(jìn)行排隊(duì)處理
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
//如果在隊(duì)列中排隊(duì)的最后一個(gè)元素不是當(dāng)前線程
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
//lastThreadId是在隊(duì)列中排最后的線程,ARGV[2]是當(dāng)前線程的UUID + 線程ID,ARGV[4]是當(dāng)前時(shí)間
//因?yàn)閾碛凶畲筮^期時(shí)間的線程在隊(duì)列中是排最后的
//所以可通過隊(duì)列中的最后一個(gè)元素的過期時(shí)間,計(jì)算當(dāng)前線程的過期時(shí)間
//從而保證新加入隊(duì)列和有序集合的線程的過期時(shí)間是最大的
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,當(dāng)前隊(duì)列中排最后的線程就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
//這樣后一個(gè)加入隊(duì)列的線程,會(huì)阻塞等待前一個(gè)加入隊(duì)列的線程的過期時(shí)間
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,鎖就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
//計(jì)算當(dāng)前線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//把當(dāng)前線程作為一個(gè)元素插入有序集合,并設(shè)置元素分?jǐn)?shù)為該線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
//然后再把當(dāng)前線程作為一個(gè)元素插入隊(duì)列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
Arrays.asList(getRawName(), threadsQueueName, timeoutSetName),
unit.toMillis(leaseTime),
getLockName(threadId),
wait,//默認(rèn)是5分鐘
currentTime
);
}
(2)舊版本再次加鎖失敗會(huì)刷新排隊(duì)分?jǐn)?shù)(等待超時(shí)的時(shí)間點(diǎn)timeout)
舊版本公平鎖的lua腳本如下所示,當(dāng)?shù)诙€(gè)客戶端線程再次加鎖時(shí)會(huì)再次進(jìn)入排隊(duì)邏輯。
首先會(huì)出計(jì)算隊(duì)列中的第一個(gè)元素還有多少時(shí)間就超時(shí),即ttl。然后根據(jù)ttl + 傳入的等待時(shí)間,計(jì)算當(dāng)前線程等待鎖的超時(shí)時(shí)間timeout。
接著執(zhí)行命令"zadd redisson_lock_timeout:{myLock} timeout UUID2:ThreadID2",刷新有序集合中的同名元素的分?jǐn)?shù)為timeout。客戶端線程每次重復(fù)嘗試加鎖,都會(huì)將其對(duì)應(yīng)的過期時(shí)間往后延長(zhǎng),也就是刷新了排隊(duì)的分?jǐn)?shù)。
zadd命令在添加存在的元素時(shí),會(huì)返回0,但會(huì)更新該元素的分?jǐn)?shù)。
//Redisson的3.8.1版本
if (command == RedisCommands.EVAL_LONG) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//步驟一:移除等待超時(shí)的線程
"while true do " +
//獲取隊(duì)列中的第一個(gè)元素
//KEYS[2]是一個(gè)用來對(duì)線程排隊(duì)的隊(duì)列的名字
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end; " +
//獲取隊(duì)列中第一個(gè)元素對(duì)應(yīng)的分?jǐn)?shù),也就是排第一的線程的過期時(shí)間
//KEYS[3]是一個(gè)用來對(duì)線程排序的有序集合的名字
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
//如果排第一的線程的過期時(shí)間小于當(dāng)前時(shí)間,說明該線程等待超時(shí)了都還沒獲取到鎖,所以要移除
//ARGV[4]是當(dāng)前時(shí)間
"if timeout <= tonumber(ARGV[4]) then " +
//從有序集合 + 隊(duì)列中移除這個(gè)線程
"redis.call('zrem', KEYS[3], firstThreadId2); " +
"redis.call('lpop', KEYS[2]); " +
"else " +
"break;" +
"end; " +
"end;" +
//步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,以下兩種情況可以通過判斷
//情況一:鎖不存在 + 隊(duì)列也不存在;KEYS[1]是鎖的名字;KEYS[2]是對(duì)線程排隊(duì)的隊(duì)列;
//情況二:鎖不存在 + 隊(duì)列存在 + 隊(duì)列的第一個(gè)元素就是當(dāng)前線程;ARGV[2]是當(dāng)前線程的UUID+ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
//步驟三:當(dāng)前線程執(zhí)行獲取鎖的操作
//彈出隊(duì)列的第一個(gè)元素 + 從有序集合中刪除UUID:ThreadID對(duì)應(yīng)的元素
"redis.call('lpop', KEYS[2]); " +
"redis.call('zrem', KEYS[3], ARGV[2]); " +
//hset設(shè)置Hash值進(jìn)行加鎖操作 + pexpire設(shè)置鎖key的過期時(shí)間 + 最后返回nil表示加鎖成功
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//步驟四:判斷鎖是否已經(jīng)被當(dāng)前線程持有,KEYS[1]是鎖的名字,ARGV[2]是當(dāng)前線程的UUID+ThreadID;
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//步驟五:對(duì)獲取鎖失敗的線程進(jìn)行排隊(duì)處理
"local firstThreadId = redis.call('lindex', KEYS[2], 0); " +
"local ttl; " +
//如果在隊(duì)列中排隊(duì)的第一個(gè)元素不是當(dāng)前線程
"if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " +
//計(jì)算隊(duì)列中第一個(gè)元素還有多少時(shí)間就超時(shí)了
"ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" +
"else " +
"ttl = redis.call('pttl', KEYS[1]);" +
"end; " +
//計(jì)算當(dāng)前線程等待鎖的超時(shí)時(shí)間
"local timeout = ttl + tonumber(ARGV[3]);" +
//把當(dāng)前線程作為一個(gè)元素插入有序集合,并設(shè)置元素分?jǐn)?shù)為該線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
//然后再把當(dāng)前線程作為一個(gè)元素插入隊(duì)列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end; " +
"return ttl;",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),//KEYS[1]、KEYS[2]、KEYS[3]
internalLockLeaseTime,//ARGV[1]
getLockName(threadId),//ARGV[2]
currentTime + threadWaitTime,//ARGV[3] = 當(dāng)前時(shí)間 + 5秒
currentTime//ARGV[4]
);
}
注意:如果僅僅使用有序集合是不行的,因?yàn)橛行蚣系姆謹(jǐn)?shù)在lua腳本執(zhí)行過程中也會(huì)發(fā)生變化。舊版本中,客戶端線程每次嘗試加鎖,有序集合中的分?jǐn)?shù)會(huì)更新。新版本中,當(dāng)前線程可以嘗試獲取鎖時(shí),也會(huì)遍歷更新有序集合中的分?jǐn)?shù)。
此外,有序集合獲取第一個(gè)元素的時(shí)間復(fù)雜度比隊(duì)列要高。如果僅僅使用隊(duì)列也是不行的,因?yàn)樾枰芾砼抨?duì)線程的等待超時(shí)時(shí)間。如果沒有有序集合,那么就不能移除在隊(duì)列中排隊(duì)已超時(shí)的線程。當(dāng)然,為了管理線程的等待超時(shí)時(shí)間,將有序集合換成兩層Hash值也可以。
5.公平鎖源碼之隊(duì)列重排
(1)新版本在5分鐘后嘗試再次加鎖才會(huì)隊(duì)列重排
(2)舊版本在5秒后嘗試再次加鎖就會(huì)隊(duì)列重排
(3)導(dǎo)致隊(duì)列重排的是lua腳本的步驟一(移除等待超時(shí)的線程)
(1)新版本在5分鐘后嘗試再次加鎖才會(huì)隊(duì)列重排
新版本的公平鎖中,獲取鎖失敗的線程默認(rèn)會(huì)進(jìn)入隊(duì)列最多等待5分鐘。
在這5分鐘內(nèi),該線程不管再次加鎖多少次,都不會(huì)刷新隊(duì)列排序和分?jǐn)?shù)。
在這5分鐘內(nèi),該線程沒有進(jìn)行再次加鎖嘗試,就會(huì)被移出隊(duì)列和有序集合。所以5分鐘后,該線程才嘗試再次加鎖,那么會(huì)重新入隊(duì),導(dǎo)致隊(duì)列重排。
(2)舊版本在5秒后嘗試再次加鎖就會(huì)隊(duì)列重排
舊版本的公平鎖中,獲取鎖失敗的線程默認(rèn)會(huì)進(jìn)入隊(duì)列最多等待5秒鐘。
在這5秒鐘內(nèi),該線程只要重新嘗試進(jìn)行加鎖,那么就會(huì)延長(zhǎng)其最多等待時(shí)間,也就是刷新有序集合中的排隊(duì)分?jǐn)?shù)。
在這5秒鐘內(nèi),該線程沒有進(jìn)行再次加鎖嘗試,就會(huì)被移出隊(duì)列和有序集合。所以5秒鐘后,該線程才嘗試再次加鎖,那么會(huì)重新入隊(duì),導(dǎo)致隊(duì)列重排。
(3)導(dǎo)致隊(duì)列重排的是lua腳本的步驟一(移除等待超時(shí)的線程)
也就是公平鎖lua腳本中while循環(huán)的作用。
當(dāng)客戶端線程使用RedissonLock的tryAcquire()方法嘗試獲取公平鎖,并且指定了一個(gè)獲取鎖的超時(shí)時(shí)間時(shí)。比如指定客戶端線程在隊(duì)列里排隊(duì)超過了20秒,就不再嘗試獲取鎖了。如果獲取鎖的超時(shí)時(shí)間沒有指定,新版本是默認(rèn)5分鐘超時(shí),舊版本是默認(rèn)5秒后超時(shí)。
此時(shí)由于這些等待獲取鎖已超時(shí)的線程元素還存在隊(duì)列和有序集合里,所以可以通過while循環(huán)的邏輯來清除這些不再嘗試獲取鎖的客戶端線程。
在新版本,隨著時(shí)間推移,這些等待獲取鎖超時(shí)的線程就會(huì)被移出隊(duì)列。在舊版本,隨著時(shí)間推移,這些等待獲取鎖超時(shí)的線程只要不再嘗試加鎖,那么其等待獲取鎖的超時(shí)時(shí)間就不會(huì)更新被不斷延長(zhǎng),就會(huì)被移除隊(duì)列。
如果客戶端宕機(jī)了,那么客戶端就不會(huì)重新嘗試獲取鎖。在新版本中,隨著時(shí)間推移,宕機(jī)的客戶端線程就會(huì)被移出隊(duì)列。在舊版本中,就不會(huì)刷新和延長(zhǎng)有序集合中的超時(shí)時(shí)間分?jǐn)?shù),這樣while循環(huán)的邏輯就會(huì)將這些宕機(jī)的客戶端線程從隊(duì)列中移出。
在新版本中,最多5分鐘后,宕機(jī)的客戶端線程會(huì)被移出隊(duì)列。在舊版本中,最多5秒鐘后,宕機(jī)的客戶端線程就會(huì)被移出隊(duì)列。
因?yàn)榫W(wǎng)絡(luò)延遲等原因,可能會(huì)導(dǎo)致客戶端線程等待鎖時(shí)間過長(zhǎng),從而觸發(fā)各個(gè)客戶端線程的排隊(duì)順序的重排序。有的客戶端如果在隊(duì)列里等待時(shí)間過長(zhǎng),可能就會(huì)觸發(fā)一次隊(duì)列的重排序。新版本觸發(fā)重排序的頻率是每5分鐘,舊版本觸發(fā)重排序的頻率是每5秒。
//步驟一:移除等待超時(shí)的線程
"while true do " +
//獲取隊(duì)列中的第一個(gè)元素
//KEYS[2]是一個(gè)用來對(duì)線程排隊(duì)的隊(duì)列的名字
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end; " +
//獲取隊(duì)列中第一個(gè)元素對(duì)應(yīng)的分?jǐn)?shù),也就是排第一的線程的過期時(shí)間
//KEYS[3]是一個(gè)用來對(duì)線程排序的有序集合的名字
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
//如果排第一的線程的過期時(shí)間小于當(dāng)前時(shí)間,說明該線程等待超時(shí)了都還沒獲取到鎖,所以要移除
//ARGV[4]是當(dāng)前時(shí)間
"if timeout <= tonumber(ARGV[4]) then " +
//從有序集合 + 隊(duì)列中移除這個(gè)線程
"redis.call('zrem', KEYS[3], firstThreadId2); " +
"redis.call('lpop', KEYS[2]); " +
"else " +
"break;" +
"end; " +
"end;" +
6.公平鎖源碼之釋放鎖
(1)釋放公平鎖的流程
(2)釋放公平鎖的lua腳本分析
(1)釋放公平鎖的流程
釋放公平鎖首先調(diào)用的還是RedissonLock的unlock()方法。
在RedissonLock的unlock()方法中,會(huì)調(diào)用get(unlockAsync())。也就是首先調(diào)用RedissonBaseLock的unlockAsync()方法,然后調(diào)用RedissonObject的get()方法。
其中個(gè)RedissonBaseLock的unlockAsync()方法是異步化執(zhí)行的方法,釋放鎖的操作是異步執(zhí)行的。而RedisObject的get()方法會(huì)通過RFuture同步等待獲取異步執(zhí)行的結(jié)果。所以,可以將get(unlockAsync())理解為異步轉(zhuǎn)同步。
在RedissonBaseLock的unlockAsync()方法中,就會(huì)調(diào)用公平鎖RedissonFairLock的unlockInnerAsync()方法進(jìn)行釋放鎖。然后當(dāng)完成釋放鎖的處理后,會(huì)通過異步去取消定時(shí)調(diào)度任務(wù)。
public class Application {
public static void main(String[] args) throws Exception {
Config config = new Config();
config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
//創(chuàng)建RedissonClient實(shí)例
RedissonClient redisson = Redisson.create(config);
//獲取公平的可重入鎖
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lock();
fairLock.unlock();
...
}
}
public class RedissonLock extends RedissonBaseLock {
...
@Override
public void unlock() {
...
//異步轉(zhuǎn)同步
//首先調(diào)用的是RedissonBaseLock的unlockAsync()方法
//然后調(diào)用的是RedissonObject的get()方法
get(unlockAsync(Thread.currentThread().getId()));
...
}
...
}
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
...
@Override
public RFuture<Void> unlockAsync(long threadId) {
//異步執(zhí)行釋放鎖的lua腳本
RFuture<Boolean> future = unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((opStatus, e) -> {
//取消定時(shí)調(diào)度任務(wù)
cancelExpirationRenewal(threadId);
if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
return new CompletableFutureWrapper<>(f);
}
protected abstract RFuture<Boolean> unlockInnerAsync(long threadId);
...
}
public class RedissonFairLock extends RedissonLock implements RLock {
private final long threadWaitTime;
private final CommandAsyncExecutor commandExecutor;
private final String threadsQueueName;
private final String timeoutSetName;
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
this(commandExecutor, name, 60000*5);
}
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.threadWaitTime = threadWaitTime;
threadsQueueName = prefixName("redisson_lock_queue", name);
timeoutSetName = prefixName("redisson_lock_timeout", name);
}
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//步驟一:移除等待超時(shí)的線程
"while true do " +
//獲取隊(duì)列中的第一個(gè)元素
//KEYS[2]是一個(gè)用來對(duì)線程排隊(duì)的隊(duì)列的名字
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end; " +
//獲取隊(duì)列中第一個(gè)元素對(duì)應(yīng)的分?jǐn)?shù),也就是排第一的線程的過期時(shí)間
//KEYS[3]是一個(gè)用來對(duì)線程排序的有序集合的名字
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
//如果排第一的線程的過期時(shí)間小于當(dāng)前時(shí)間,說明該線程等待超時(shí)了都還沒獲取到鎖,所以要移除
//ARGV[4]是當(dāng)前時(shí)間
"if timeout <= tonumber(ARGV[4]) then " +
//從有序集合 + 隊(duì)列中移除這個(gè)線程
"redis.call('zrem', KEYS[3], firstThreadId2); " +
"redis.call('lpop', KEYS[2]); " +
"else " +
"break;" +
"end; " +
"end;" +
//步驟二:判斷鎖是否還存在,判斷key為鎖名的Hash值是否存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
//獲取隊(duì)列中排第一的線程
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
//ARGV[1]為通知事件的類型
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; " +
"end;" +
//步驟二:判斷鎖是否還存在,判斷key為UUID+線程ID的Hash值是否存在
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//對(duì)key為UUID+線程ID的Hash值還存遞減1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"end; " +
"redis.call('del', KEYS[1]); " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
//發(fā)布一個(gè)事件給在隊(duì)列中排第一的線程
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; ",
Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName()),
LockPubSub.UNLOCK_MESSAGE,//ARGV[1]
internalLockLeaseTime,
getLockName(threadId),
System.currentTimeMillis()
);
}
...
}
(2)釋放公平鎖的lua腳本分析
步驟一:移除等待超時(shí)的線程
首先也會(huì)進(jìn)入while循環(huán),移除等待超時(shí)的線程。即獲取隊(duì)列中排第一的線程,判斷該線程的過期時(shí)間是否已小于當(dāng)前時(shí)間。如果小于當(dāng)前時(shí)間,那么就說明該線程在隊(duì)列中的排隊(duì)已經(jīng)過期,于是便將該線程從有序集合 + 隊(duì)列中移除。后續(xù)如果該線程再次嘗試加鎖,那么會(huì)重新排序 + 重新入隊(duì)。
步驟二:判斷鎖是否還存在
如果key為鎖名的Hash值已不存在,那么先獲取隊(duì)列中排第一的線程,然后發(fā)布一個(gè)事件給該線程對(duì)應(yīng)的客戶端讓其獲取鎖。
如果key為鎖名的Hash值還存在,那么判斷field為UUID + 線程ID的映射是否存在。如果field為UUID + 線程ID的映射不存在,那么表示鎖已經(jīng)被釋放了,直接返回nil。如果field為UUID + 線程ID的映射存在,那么在key為鎖名的Hash值中,對(duì)field為UUID + 線程ID的value值遞減1。也就是調(diào)用Redis的hincrby命令,進(jìn)行遞減1處理。
步驟三:對(duì)遞減1后的結(jié)果進(jìn)行如下判斷處理
如果遞減1后的結(jié)果大于0,表示線程還在持有鎖。對(duì)應(yīng)于持有鎖的線程多次重入鎖,此時(shí)需要重置鎖的過期時(shí)間。
如果遞減1后的結(jié)果小于0,表示線程不再持有鎖,則刪除鎖對(duì)應(yīng)的key,并且發(fā)布一個(gè)事件給在隊(duì)列中排第一的線程所對(duì)應(yīng)的客戶端。
7.公平鎖源碼之按順序依次加鎖
(1)鎖被釋放后,排第二的客戶端線程先來加鎖
(2)鎖被釋放后,排第一的客戶端線程再來加鎖
假設(shè)客戶端A先持有鎖,而客戶端B在隊(duì)列里面是排在客戶端C的后面。那么如果客戶端A釋放了鎖后,客戶端B和C是如何按順序加鎖的。
(1)鎖被釋放后,排第二的客戶端線程先來加鎖
鎖被客戶端A釋放掉,鎖key被刪除之后,客戶端B先來進(jìn)行嘗試加鎖。此時(shí)客戶端B執(zhí)行的lua腳本步驟二的邏輯:
//check if the lock can be acquired now
//步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進(jìn)行嘗試獲取鎖
//情況一:鎖不存在 + 隊(duì)列也不存在;KEYS[1]是鎖的名字;
//情況二:鎖不存在 + 隊(duì)列存在 + 隊(duì)列的第一個(gè)元素就是當(dāng)前線程;ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
...
"end;"
首先,執(zhí)行判斷"exists myLock = 0",由于當(dāng)前鎖存在,所以條件不成立。
然后,執(zhí)行判斷"exists redisson_lock_queue:{myLock} = 0",由于隊(duì)列存在,所以條件不成立。
接著,執(zhí)行判斷"lindex redisson_lock_queue:{myLock} 0 == UUID2:ThreadID2",由于隊(duì)列存在,但是在隊(duì)列中排第一的不是客戶端B而是客戶端C,所以條件不成立,客戶端B無法加鎖。
由此可見:即使鎖釋放掉后,多個(gè)客戶端來嘗試加鎖也只認(rèn)隊(duì)列中排第一的客戶端。從而實(shí)現(xiàn)按隊(duì)列的順序依次獲取鎖,保證了公平性。
(2)鎖被釋放后,排第一的客戶端線程再來加鎖
當(dāng)在隊(duì)列中排第一的客戶端C此時(shí)過來嘗試加鎖時(shí),就會(huì)執(zhí)行如下步驟三的嘗試加鎖邏輯:
//check if the lock can be acquired now
//步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進(jìn)行嘗試獲取鎖
//情況一:鎖不存在 + 隊(duì)列也不存在;KEYS[1]是鎖的名字;KEYS[2]是對(duì)線程排隊(duì)的隊(duì)列;
//情況二:鎖不存在 + 隊(duì)列存在 + 隊(duì)列的第一個(gè)元素就是當(dāng)前線程;ARGV[2]是當(dāng)前線程的UUID+ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
//步驟三:當(dāng)前線程執(zhí)行獲取鎖的操作
//remove this thread from the queue and timeout set
//彈出隊(duì)列的第一個(gè)元素 + 從有序集合中刪除UUID:ThreadID對(duì)應(yīng)的元素
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" +
//decrease timeouts for all waiting in the queue
//遞減有序集合中每個(gè)線程的分?jǐn)?shù),也就是遞減每個(gè)線程獲取鎖時(shí)的已經(jīng)等待時(shí)間
//zrange返回有序集合KEYS[3]中指定區(qū)間內(nèi)(0,-1)的成員,也就是全部成員
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
//對(duì)有序集合KEYS[3]的成員keys[i]的score減去:tonumber(ARGV[3])
//ARGV[3]就是線程獲取鎖時(shí)可以等待的時(shí)間,默認(rèn)是5分鐘
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +
//acquire the lock and set the TTL for the lease
//hset設(shè)置Hash值進(jìn)行加鎖操作 + pexpire設(shè)置鎖key的過期時(shí)間 + 最后返回nil表示加鎖成功
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;"
首先,執(zhí)行命令"lpop redisson_lock_queue:{myLock}",將隊(duì)列中的第一個(gè)元素彈出來。
然后,執(zhí)行命令"zrem redisson_lock_timeout:{myLock} UUID3:ThreadID3",將有序集合中客戶端C的線程對(duì)應(yīng)的元素給刪除掉。
接著,執(zhí)行"hset myLock UUID3:ThreadID3 1"進(jìn)行加鎖,設(shè)置field為UUID + 線程ID的value值為1。
最后,執(zhí)行命令"pexpire myLock 30000",設(shè)置key為鎖名的Hash值的過期時(shí)間為30000毫秒。
客戶端C完成加鎖后,客戶端C就會(huì)從隊(duì)列中出隊(duì),此時(shí)排在隊(duì)頭的就是客戶端B。
總結(jié)
以上是生活随笔為你收集整理的分布式锁—3.Redisson的公平锁的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于几种排序算法的时间性能比较
- 下一篇: 8.22 13.1-13.3