日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

分布式锁—3.Redisson的公平锁

發(fā)布時(shí)間:2025/3/8 编程问答 42 如意码农
生活随笔 收集整理的這篇文章主要介紹了 分布式锁—3.Redisson的公平锁 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

大綱

1.Redisson公平鎖RedissonFairLock概述

2.公平鎖源碼之加鎖和排隊(duì)

3.公平鎖源碼之可重入加鎖

4.公平鎖源碼之新舊版本對(duì)比

5.公平鎖源碼之隊(duì)列重排

6.公平鎖源碼之釋放鎖

7.公平鎖源碼之按順序依次加鎖

1.Redisson公平鎖RedissonFairLock概述

(1)非公平和公平的可重入鎖

(2)Redisson公平鎖的簡(jiǎn)單使用

(3)Redisson公平鎖的初始化

(1)非公平和公平的可重入鎖

一.非公平可重入鎖

鎖被釋放后,排隊(duì)獲取鎖的線程會(huì)重新無序獲取鎖,沒有任何順序性可言。

二.公平可重入鎖

鎖被釋放后,排隊(duì)獲取鎖的線程會(huì)按照請(qǐng)求獲取鎖時(shí)候的順序去獲取鎖。公平鎖可以保證線程獲取鎖的順序,與其請(qǐng)求獲取鎖的順序是一樣的。也就是誰先申請(qǐng)獲取到這把鎖,誰就可以先獲取到這把鎖。公平可重入鎖會(huì)把各個(gè)線程的加鎖請(qǐng)求進(jìn)行排隊(duì)處理,保證先申請(qǐng)獲取鎖的線程,可以優(yōu)先獲取鎖,從而實(shí)現(xiàn)所謂的公平性。

三.可重入的非公平鎖和公平鎖不同點(diǎn)

可重入的非公平鎖和公平鎖,在整體的技術(shù)實(shí)現(xiàn)框架上都是一樣的。唯一的不同點(diǎn)就是加鎖和解鎖的邏輯不一樣。非公平鎖的加鎖邏輯,比較簡(jiǎn)單。公平鎖的加鎖邏輯,要加入排隊(duì)機(jī)制,保證各個(gè)線程排隊(duì)能按順序獲取鎖。

(2)Redisson公平鎖的簡(jiǎn)單使用

Redisson的可重入鎖RedissonLock指的是非公平可重入鎖,Redisson的公平鎖RedissonFairLock指的是公平可重入鎖。

Redisson的公平可重入鎖實(shí)現(xiàn)了java.util.concurrent.locks.Lock接口,保證了當(dāng)多個(gè)線程同時(shí)請(qǐng)求加鎖時(shí),優(yōu)先分配給先發(fā)出請(qǐng)求的線程。所有請(qǐng)求線程會(huì)在一個(gè)隊(duì)列中排隊(duì),當(dāng)某個(gè)線程出現(xiàn)宕機(jī)時(shí),Redisson會(huì)等待5秒之后才會(huì)繼續(xù)分配下一個(gè)線程。

RedissonFairLock是RedissonLock的子類。RedissonFairLock的鎖實(shí)現(xiàn)框架,和RedissonLock基本一樣。而在獲取鎖和釋放鎖的lua腳本中,RedissonFairLock的邏輯才有所區(qū)別。

//1.最常見的使用方法
RedissonClient redisson = Redisson.create(config);
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lock(); //2.10秒鐘以后自動(dòng)解鎖,無需調(diào)用unlock方法手動(dòng)解鎖
fairLock.lock(10, TimeUnit.SECONDS); //3.嘗試加鎖,最多等待100秒,上鎖以后10秒自動(dòng)解鎖
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
fairLock.unlock(); //4.Redisson為公平的可重入鎖提供了異步執(zhí)行的相關(guān)方法
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lockAsync();
fairLock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);

(3)Redisson公平鎖的初始化

public class RedissonDemo {
public static void main(String[] args) throws Exception {
...
//創(chuàng)建RedissonClient實(shí)例
RedissonClient redisson = Redisson.create(config); //獲取公平的可重入鎖
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lock();//加鎖
fairLock.unlock();//釋放鎖
}
} public class Redisson implements RedissonClient {
//Redis的連接管理器,封裝了一個(gè)Config實(shí)例
protected final ConnectionManager connectionManager;
//Redis的命令執(zhí)行器,封裝了一個(gè)ConnectionManager實(shí)例
protected final CommandAsyncExecutor commandExecutor;
...
protected Redisson(Config config) {
this.config = config;
Config configCopy = new Config(config);
//初始化Redis的連接管理器
connectionManager = ConfigSupport.createConnectionManager(configCopy);
...
//初始化Redis的命令執(zhí)行器
commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
...
} public RLock getFairLock(String name) {
return new RedissonFairLock(commandExecutor, name);
}
...
} public class RedissonFairLock extends RedissonLock implements RLock {
private final long threadWaitTime;
private final CommandAsyncExecutor commandExecutor;
...
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
this(commandExecutor, name, 60000*5);
} public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.threadWaitTime = threadWaitTime;
...
}
...
} public class RedissonLock extends RedissonBaseLock {
protected long internalLockLeaseTime;
final CommandAsyncExecutor commandExecutor;
...
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
//與WatchDog有關(guān)的internalLockLeaseTime
//通過命令執(zhí)行器CommandExecutor可以獲取連接管理器ConnectionManager
//通過連接管理器ConnectionManager可以獲取Redis的配置信息類Config
//通過Redis的配置信息類Config可以獲取lockWatchdogTimeout超時(shí)時(shí)間
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
...
}
...
} public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
...
protected long internalLockLeaseTime;
final String id;
final String entryName;
final CommandAsyncExecutor commandExecutor; public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();//獲取UUID
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
}
...
} abstract class RedissonExpirable extends RedissonObject implements RExpirable {
RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
...
} public abstract class RedissonObject implements RObject {
protected final CommandAsyncExecutor commandExecutor;
protected String name;
protected final Codec codec; public RedissonObject(CommandAsyncExecutor commandExecutor, String name) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
} public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
this.codec = codec;
this.commandExecutor = commandExecutor;
if (name == null) {
throw new NullPointerException("name can't be null");
}
setName(name);
}
...
} public class ConfigSupport {
...
//創(chuàng)建Redis的連接管理器
public static ConnectionManager createConnectionManager(Config configCopy) {
//生成UUID
UUID id = UUID.randomUUID();
...
if (configCopy.getClusterServersConfig() != null) {
validate(configCopy.getClusterServersConfig());
//返回ClusterConnectionManager實(shí)例
return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
}
...
}
...
} public class ClusterConnectionManager extends MasterSlaveConnectionManager {
public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
super(config, id);
...
}
...
} public class MasterSlaveConnectionManager implements ConnectionManager {
protected final String id;//初始化時(shí)為UUID
private final Config cfg;
protected Codec codec;
...
protected MasterSlaveConnectionManager(Config cfg, UUID id) {
this.id = id.toString();//傳入的是UUID
...
this.cfg = cfg;
this.codec = cfg.getCodec();
...
} public String getId() {
return id;
} public Codec getCodec() {
return codec;
}
...
}

2.公平鎖源碼之加鎖和排隊(duì)

(1)加鎖時(shí)的執(zhí)行流程

(2)獲取公平鎖的lua腳本相關(guān)參數(shù)說明

(3)lua腳本步驟一:進(jìn)入while循環(huán)移除隊(duì)列和有序集合中等待超時(shí)的線程

(4)lua腳本步驟二:判斷當(dāng)前線程能否獲取鎖

(5)lua腳本步驟三:執(zhí)行獲取鎖的操作

(6)lua腳本步驟四:判斷鎖是否已經(jīng)被當(dāng)前線程持有(可重入鎖)

(7)lua腳本步驟五:判斷當(dāng)前獲取鎖失敗的線程是否已經(jīng)在隊(duì)列中排隊(duì)

(8)lua腳本步驟六:對(duì)獲取鎖失敗的線程進(jìn)行排隊(duì)

(9)獲取鎖失敗的第一個(gè)線程執(zhí)行l(wèi)ua腳本的流程

(10)獲取鎖失敗的第二個(gè)線程執(zhí)行l(wèi)ua腳本的流程

(1)加鎖時(shí)的執(zhí)行流程

使用Redisson的公平鎖RedissonFairLock進(jìn)行加鎖時(shí):首先調(diào)用的是RedissonLock的lock()方法,然后會(huì)調(diào)用RedissonLock的tryAcquire()方法,接著會(huì)調(diào)用RedissonLock的tryAcquireAsync()方法。

在RedissonLock的tryAcquireAsync()方法中,會(huì)調(diào)用一個(gè)可以被RedissonLock子類重載的tryLockInnerAsync()方法。對(duì)于非公平鎖,執(zhí)行到這會(huì)調(diào)用RedissonLock的tryLockInnerAsync()方法。對(duì)于公平鎖,執(zhí)行到這會(huì)調(diào)用RedissonFairLock的tryLockInnerAsync()方法。

在RedissonFairLock的tryLockInnerAsync()方法中,便執(zhí)行具體的lua腳本。

public class RedissonDemo {
public static void main(String[] args) throws Exception {
...
//創(chuàng)建RedissonClient實(shí)例
RedissonClient redisson = Redisson.create(config); //獲取公平的可重入鎖
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lock();//加鎖
fairLock.unlock();//釋放鎖
}
} public class RedissonLock extends RedissonBaseLock {
...
//不帶參數(shù)的加鎖
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
} //帶參數(shù)的加鎖
public void lock(long leaseTime, TimeUnit unit) {
try {
lock(leaseTime, unit, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
} private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
//加鎖成功
if (ttl == null) {
return;
}
//加鎖失敗
...
} private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
} private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//非公平鎖,接下來調(diào)用的是RedissonLock.tryLockInnerAsync()方法
//公平鎖,接下來調(diào)用的是RedissonFairLock.tryLockInnerAsync()方法
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
//對(duì)RFuture<Long>類型的ttlRemainingFuture添加回調(diào)監(jiān)聽
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
//tryLockInnerAsync()里的加鎖lua腳本異步執(zhí)行完畢,會(huì)回調(diào)如下方法邏輯:
//加鎖成功
if (ttlRemaining == null) {
if (leaseTime != -1) {
//如果傳入的leaseTime不是-1,也就是指定鎖的過期時(shí)間,那么就不創(chuàng)建定時(shí)調(diào)度任務(wù)
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
//創(chuàng)建定時(shí)調(diào)度任務(wù)
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
...
} public class RedissonFairLock extends RedissonLock implements RLock {
private final long threadWaitTime;//線程可以等待鎖的時(shí)間
private final CommandAsyncExecutor commandExecutor;
private final String threadsQueueName;
private final String timeoutSetName; public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
this(commandExecutor, name, 60000*5);//傳入60秒*5=5分鐘
} public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.threadWaitTime = threadWaitTime;
threadsQueueName = prefixName("redisson_lock_queue", name);
timeoutSetName = prefixName("redisson_lock_timeout", name);
}
...
@Override
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
long wait = threadWaitTime;
if (waitTime != -1) {
//將傳入的指定的獲取鎖等待時(shí)間賦值給wait變量
wait = unit.toMillis(waitTime);
}
...
if (command == RedisCommands.EVAL_LONG) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
//步驟一:remove stale threads,移除等待超時(shí)的線程
"while true do " +
//獲取隊(duì)列中的第一個(gè)元素
//KEYS[2]是一個(gè)用來對(duì)線程排隊(duì)的隊(duì)列的名字
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
//獲取隊(duì)列中第一個(gè)元素對(duì)應(yīng)的分?jǐn)?shù),也就是排第一的線程的過期時(shí)間
//KEYS[3]是一個(gè)用來對(duì)線程排序的有序集合的名字
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
//如果排第一的線程的過期時(shí)間小于當(dāng)前時(shí)間,說明該線程等待超時(shí)了都還沒獲取到鎖,所以要移除
//ARGV[4]是當(dāng)前時(shí)間
"if timeout <= tonumber(ARGV[4]) then " +
//remove the item from the queue and timeout set NOTE we do not alter any other timeout
//從有序集合 + 隊(duì)列中移除這個(gè)線程
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" + //check if the lock can be acquired now
//步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進(jìn)行嘗試獲取鎖
//情況一:鎖不存在 + 隊(duì)列也不存在;KEYS[1]是鎖的名字;KEYS[2]是對(duì)線程排隊(duì)的隊(duì)列;
//情況二:鎖不存在 + 隊(duì)列存在 + 隊(duì)列的第一個(gè)元素就是當(dāng)前線程;ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
//步驟三:當(dāng)前線程執(zhí)行獲取鎖的操作
//remove this thread from the queue and timeout set
//彈出隊(duì)列的第一個(gè)元素 + 從有序集合中刪除UUID:ThreadID對(duì)應(yīng)的元素
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" + //decrease timeouts for all waiting in the queue
//遞減有序集合中每個(gè)線程的分?jǐn)?shù),也就是遞減每個(gè)線程獲取鎖時(shí)的已經(jīng)等待時(shí)間
//zrange返回有序集合KEYS[3]中指定區(qū)間內(nèi)(0,-1)的成員,也就是全部成員
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
//對(duì)有序集合KEYS[3]的成員keys[i]的score減去:tonumber(ARGV[3])
//ARGV[3]就是線程獲取鎖時(shí)可以等待的時(shí)間,默認(rèn)是5分鐘
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" + //acquire the lock and set the TTL for the lease
//hset設(shè)置Hash值進(jìn)行加鎖操作 + pexpire設(shè)置鎖key的過期時(shí)間 + 最后返回nil表示加鎖成功
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" + //check if the lock is already held, and this is a re-entry(可重入鎖)
//步驟四:判斷鎖是否已經(jīng)被當(dāng)前線程持有,KEYS[1]是鎖的名字,ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2],1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" + //the lock cannot be acquired, check if the thread is already in the queue
//步驟五:判斷當(dāng)前獲取鎖失敗的線程是否已經(jīng)在隊(duì)列中排隊(duì)
//KEYS[3]是對(duì)線程排序的有序集合,ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
//the real timeout is the timeout of the prior thread in the queue,
//but this is approximately correct, and avoids having to traverse the queue
//如果當(dāng)前獲取鎖失敗的線程已經(jīng)在隊(duì)列中排隊(duì)
//那么就返回該線程等待獲取鎖時(shí),還剩多少時(shí)間就超時(shí)了,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
//ARGV[3]是當(dāng)前線程獲取鎖時(shí)可以等待的時(shí)間,ARGV[4]是當(dāng)前時(shí)間
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" + //add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of
//the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the threadWaitTime
//步驟六:對(duì)獲取鎖失敗的線程進(jìn)行排隊(duì)處理
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
//如果在隊(duì)列中排隊(duì)的最后一個(gè)元素不是當(dāng)前線程
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
//lastThreadId是在隊(duì)列中排最后的線程,ARGV[2]是當(dāng)前線程的UUID+線程ID,ARGV[4]是當(dāng)前時(shí)間
//因?yàn)閾碛凶畲筮^期時(shí)間的線程在隊(duì)列中是排最后的
//所以可通過隊(duì)列中的最后一個(gè)元素的過期時(shí)間,計(jì)算當(dāng)前線程的過期時(shí)間
//從而保證新加入隊(duì)列和有序集合的線程的過期時(shí)間是最大的
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,當(dāng)前隊(duì)列中排最后的線程就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
//這樣后一個(gè)加入隊(duì)列的線程,會(huì)阻塞等待前一個(gè)加入隊(duì)列的線程的過期時(shí)間
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,鎖就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
//計(jì)算當(dāng)前線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//把當(dāng)前線程作為一個(gè)元素插入有序集合,并設(shè)置元素分?jǐn)?shù)為該線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
//然后再把當(dāng)前線程作為一個(gè)元素插入隊(duì)列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
Arrays.asList(getRawName(), threadsQueueName, timeoutSetName),
unit.toMillis(leaseTime),
getLockName(threadId),
wait,
currentTime
);
}
...
}
...
}

(2)獲取公平鎖的lua腳本相關(guān)參數(shù)說明

KEYS[1]是getRawName(),它是一個(gè)Hash數(shù)據(jù)結(jié)構(gòu)的key,也就是鎖的名字,比如"myLock"。

KEYS[2]是threadsQueueName,它是一個(gè)用來對(duì)線程排隊(duì)的隊(duì)列的名字,多個(gè)客戶端線程申請(qǐng)獲取鎖時(shí),會(huì)到這個(gè)隊(duì)列里進(jìn)行排隊(duì)。比如"redisson_lock_queue:{myLock}"。

KEYS[3]是timeoutSetName,它是一個(gè)用來對(duì)線程排序的有序集合的名字,這個(gè)有序集合可以自動(dòng)按照每個(gè)數(shù)據(jù)指定的分?jǐn)?shù)進(jìn)行排序。比如"redisson_lock_timeout:{myLock}"。

ARGV[1]是leaseTime,代表鎖的過期時(shí)間。如果leaseTime沒有指定,默認(rèn)就是internalLockLeaseTime = 30秒。

ARGV[2]是getLockName(threadId),代表客戶端UUID + 線程ID。

ARGV[3]是threadWaitTime,代表線程可以等待的時(shí)間(默認(rèn)5分鐘)。

ARGV[4]是currentTime,代表當(dāng)前時(shí)間。

(3)lua腳本步驟一:進(jìn)入while循環(huán)移除隊(duì)列和有序集合中等待超時(shí)的線程

while循環(huán)中首先執(zhí)行命令:"lindex redisson_lock_queue:{myLock} 0",也就是獲取"redisson_lock_queue:{myLock}"這個(gè)隊(duì)列中的第一個(gè)元素。一開始該隊(duì)列是空的,所以什么都獲取不到,firstThreadId2為false。此時(shí)就會(huì)break掉,退出while循環(huán)。

如果獲取到隊(duì)列中的第一個(gè)元素,那么就會(huì)執(zhí)行zscore命令:從有序集合中獲取該元素對(duì)應(yīng)的分?jǐn)?shù),也就是該元素對(duì)應(yīng)線程的過期時(shí)間。如果過期時(shí)間比當(dāng)前時(shí)間小,那么就要從隊(duì)列和有序集合中移除該元素。否則,也會(huì)break掉,退出while循環(huán)。

//步驟一:remove stale threads,移除等待超時(shí)的線程
"while true do " +
//獲取隊(duì)列中的第一個(gè)元素
//KEYS[2]是一個(gè)用來對(duì)線程排隊(duì)的隊(duì)列的名字
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
//獲取隊(duì)列中第一個(gè)元素對(duì)應(yīng)的分?jǐn)?shù),也就是排第一的線程的過期時(shí)間
//KEYS[3]是一個(gè)用來對(duì)線程排序的有序集合的名字
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
//如果排第一的線程的過期時(shí)間小于當(dāng)前時(shí)間,說明該線程等待鎖超時(shí)了都還沒獲取到鎖,所以要移除
//ARGV[4]是當(dāng)前時(shí)間
"if timeout <= tonumber(ARGV[4]) then " +
//remove the item from the queue and timeout set NOTE we do not alter any other timeout
//從有序集合+隊(duì)列中移除這個(gè)線程
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +

(4)lua腳本步驟二:判斷當(dāng)前線程能否獲取鎖

判斷條件一:

首先執(zhí)行命令"exists myLock",判斷鎖是否存在。一開始沒有線程加過鎖,所以判斷條件肯定是成立的,該條件為true。

判斷條件二:

接著執(zhí)行命令"exists redisson_lock_queue:{myLock}",看隊(duì)列是否存在。一開始也沒有這個(gè)隊(duì)列,所以這個(gè)條件也肯定成立,該條件為true。

判斷條件三:

如果有這個(gè)隊(duì)列,則判斷隊(duì)列存在的條件不成立,執(zhí)行"或"后面的判斷。也就是執(zhí)行命令"lindex redisson_lock_queue:{myLock} 0",判斷隊(duì)列的第一個(gè)元素是否是當(dāng)前線程的UUID + ThreadID。

//check if the lock can be acquired now
//步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進(jìn)行嘗試獲取鎖
//情況一:鎖不存在 + 隊(duì)列也不存在;KEYS[1]是鎖的名字;
//情況二:鎖不存在 + 隊(duì)列存在 + 隊(duì)列的第一個(gè)元素就是當(dāng)前線程;ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
...
"end;" +

總結(jié)當(dāng)前線程現(xiàn)在可以嘗試獲取鎖的情況如下:

情況一:鎖不存在 + 隊(duì)列也不存在

情況二:鎖不存在 + 隊(duì)列存在 + 隊(duì)列的第一個(gè)元素就是當(dāng)前線程

(5)lua腳本步驟三:執(zhí)行獲取鎖的操作

當(dāng)判斷現(xiàn)在能否嘗試獲取鎖的條件通過后,便會(huì)執(zhí)行如下操作:

步驟一:執(zhí)行命令"lpop redisson_lock_queue:{myLock}",彈出隊(duì)列第一個(gè)元素。一開始該隊(duì)列是空的,所以該命令不會(huì)進(jìn)行處理。接著執(zhí)行命令"zrem redisson_lock_timeout:{myLock} UUID1:ThreadID1",也就是從有序集合中刪除UUID1:ThreadID1對(duì)應(yīng)的元素。一開始該有序集合也是空的,所以該命令不會(huì)進(jìn)行處理。

步驟二:執(zhí)行命令"hset myLock UUID1:ThreadID1 1",進(jìn)行加鎖操作。在設(shè)置key為myLock的Hash值中,field為UUID1:ThreadID1的value值為1。接著執(zhí)行命令"pexpire myLock 30000",設(shè)置鎖key的過期時(shí)間為30秒。

最后返回nil,這樣在外層代碼中,就會(huì)認(rèn)為加鎖成功。于是就會(huì)創(chuàng)建一個(gè)WatchDog看門狗定時(shí)調(diào)度任務(wù),10秒后對(duì)鎖進(jìn)行檢查。如果檢查發(fā)現(xiàn)當(dāng)前線程還持有這個(gè)鎖,那么就重置鎖key的過期時(shí)間為30秒,并且重新創(chuàng)建一個(gè)WatchDog看門狗定時(shí)調(diào)度任務(wù)在10秒后繼續(xù)進(jìn)行檢查。

//check if the lock can be acquired now
//步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進(jìn)行嘗試獲取鎖
//情況一:鎖不存在 + 隊(duì)列也不存在;KEYS[1]是鎖的名字;KEYS[2]是對(duì)線程排隊(duì)的隊(duì)列;
//情況二:鎖不存在 + 隊(duì)列存在 + 隊(duì)列的第一個(gè)元素就是當(dāng)前線程;ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
//步驟三:當(dāng)前線程執(zhí)行獲取鎖的操作
//remove this thread from the queue and timeout set
//彈出隊(duì)列的第一個(gè)元素 + 從有序集合中刪除UUID:ThreadID對(duì)應(yīng)的元素
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" + //decrease timeouts for all waiting in the queue
//遞減有序集合中每個(gè)線程的分?jǐn)?shù),也就是遞減每個(gè)線程獲取鎖時(shí)的已經(jīng)等待時(shí)間
//zrange返回有序集合KEYS[3]中指定區(qū)間內(nèi)(0,-1)的成員,也就是全部成員
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
//對(duì)有序集合KEYS[3]的成員keys[i]的score減去:tonumber(ARGV[3])
//ARGV[3]就是線程獲取鎖時(shí)可以等待的時(shí)間,默認(rèn)是5分鐘
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" + //acquire the lock and set the TTL for the lease
//hset設(shè)置Hash值進(jìn)行加鎖操作 + pexpire設(shè)置鎖key的過期時(shí)間 + 最后返回nil表示加鎖成功
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +

(6)lua腳本步驟四:判斷鎖是否已經(jīng)被當(dāng)前線程持有(可重入鎖)

此時(shí)會(huì)執(zhí)行命令"hexists myLock UUID:ThreadID"。如果判斷條件通過,則說明是持有鎖的線程對(duì)鎖進(jìn)行了重入。于是會(huì)執(zhí)行命令"hincrby myLock UUID:ThreadID 1",對(duì)key為鎖名的Hash值中,field為UUID + 線程ID的value值累加1。并且執(zhí)行命令"pexpire myLock 300000"重置鎖key的過期時(shí)間。最后返回nil,表示重入加鎖成功。

//check if the lock is already held, and this is a re-entry(可重入鎖)
//步驟四:判斷鎖是否已經(jīng)被當(dāng)前線程持有,KEYS[1]是鎖的名字,ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +

(7)lua腳本步驟五:判斷當(dāng)前獲取鎖失敗的線程是否已經(jīng)在隊(duì)列中排隊(duì)

通過執(zhí)行命令"zscore redisson_lock_timeout:{myLock} UUID:ThreadID",獲取當(dāng)前線程在有序集合中的對(duì)應(yīng)的分?jǐn)?shù),也就是過期時(shí)間。如果獲取成功則返回:當(dāng)前線程等待獲取鎖的超時(shí)時(shí)間還剩多少,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間。

//the lock cannot be acquired, check if the thread is already in the queue
//步驟五:判斷當(dāng)前獲取鎖失敗的線程是否已經(jīng)在隊(duì)列中排隊(duì)
//KEYS[3]是對(duì)線程排序的有序集合,ARGV[2]是當(dāng)前線程的UUID+ThreadID;
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
//the real timeout is the timeout of the prior thread in the queue,
//but this is approximately correct, and avoids having to traverse the queue
//如果當(dāng)前獲取鎖失敗的線程已經(jīng)在隊(duì)列中排隊(duì)
//那么就返回該線程等待獲取鎖時(shí),還剩多少時(shí)間就超時(shí)了,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
//ARGV[3]是當(dāng)前線程獲取鎖時(shí)可以等待的時(shí)間,ARGV[4]是當(dāng)前時(shí)間
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +

(8)lua腳本步驟六:對(duì)獲取鎖失敗的線程進(jìn)行排隊(duì)

首先獲取隊(duì)列中的最后一個(gè)元素。因?yàn)閾碛凶畲筮^期時(shí)間的線程在隊(duì)列中是排最后的,所以可通過隊(duì)列中的最后一個(gè)元素的過期時(shí)間,計(jì)算當(dāng)前線程的過期時(shí)間。從而保證新加入隊(duì)列和有序集合的線程的過期時(shí)間是最大的。然后獲取鎖或者隊(duì)列中排最后的線程剩余的存活時(shí)間,接著計(jì)算當(dāng)前線程在排隊(duì)等待鎖時(shí)的過期時(shí)間。

然后把當(dāng)前線程作為一個(gè)元素插入有序集合,并設(shè)置有序集合中該元素的分?jǐn)?shù)為該線程在排隊(duì)等待鎖時(shí)的過期時(shí)間,接著再把當(dāng)前線程作為一個(gè)元素插入隊(duì)列尾部。

最后返回鎖或者隊(duì)列中排第一的線程剩余的存活時(shí)間ttl給外層代碼。如果外層代碼拿到的返回值是非null,那么客戶端會(huì)進(jìn)入一個(gè)while循環(huán)。在while循環(huán)會(huì)每阻塞等待ttl時(shí)間再嘗試去進(jìn)行加鎖,重新執(zhí)行l(wèi)ua腳本。

如果隊(duì)列里沒有元素,那么第一個(gè)加入隊(duì)列的線程,會(huì)阻塞等待鎖的過期時(shí)間。如果隊(duì)列里有元素,那么后一個(gè)加入隊(duì)列的線程,會(huì)阻塞等待前一個(gè)加入隊(duì)列的線程的過期時(shí)間。

//步驟六:對(duì)獲取鎖失敗的線程進(jìn)行排隊(duì)處理
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
//如果在隊(duì)列中排隊(duì)的最后一個(gè)元素不是當(dāng)前線程
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
//lastThreadId是在隊(duì)列中排最后的線程,ARGV[2]是當(dāng)前線程的UUID + 線程ID,ARGV[4]是當(dāng)前時(shí)間
//因?yàn)閾碛凶畲筮^期時(shí)間的線程在隊(duì)列中是排最后的
//所以可通過隊(duì)列中的最后一個(gè)元素的過期時(shí)間,計(jì)算當(dāng)前線程的過期時(shí)間
//從而保證新加入隊(duì)列和有序集合的線程的過期時(shí)間是最大的
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,當(dāng)前隊(duì)列中排最后的線程就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
//這樣后一個(gè)加入隊(duì)列的線程,會(huì)阻塞等待前一個(gè)加入隊(duì)列的線程的過期時(shí)間
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,鎖就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
//計(jì)算當(dāng)前線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//把當(dāng)前線程作為一個(gè)元素插入有序集合,并設(shè)置元素分?jǐn)?shù)為該線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
//然后再把當(dāng)前線程作為一個(gè)元素插入隊(duì)列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",

(9)獲取鎖失敗的第一個(gè)線程執(zhí)行l(wèi)ua腳本的流程

公平鎖的核心在于申請(qǐng)加鎖時(shí),加鎖失敗的各個(gè)客戶端會(huì)排隊(duì)。之后鎖被釋放時(shí),會(huì)依次獲取鎖,從而實(shí)現(xiàn)公平性。

假設(shè)此時(shí)第一個(gè)客戶端線程已加鎖成功,第二個(gè)客戶端線程也來嘗試加鎖,那么會(huì)進(jìn)行如下排隊(duì)處理。

步驟一:進(jìn)入while循環(huán),移除等待超時(shí)的線程。執(zhí)行命令"lindex redisson_lock_queue:{myLock} 0",獲取隊(duì)列排第一元素。由于此時(shí)隊(duì)列還是空的,所以獲取到的是false,于是退出while循環(huán)。

步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖。因?yàn)閳?zhí)行命令"exists myLock",發(fā)現(xiàn)鎖已經(jīng)存在了,于是判斷不通過。

步驟三:判斷鎖是否已經(jīng)被當(dāng)前線程持有,由于第二個(gè)客戶端線程的UUID + 線程ID必然不等于第一個(gè)客戶端線程。所以此時(shí)執(zhí)行命令"hexists myLock UUID2:ThreadID2",發(fā)現(xiàn)不存在。所以此處的可重入鎖的判斷條件也不成立。

步驟四:判斷當(dāng)前獲取鎖失敗的線程是否已經(jīng)在隊(duì)列中排隊(duì)。由于當(dāng)前線程是第一個(gè)獲取鎖失敗的線程,所以判斷不通過。

步驟五:接下來進(jìn)行排隊(duì)處理。

//對(duì)獲取鎖失敗的線程進(jìn)行排隊(duì)處理
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
//如果在隊(duì)列中排隊(duì)的最后一個(gè)元素不是當(dāng)前線程
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
//lastThreadId是在隊(duì)列中排最后的線程,ARGV[2]是當(dāng)前線程的UUID+線程ID,ARGV[4]是當(dāng)前時(shí)間
//因?yàn)閾碛凶畲筮^期時(shí)間的線程在隊(duì)列中是排最后的
//所以可通過隊(duì)列中的最后一個(gè)元素的過期時(shí)間,計(jì)算當(dāng)前線程的過期時(shí)間
//從而保證新加入隊(duì)列和有序集合的線程的過期時(shí)間是最大的
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,當(dāng)前隊(duì)列中排最后的線程就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
//這樣后一個(gè)加入隊(duì)列的線程,會(huì)阻塞等待前一個(gè)加入隊(duì)列的線程的過期時(shí)間
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,鎖就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
//計(jì)算當(dāng)前線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//把當(dāng)前線程作為一個(gè)元素插入有序集合,并設(shè)置元素分?jǐn)?shù)為該線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
//然后再把當(dāng)前線程作為一個(gè)元素插入隊(duì)列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;"

首先執(zhí)行命令"lindex redisson_lock_queue:{myLock} 0"。也就是從隊(duì)列中獲取最后一個(gè)元素,由于此時(shí)隊(duì)列是空,所以獲取不到元素。然后執(zhí)行命令"ttl = pttl myLock",獲取鎖剩余的存活時(shí)間。

接著計(jì)算當(dāng)前線程在排隊(duì)等待鎖時(shí)的過期時(shí)間。假設(shè)myLock剩余的存活時(shí)間ttl為20秒,那么timeout = ttl + 5分鐘 + 當(dāng)前時(shí)間 = 20秒 + 5分鐘 + 10:00:00 = 10:05:20;

然后執(zhí)行命令"zadd redisson_lock_timeout:{myLock} 10:05:20 UUID2:ThreadID2",這行命令的意思是,在有序集合中插入一個(gè)元素。元素值是UUID2:ThreadID2,元素對(duì)應(yīng)的分?jǐn)?shù)是10:05:20。分?jǐn)?shù)會(huì)用時(shí)間的Long型時(shí)間戳來表示,時(shí)間越靠后,時(shí)間戳就越大。有序集合Sorted Set會(huì)自動(dòng)根據(jù)插入的元素分?jǐn)?shù)從小到大進(jìn)行排序。

接著執(zhí)行命令"rpush redisson_lock_queue:{myLock} UUID2:TheadID2",這行命令的意思是,將UUID2:ThreadID2插入到隊(duì)列的尾部。

最后返回ttl給外層代碼,也就是返回myLock剩余的存活時(shí)間。如果外層代碼拿到的ttl是非null,那么客戶端會(huì)進(jìn)入一個(gè)while循環(huán)。在while循環(huán)會(huì)每阻塞等待ttl時(shí)間就嘗試進(jìn)行加鎖,重新執(zhí)行l(wèi)ua腳本。

(10)獲取鎖失敗的第二個(gè)線程執(zhí)行l(wèi)ua腳本的流程

如果此時(shí)有第三個(gè)客戶端線程也來嘗試加鎖,那么會(huì)進(jìn)行如下排隊(duì)處理。

步驟一:進(jìn)入while循環(huán),移除等待超時(shí)的線程。執(zhí)行命令"lindex redisson_lock_queue:{myLock} 0",獲取隊(duì)列排第一元素。此時(shí)獲取到UUID2:ThreadID2,代表著第二個(gè)客戶端線程正在隊(duì)列里排隊(duì)。

繼續(xù)執(zhí)行命令"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2",從有序集合中獲取UUID2:ThreadID2對(duì)應(yīng)的分?jǐn)?shù),timeout = 10:05:20。

假設(shè)當(dāng)前時(shí)間是10:00:25,那么timeout <= 10:00:25的這個(gè)條件不成立,于是退出while循環(huán)。

步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,發(fā)現(xiàn)不能通過。因?yàn)閳?zhí)行命令"exists myLock"時(shí),發(fā)現(xiàn)鎖已經(jīng)存在。

步驟三:判斷鎖是否已經(jīng)被當(dāng)前線程持有。由于第三個(gè)客戶端線程的UUID + 線程ID必然不等于第一個(gè)客戶端線程。所以此時(shí)執(zhí)行命令"hexists myLock UUID3:ThreadID3",發(fā)現(xiàn)不存在。所以此處的可重入鎖的判斷條件也不成立。

步驟四:判斷當(dāng)前獲取鎖失敗的線程是否已經(jīng)在隊(duì)列中排隊(duì)。由于當(dāng)前線程是第二個(gè)獲取鎖失敗的線程,所以判斷不通過。

步驟五:接下來進(jìn)行排隊(duì)處理。

//對(duì)獲取鎖失敗的線程進(jìn)行排隊(duì)處理
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
//如果在隊(duì)列中排隊(duì)的最后一個(gè)元素不是當(dāng)前線程
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
//lastThreadId是在隊(duì)列中排最后的線程,ARGV[2]是當(dāng)前線程的UUID + 線程ID,ARGV[4]是當(dāng)前時(shí)間
//因?yàn)閾碛凶畲筮^期時(shí)間的線程在隊(duì)列中是排最后的
//所以可通過隊(duì)列中的最后一個(gè)元素的過期時(shí)間,計(jì)算當(dāng)前線程的過期時(shí)間
//從而保證新加入隊(duì)列和有序集合的線程的過期時(shí)間是最大的
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,當(dāng)前隊(duì)列中排最后的線程就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
//這樣后一個(gè)加入隊(duì)列的線程,會(huì)阻塞等待前一個(gè)加入隊(duì)列的線程的過期時(shí)間
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,鎖就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
//計(jì)算當(dāng)前線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//把當(dāng)前線程作為一個(gè)元素插入有序集合,并設(shè)置元素分?jǐn)?shù)為該線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
//然后再把當(dāng)前線程作為一個(gè)元素插入隊(duì)列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;"

首先執(zhí)行命令"lindex redisson_lock_queue:{myLock} 0",獲取到隊(duì)列中的最后一個(gè)元素UUID2:ThreadID2。

然后判斷條件是否成立:lastThreadId不為false + lastThreadId不是自己。由于此時(shí)的ARGV[2] = UUID3:ThreadID3,所以判斷條件成立。即在隊(duì)列里排隊(duì)的最后一個(gè)元素并不是當(dāng)前嘗試獲取鎖的客戶端線程。

于是執(zhí)行:"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2" - 當(dāng)前時(shí)間,也就是獲取在隊(duì)列中排最后的線程還有多少時(shí)間就會(huì)過期,從而得到ttl。

接著根據(jù)ttl計(jì)算當(dāng)前線程在排隊(duì)等待鎖時(shí)的過期時(shí)間timeout,然后執(zhí)行zadd和rpush命令對(duì)當(dāng)前線程進(jìn)行入隊(duì)和排隊(duì),最后返回ttl。

3.公平鎖源碼之可重入加鎖

持有公平鎖的客戶端重復(fù)進(jìn)行l(wèi)ock.lock(),執(zhí)行加鎖lua腳本的流程如下:

步驟一:進(jìn)入while循環(huán),移除等待超時(shí)的線程。執(zhí)行命令"lindex redisson_lock_queue:{myLock} 0",獲取隊(duì)列排第一元素。此時(shí)獲取到UUID2:ThreadID2,代表著第二個(gè)客戶端線程正在隊(duì)列里排隊(duì)。

繼續(xù)執(zhí)行命令"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2",從有序集合中獲取UUID2:ThreadID2對(duì)應(yīng)的分?jǐn)?shù),timeout = 10:05:20。

假設(shè)當(dāng)前時(shí)間是10:00:25,那么timeout <= 10:00:25的這個(gè)條件不成立,于是退出while循環(huán)。

步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,發(fā)現(xiàn)不能通過。因?yàn)閳?zhí)行命令"exists myLock"時(shí),發(fā)現(xiàn)鎖已經(jīng)存在。

步驟三:判斷鎖是否已經(jīng)被當(dāng)前線程持有。由于當(dāng)前線程的UUID + 線程ID等于持有鎖的線程。即此時(shí)執(zhí)行命令"hexists myLock UUID:ThreadID"發(fā)現(xiàn)key是存在的,所以此處的可重入鎖的判斷條件成立。

于是會(huì)執(zhí)行命令"hincrby myLock UUID:ThreadID 1",對(duì)key為鎖名的Hash值中,key為UUID + 線程ID的Hash值累加1。并且執(zhí)行命令"pexpire myLock 300000"重置鎖key的過期時(shí)間。最后返回nil,表示重入加鎖成功。

//check if the lock is already held, and this is a re-entry(可重入鎖)
//步驟四:判斷鎖是否已經(jīng)被當(dāng)前線程持有,KEYS[1]是鎖的名字,ARGV[2]是當(dāng)前線程的UUID+ThreadID;
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +

4.公平鎖源碼之新舊版本對(duì)比

(1)新版本再次加鎖失敗不會(huì)刷新排隊(duì)分?jǐn)?shù)(等待超時(shí)的時(shí)間點(diǎn)timeout)

(2)舊版本再次加鎖失敗會(huì)刷新排隊(duì)分?jǐn)?shù)(等待超時(shí)的時(shí)間點(diǎn)timeout)

當(dāng)客戶端線程嘗試加公平鎖失敗處于排隊(duì)狀態(tài)時(shí),會(huì)進(jìn)入while循環(huán)。在while循環(huán)中,每次都會(huì)等待一段時(shí)間,再重新進(jìn)行嘗試加公平鎖。

public class RedissonLock extends RedissonBaseLock {
...
//加鎖
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
} private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//線程ID,用來生成設(shè)置Hash的值
long threadId = Thread.currentThread().getId();
//嘗試加鎖,此時(shí)執(zhí)行RedissonLock.lock()方法默認(rèn)傳入的leaseTime=-1
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
//ttl為null說明加鎖成功
if (ttl == null) {
return;
} //加鎖失敗時(shí)的處理
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
} try {
while (true) {
//再次嘗試獲取鎖
ttl = tryAcquire(-1, leaseTime, unit, threadId);
//返回的ttl為null,獲取到鎖,就退出while循環(huán)
if (ttl == null) {
break;
}
//返回的ttl不為null,則說明其他客戶端或線程還持有鎖
//那么就利用同步組件Semaphore進(jìn)行阻塞等待一段ttl的時(shí)間
if (ttl >= 0) {
try {
commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
commandExecutor.getNow(future).getLatch().acquire();
} else {
commandExecutor.getNow(future).getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(commandExecutor.getNow(future), threadId);
}
}
...
}

假設(shè)第二個(gè)客戶端線程第一次加鎖是在10:00:00,然后在10:00:15該客戶端線程再次發(fā)起請(qǐng)求嘗試進(jìn)行加鎖,但第一個(gè)客戶端線程在10:00:00~10:00:15之間一直持有這把鎖,此時(shí)第二個(gè)客戶端線程的再次加鎖流程如下:

(1)新版本再次加鎖失敗不會(huì)刷新排隊(duì)分?jǐn)?shù)(等待超時(shí)的時(shí)間點(diǎn)timeout)

步驟一:進(jìn)入while循環(huán),移除等待超時(shí)的線程。執(zhí)行命令"lindex redisson_lock_queue:{myLock} 0",獲取隊(duì)列排第一元素。此時(shí)獲取到UUID2:ThreadID2,代表著第二個(gè)客戶端線程正在隊(duì)列里排隊(duì)。

繼續(xù)執(zhí)行命令"zscore redisson_lock_timeout:{myLock} UUID2:ThreadID2",從有序集合中獲取UUID2:ThreadID2對(duì)應(yīng)的分?jǐn)?shù),比如獲取到的timeout = 10:05:20。根據(jù)當(dāng)前時(shí)間是10:00:15,那么timeout <= 10:00:15的這個(gè)條件不成立,于是退出while循環(huán)。

步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,發(fā)現(xiàn)不能通過。因?yàn)閳?zhí)行命令"exists myLock"時(shí),發(fā)現(xiàn)鎖已經(jīng)存在。

步驟三:判斷鎖是否已經(jīng)被當(dāng)前線程持有。由于第二個(gè)客戶端線程的UUID + 線程ID必然不等于第一個(gè)客戶端線程,所以此時(shí)執(zhí)行命令"hexists myLock UUID2:ThreadID2",發(fā)現(xiàn)不存在,所以此處的可重入鎖的判斷條件也不成立。

步驟四:判斷當(dāng)前獲取鎖失敗的線程是否已經(jīng)在隊(duì)列中排隊(duì)。由于當(dāng)前線程是第二次嘗試獲取鎖,所以判斷通過。然后返回第二個(gè)客戶端線程等待獲取鎖時(shí),還剩多少時(shí)間就超時(shí),不會(huì)刷新排隊(duì)分?jǐn)?shù)。

//Redisson的3.16.8版本
if (command == RedisCommands.EVAL_LONG) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
//步驟一:remove stale threads,移除等待超時(shí)的線程
"while true do " +
//獲取隊(duì)列中的第一個(gè)元素
//KEYS[2]是一個(gè)用來對(duì)線程排隊(duì)的隊(duì)列的名字
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end;" +
//獲取隊(duì)列中第一個(gè)元素對(duì)應(yīng)的分?jǐn)?shù),也就是排第一的線程的過期時(shí)間
//KEYS[3]是一個(gè)用來對(duì)線程排序的有序集合的名字
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
//如果排第一的線程的過期時(shí)間小于當(dāng)前時(shí)間,說明該線程等待超時(shí)了都還沒獲取到鎖,所以要移除
//ARGV[4]是當(dāng)前時(shí)間
"if timeout <= tonumber(ARGV[4]) then " +
//從有序集合 + 隊(duì)列中移除這個(gè)線程
"redis.call('zrem', KEYS[3], firstThreadId2);" +
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" + //步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進(jìn)行嘗試獲取鎖
//情況一:鎖不存在 + 隊(duì)列也不存在;KEYS[1]是鎖的名字;KEYS[2]是對(duì)線程排隊(duì)的隊(duì)列;
//情況二:鎖不存在 + 隊(duì)列存在 + 隊(duì)列的第一個(gè)元素就是當(dāng)前線程;ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
//步驟三:當(dāng)前線程執(zhí)行獲取鎖的操作
//彈出隊(duì)列的第一個(gè)元素 + 從有序集合中刪除UUID:ThreadID對(duì)應(yīng)的元素
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" + //遞減有序集合中每個(gè)線程的分?jǐn)?shù),也就是遞減每個(gè)線程獲取鎖時(shí)的已經(jīng)等待時(shí)間
//zrange返回有序集合KEYS[3]中指定區(qū)間內(nèi)(0,-1)的成員,也就是全部成員
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
//對(duì)有序集合KEYS[3]的成員keys[i]的score減去:tonumber(ARGV[3])
//ARGV[3]就是線程獲取鎖時(shí)可以等待的時(shí)間,默認(rèn)是5分鐘
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" + //hset設(shè)置Hash值進(jìn)行加鎖操作 + pexpire設(shè)置鎖key的過期時(shí)間 + 最后返回nil表示加鎖成功
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" + //步驟四:判斷鎖是否已經(jīng)被當(dāng)前線程持有(可重入鎖),KEYS[1]是鎖的名字,ARGV[2]是當(dāng)前線程的UUID+ThreadID;
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2],1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" + //步驟五:判斷當(dāng)前獲取鎖失敗的線程是否已經(jīng)在隊(duì)列中排隊(duì)
//KEYS[3]是對(duì)線程排序的有序集合,ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
//如果當(dāng)前獲取鎖失敗的線程已經(jīng)在隊(duì)列中排隊(duì)
//那么就返回該線程等待獲取鎖時(shí),還剩多少時(shí)間就超時(shí)了,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
//ARGV[3]是當(dāng)前線程獲取鎖時(shí)可以等待的時(shí)間,ARGV[4]是當(dāng)前時(shí)間
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" + //步驟六:對(duì)獲取鎖失敗的線程進(jìn)行排隊(duì)處理
"local lastThreadId = redis.call('lindex', KEYS[2], -1);" +
"local ttl;" +
//如果在隊(duì)列中排隊(duì)的最后一個(gè)元素不是當(dāng)前線程
"if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " +
//lastThreadId是在隊(duì)列中排最后的線程,ARGV[2]是當(dāng)前線程的UUID + 線程ID,ARGV[4]是當(dāng)前時(shí)間
//因?yàn)閾碛凶畲筮^期時(shí)間的線程在隊(duì)列中是排最后的
//所以可通過隊(duì)列中的最后一個(gè)元素的過期時(shí)間,計(jì)算當(dāng)前線程的過期時(shí)間
//從而保證新加入隊(duì)列和有序集合的線程的過期時(shí)間是最大的
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,當(dāng)前隊(duì)列中排最后的線程就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
//這樣后一個(gè)加入隊(duì)列的線程,會(huì)阻塞等待前一個(gè)加入隊(duì)列的線程的過期時(shí)間
"ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" +
"else " +
//下面這一行會(huì)計(jì)算出:還有多少時(shí)間,鎖就會(huì)過期,外部代碼拿到這個(gè)時(shí)間會(huì)阻塞等待這個(gè)時(shí)間
"ttl = redis.call('pttl', KEYS[1]);" +
"end;" +
//計(jì)算當(dāng)前線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
"local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" +
//把當(dāng)前線程作為一個(gè)元素插入有序集合,并設(shè)置元素分?jǐn)?shù)為該線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
//然后再把當(dāng)前線程作為一個(gè)元素插入隊(duì)列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end;" +
"return ttl;",
Arrays.asList(getRawName(), threadsQueueName, timeoutSetName),
unit.toMillis(leaseTime),
getLockName(threadId),
wait,//默認(rèn)是5分鐘
currentTime
);
}

(2)舊版本再次加鎖失敗會(huì)刷新排隊(duì)分?jǐn)?shù)(等待超時(shí)的時(shí)間點(diǎn)timeout)

舊版本公平鎖的lua腳本如下所示,當(dāng)?shù)诙€(gè)客戶端線程再次加鎖時(shí)會(huì)再次進(jìn)入排隊(duì)邏輯。

首先會(huì)出計(jì)算隊(duì)列中的第一個(gè)元素還有多少時(shí)間就超時(shí),即ttl。然后根據(jù)ttl + 傳入的等待時(shí)間,計(jì)算當(dāng)前線程等待鎖的超時(shí)時(shí)間timeout。

接著執(zhí)行命令"zadd redisson_lock_timeout:{myLock} timeout UUID2:ThreadID2",刷新有序集合中的同名元素的分?jǐn)?shù)為timeout。客戶端線程每次重復(fù)嘗試加鎖,都會(huì)將其對(duì)應(yīng)的過期時(shí)間往后延長(zhǎng),也就是刷新了排隊(duì)的分?jǐn)?shù)。

zadd命令在添加存在的元素時(shí),會(huì)返回0,但會(huì)更新該元素的分?jǐn)?shù)。

//Redisson的3.8.1版本
if (command == RedisCommands.EVAL_LONG) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//步驟一:移除等待超時(shí)的線程
"while true do " +
//獲取隊(duì)列中的第一個(gè)元素
//KEYS[2]是一個(gè)用來對(duì)線程排隊(duì)的隊(duì)列的名字
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end; " +
//獲取隊(duì)列中第一個(gè)元素對(duì)應(yīng)的分?jǐn)?shù),也就是排第一的線程的過期時(shí)間
//KEYS[3]是一個(gè)用來對(duì)線程排序的有序集合的名字
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
//如果排第一的線程的過期時(shí)間小于當(dāng)前時(shí)間,說明該線程等待超時(shí)了都還沒獲取到鎖,所以要移除
//ARGV[4]是當(dāng)前時(shí)間
"if timeout <= tonumber(ARGV[4]) then " +
//從有序集合 + 隊(duì)列中移除這個(gè)線程
"redis.call('zrem', KEYS[3], firstThreadId2); " +
"redis.call('lpop', KEYS[2]); " +
"else " +
"break;" +
"end; " +
"end;" + //步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,以下兩種情況可以通過判斷
//情況一:鎖不存在 + 隊(duì)列也不存在;KEYS[1]是鎖的名字;KEYS[2]是對(duì)線程排隊(duì)的隊(duì)列;
//情況二:鎖不存在 + 隊(duì)列存在 + 隊(duì)列的第一個(gè)元素就是當(dāng)前線程;ARGV[2]是當(dāng)前線程的UUID+ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
//步驟三:當(dāng)前線程執(zhí)行獲取鎖的操作
//彈出隊(duì)列的第一個(gè)元素 + 從有序集合中刪除UUID:ThreadID對(duì)應(yīng)的元素
"redis.call('lpop', KEYS[2]); " +
"redis.call('zrem', KEYS[3], ARGV[2]); " +
//hset設(shè)置Hash值進(jìn)行加鎖操作 + pexpire設(shè)置鎖key的過期時(shí)間 + 最后返回nil表示加鎖成功
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " + //步驟四:判斷鎖是否已經(jīng)被當(dāng)前線程持有,KEYS[1]是鎖的名字,ARGV[2]是當(dāng)前線程的UUID+ThreadID;
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " + //步驟五:對(duì)獲取鎖失敗的線程進(jìn)行排隊(duì)處理
"local firstThreadId = redis.call('lindex', KEYS[2], 0); " +
"local ttl; " +
//如果在隊(duì)列中排隊(duì)的第一個(gè)元素不是當(dāng)前線程
"if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " +
//計(jì)算隊(duì)列中第一個(gè)元素還有多少時(shí)間就超時(shí)了
"ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" +
"else " +
"ttl = redis.call('pttl', KEYS[1]);" +
"end; " +
//計(jì)算當(dāng)前線程等待鎖的超時(shí)時(shí)間
"local timeout = ttl + tonumber(ARGV[3]);" +
//把當(dāng)前線程作為一個(gè)元素插入有序集合,并設(shè)置元素分?jǐn)?shù)為該線程在排隊(duì)等待鎖時(shí)的過期時(shí)間
//然后再把當(dāng)前線程作為一個(gè)元素插入隊(duì)列尾部
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
"redis.call('rpush', KEYS[2], ARGV[2]);" +
"end; " +
"return ttl;",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName),//KEYS[1]、KEYS[2]、KEYS[3]
internalLockLeaseTime,//ARGV[1]
getLockName(threadId),//ARGV[2]
currentTime + threadWaitTime,//ARGV[3] = 當(dāng)前時(shí)間 + 5秒
currentTime//ARGV[4]
);
}

注意:如果僅僅使用有序集合是不行的,因?yàn)橛行蚣系姆謹(jǐn)?shù)在lua腳本執(zhí)行過程中也會(huì)發(fā)生變化。舊版本中,客戶端線程每次嘗試加鎖,有序集合中的分?jǐn)?shù)會(huì)更新。新版本中,當(dāng)前線程可以嘗試獲取鎖時(shí),也會(huì)遍歷更新有序集合中的分?jǐn)?shù)。

此外,有序集合獲取第一個(gè)元素的時(shí)間復(fù)雜度比隊(duì)列要高。如果僅僅使用隊(duì)列也是不行的,因?yàn)樾枰芾砼抨?duì)線程的等待超時(shí)時(shí)間。如果沒有有序集合,那么就不能移除在隊(duì)列中排隊(duì)已超時(shí)的線程。當(dāng)然,為了管理線程的等待超時(shí)時(shí)間,將有序集合換成兩層Hash值也可以。

5.公平鎖源碼之隊(duì)列重排

(1)新版本在5分鐘后嘗試再次加鎖才會(huì)隊(duì)列重排

(2)舊版本在5秒后嘗試再次加鎖就會(huì)隊(duì)列重排

(3)導(dǎo)致隊(duì)列重排的是lua腳本的步驟一(移除等待超時(shí)的線程)

(1)新版本在5分鐘后嘗試再次加鎖才會(huì)隊(duì)列重排

新版本的公平鎖中,獲取鎖失敗的線程默認(rèn)會(huì)進(jìn)入隊(duì)列最多等待5分鐘。

在這5分鐘內(nèi),該線程不管再次加鎖多少次,都不會(huì)刷新隊(duì)列排序和分?jǐn)?shù)。

在這5分鐘內(nèi),該線程沒有進(jìn)行再次加鎖嘗試,就會(huì)被移出隊(duì)列和有序集合。所以5分鐘后,該線程才嘗試再次加鎖,那么會(huì)重新入隊(duì),導(dǎo)致隊(duì)列重排。

(2)舊版本在5秒后嘗試再次加鎖就會(huì)隊(duì)列重排

舊版本的公平鎖中,獲取鎖失敗的線程默認(rèn)會(huì)進(jìn)入隊(duì)列最多等待5秒鐘。

在這5秒鐘內(nèi),該線程只要重新嘗試進(jìn)行加鎖,那么就會(huì)延長(zhǎng)其最多等待時(shí)間,也就是刷新有序集合中的排隊(duì)分?jǐn)?shù)。

在這5秒鐘內(nèi),該線程沒有進(jìn)行再次加鎖嘗試,就會(huì)被移出隊(duì)列和有序集合。所以5秒鐘后,該線程才嘗試再次加鎖,那么會(huì)重新入隊(duì),導(dǎo)致隊(duì)列重排。

(3)導(dǎo)致隊(duì)列重排的是lua腳本的步驟一(移除等待超時(shí)的線程)

也就是公平鎖lua腳本中while循環(huán)的作用。

當(dāng)客戶端線程使用RedissonLock的tryAcquire()方法嘗試獲取公平鎖,并且指定了一個(gè)獲取鎖的超時(shí)時(shí)間時(shí)。比如指定客戶端線程在隊(duì)列里排隊(duì)超過了20秒,就不再嘗試獲取鎖了。如果獲取鎖的超時(shí)時(shí)間沒有指定,新版本是默認(rèn)5分鐘超時(shí),舊版本是默認(rèn)5秒后超時(shí)。

此時(shí)由于這些等待獲取鎖已超時(shí)的線程元素還存在隊(duì)列和有序集合里,所以可以通過while循環(huán)的邏輯來清除這些不再嘗試獲取鎖的客戶端線程。

在新版本,隨著時(shí)間推移,這些等待獲取鎖超時(shí)的線程就會(huì)被移出隊(duì)列。在舊版本,隨著時(shí)間推移,這些等待獲取鎖超時(shí)的線程只要不再嘗試加鎖,那么其等待獲取鎖的超時(shí)時(shí)間就不會(huì)更新被不斷延長(zhǎng),就會(huì)被移除隊(duì)列。

如果客戶端宕機(jī)了,那么客戶端就不會(huì)重新嘗試獲取鎖。在新版本中,隨著時(shí)間推移,宕機(jī)的客戶端線程就會(huì)被移出隊(duì)列。在舊版本中,就不會(huì)刷新和延長(zhǎng)有序集合中的超時(shí)時(shí)間分?jǐn)?shù),這樣while循環(huán)的邏輯就會(huì)將這些宕機(jī)的客戶端線程從隊(duì)列中移出。

在新版本中,最多5分鐘后,宕機(jī)的客戶端線程會(huì)被移出隊(duì)列。在舊版本中,最多5秒鐘后,宕機(jī)的客戶端線程就會(huì)被移出隊(duì)列。

因?yàn)榫W(wǎng)絡(luò)延遲等原因,可能會(huì)導(dǎo)致客戶端線程等待鎖時(shí)間過長(zhǎng),從而觸發(fā)各個(gè)客戶端線程的排隊(duì)順序的重排序。有的客戶端如果在隊(duì)列里等待時(shí)間過長(zhǎng),可能就會(huì)觸發(fā)一次隊(duì)列的重排序。新版本觸發(fā)重排序的頻率是每5分鐘,舊版本觸發(fā)重排序的頻率是每5秒。

//步驟一:移除等待超時(shí)的線程
"while true do " +
//獲取隊(duì)列中的第一個(gè)元素
//KEYS[2]是一個(gè)用來對(duì)線程排隊(duì)的隊(duì)列的名字
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end; " +
//獲取隊(duì)列中第一個(gè)元素對(duì)應(yīng)的分?jǐn)?shù),也就是排第一的線程的過期時(shí)間
//KEYS[3]是一個(gè)用來對(duì)線程排序的有序集合的名字
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
//如果排第一的線程的過期時(shí)間小于當(dāng)前時(shí)間,說明該線程等待超時(shí)了都還沒獲取到鎖,所以要移除
//ARGV[4]是當(dāng)前時(shí)間
"if timeout <= tonumber(ARGV[4]) then " +
//從有序集合 + 隊(duì)列中移除這個(gè)線程
"redis.call('zrem', KEYS[3], firstThreadId2); " +
"redis.call('lpop', KEYS[2]); " +
"else " +
"break;" +
"end; " +
"end;" +

6.公平鎖源碼之釋放鎖

(1)釋放公平鎖的流程

(2)釋放公平鎖的lua腳本分析

(1)釋放公平鎖的流程

釋放公平鎖首先調(diào)用的還是RedissonLock的unlock()方法。

在RedissonLock的unlock()方法中,會(huì)調(diào)用get(unlockAsync())。也就是首先調(diào)用RedissonBaseLock的unlockAsync()方法,然后調(diào)用RedissonObject的get()方法。

其中個(gè)RedissonBaseLock的unlockAsync()方法是異步化執(zhí)行的方法,釋放鎖的操作是異步執(zhí)行的。而RedisObject的get()方法會(huì)通過RFuture同步等待獲取異步執(zhí)行的結(jié)果。所以,可以將get(unlockAsync())理解為異步轉(zhuǎn)同步。

在RedissonBaseLock的unlockAsync()方法中,就會(huì)調(diào)用公平鎖RedissonFairLock的unlockInnerAsync()方法進(jìn)行釋放鎖。然后當(dāng)完成釋放鎖的處理后,會(huì)通過異步去取消定時(shí)調(diào)度任務(wù)。

public class Application {
public static void main(String[] args) throws Exception {
Config config = new Config();
config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
//創(chuàng)建RedissonClient實(shí)例
RedissonClient redisson = Redisson.create(config);
//獲取公平的可重入鎖
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lock();
fairLock.unlock();
...
}
} public class RedissonLock extends RedissonBaseLock {
...
@Override
public void unlock() {
...
//異步轉(zhuǎn)同步
//首先調(diào)用的是RedissonBaseLock的unlockAsync()方法
//然后調(diào)用的是RedissonObject的get()方法
get(unlockAsync(Thread.currentThread().getId()));
...
}
...
} public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
...
@Override
public RFuture<Void> unlockAsync(long threadId) {
//異步執(zhí)行釋放鎖的lua腳本
RFuture<Boolean> future = unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((opStatus, e) -> {
//取消定時(shí)調(diào)度任務(wù)
cancelExpirationRenewal(threadId);
if (e != null) {
throw new CompletionException(e);
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId);
throw new CompletionException(cause);
}
return null;
});
return new CompletableFutureWrapper<>(f);
}
protected abstract RFuture<Boolean> unlockInnerAsync(long threadId);
...
} public class RedissonFairLock extends RedissonLock implements RLock {
private final long threadWaitTime;
private final CommandAsyncExecutor commandExecutor;
private final String threadsQueueName;
private final String timeoutSetName; public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
this(commandExecutor, name, 60000*5);
} public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.threadWaitTime = threadWaitTime;
threadsQueueName = prefixName("redisson_lock_queue", name);
timeoutSetName = prefixName("redisson_lock_timeout", name);
} @Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//步驟一:移除等待超時(shí)的線程
"while true do " +
//獲取隊(duì)列中的第一個(gè)元素
//KEYS[2]是一個(gè)用來對(duì)線程排隊(duì)的隊(duì)列的名字
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
"if firstThreadId2 == false then " +
"break;" +
"end; " +
//獲取隊(duì)列中第一個(gè)元素對(duì)應(yīng)的分?jǐn)?shù),也就是排第一的線程的過期時(shí)間
//KEYS[3]是一個(gè)用來對(duì)線程排序的有序集合的名字
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
//如果排第一的線程的過期時(shí)間小于當(dāng)前時(shí)間,說明該線程等待超時(shí)了都還沒獲取到鎖,所以要移除
//ARGV[4]是當(dāng)前時(shí)間
"if timeout <= tonumber(ARGV[4]) then " +
//從有序集合 + 隊(duì)列中移除這個(gè)線程
"redis.call('zrem', KEYS[3], firstThreadId2); " +
"redis.call('lpop', KEYS[2]); " +
"else " +
"break;" +
"end; " +
"end;" +
//步驟二:判斷鎖是否還存在,判斷key為鎖名的Hash值是否存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
//獲取隊(duì)列中排第一的線程
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
//ARGV[1]為通知事件的類型
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; " +
"end;" +
//步驟二:判斷鎖是否還存在,判斷key為UUID+線程ID的Hash值是否存在
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//對(duì)key為UUID+線程ID的Hash值還存遞減1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"end; " + "redis.call('del', KEYS[1]); " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
//發(fā)布一個(gè)事件給在隊(duì)列中排第一的線程
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; ",
Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName()),
LockPubSub.UNLOCK_MESSAGE,//ARGV[1]
internalLockLeaseTime,
getLockName(threadId),
System.currentTimeMillis()
);
}
...
}

(2)釋放公平鎖的lua腳本分析

步驟一:移除等待超時(shí)的線程

首先也會(huì)進(jìn)入while循環(huán),移除等待超時(shí)的線程。即獲取隊(duì)列中排第一的線程,判斷該線程的過期時(shí)間是否已小于當(dāng)前時(shí)間。如果小于當(dāng)前時(shí)間,那么就說明該線程在隊(duì)列中的排隊(duì)已經(jīng)過期,于是便將該線程從有序集合 + 隊(duì)列中移除。后續(xù)如果該線程再次嘗試加鎖,那么會(huì)重新排序 + 重新入隊(duì)。

步驟二:判斷鎖是否還存在

如果key為鎖名的Hash值已不存在,那么先獲取隊(duì)列中排第一的線程,然后發(fā)布一個(gè)事件給該線程對(duì)應(yīng)的客戶端讓其獲取鎖。

如果key為鎖名的Hash值還存在,那么判斷field為UUID + 線程ID的映射是否存在。如果field為UUID + 線程ID的映射不存在,那么表示鎖已經(jīng)被釋放了,直接返回nil。如果field為UUID + 線程ID的映射存在,那么在key為鎖名的Hash值中,對(duì)field為UUID + 線程ID的value值遞減1。也就是調(diào)用Redis的hincrby命令,進(jìn)行遞減1處理。

步驟三:對(duì)遞減1后的結(jié)果進(jìn)行如下判斷處理

如果遞減1后的結(jié)果大于0,表示線程還在持有鎖。對(duì)應(yīng)于持有鎖的線程多次重入鎖,此時(shí)需要重置鎖的過期時(shí)間。

如果遞減1后的結(jié)果小于0,表示線程不再持有鎖,則刪除鎖對(duì)應(yīng)的key,并且發(fā)布一個(gè)事件給在隊(duì)列中排第一的線程所對(duì)應(yīng)的客戶端。

7.公平鎖源碼之按順序依次加鎖

(1)鎖被釋放后,排第二的客戶端線程先來加鎖

(2)鎖被釋放后,排第一的客戶端線程再來加鎖

假設(shè)客戶端A先持有鎖,而客戶端B在隊(duì)列里面是排在客戶端C的后面。那么如果客戶端A釋放了鎖后,客戶端B和C是如何按順序加鎖的。

(1)鎖被釋放后,排第二的客戶端線程先來加鎖

鎖被客戶端A釋放掉,鎖key被刪除之后,客戶端B先來進(jìn)行嘗試加鎖。此時(shí)客戶端B執(zhí)行的lua腳本步驟二的邏輯:

//check if the lock can be acquired now
//步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進(jìn)行嘗試獲取鎖
//情況一:鎖不存在 + 隊(duì)列也不存在;KEYS[1]是鎖的名字;
//情況二:鎖不存在 + 隊(duì)列存在 + 隊(duì)列的第一個(gè)元素就是當(dāng)前線程;ARGV[2]是當(dāng)前線程的UUID + ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
...
"end;"

首先,執(zhí)行判斷"exists myLock = 0",由于當(dāng)前鎖存在,所以條件不成立。

然后,執(zhí)行判斷"exists redisson_lock_queue:{myLock} = 0",由于隊(duì)列存在,所以條件不成立。

接著,執(zhí)行判斷"lindex redisson_lock_queue:{myLock} 0 == UUID2:ThreadID2",由于隊(duì)列存在,但是在隊(duì)列中排第一的不是客戶端B而是客戶端C,所以條件不成立,客戶端B無法加鎖。

由此可見:即使鎖釋放掉后,多個(gè)客戶端來嘗試加鎖也只認(rèn)隊(duì)列中排第一的客戶端。從而實(shí)現(xiàn)按隊(duì)列的順序依次獲取鎖,保證了公平性。

(2)鎖被釋放后,排第一的客戶端線程再來加鎖

當(dāng)在隊(duì)列中排第一的客戶端C此時(shí)過來嘗試加鎖時(shí),就會(huì)執(zhí)行如下步驟三的嘗試加鎖邏輯:

//check if the lock can be acquired now
//步驟二:判斷當(dāng)前線程現(xiàn)在能否嘗試獲取鎖,以下兩種情況可以通過判斷去進(jìn)行嘗試獲取鎖
//情況一:鎖不存在 + 隊(duì)列也不存在;KEYS[1]是鎖的名字;KEYS[2]是對(duì)線程排隊(duì)的隊(duì)列;
//情況二:鎖不存在 + 隊(duì)列存在 + 隊(duì)列的第一個(gè)元素就是當(dāng)前線程;ARGV[2]是當(dāng)前線程的UUID+ThreadID;
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) " +
"or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
//步驟三:當(dāng)前線程執(zhí)行獲取鎖的操作
//remove this thread from the queue and timeout set
//彈出隊(duì)列的第一個(gè)元素 + 從有序集合中刪除UUID:ThreadID對(duì)應(yīng)的元素
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" + //decrease timeouts for all waiting in the queue
//遞減有序集合中每個(gè)線程的分?jǐn)?shù),也就是遞減每個(gè)線程獲取鎖時(shí)的已經(jīng)等待時(shí)間
//zrange返回有序集合KEYS[3]中指定區(qū)間內(nèi)(0,-1)的成員,也就是全部成員
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
//對(duì)有序集合KEYS[3]的成員keys[i]的score減去:tonumber(ARGV[3])
//ARGV[3]就是線程獲取鎖時(shí)可以等待的時(shí)間,默認(rèn)是5分鐘
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" + //acquire the lock and set the TTL for the lease
//hset設(shè)置Hash值進(jìn)行加鎖操作 + pexpire設(shè)置鎖key的過期時(shí)間 + 最后返回nil表示加鎖成功
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;"

首先,執(zhí)行命令"lpop redisson_lock_queue:{myLock}",將隊(duì)列中的第一個(gè)元素彈出來。

然后,執(zhí)行命令"zrem redisson_lock_timeout:{myLock} UUID3:ThreadID3",將有序集合中客戶端C的線程對(duì)應(yīng)的元素給刪除掉。

接著,執(zhí)行"hset myLock UUID3:ThreadID3 1"進(jìn)行加鎖,設(shè)置field為UUID + 線程ID的value值為1。

最后,執(zhí)行命令"pexpire myLock 30000",設(shè)置key為鎖名的Hash值的過期時(shí)間為30000毫秒。

客戶端C完成加鎖后,客戶端C就會(huì)從隊(duì)列中出隊(duì),此時(shí)排在隊(duì)頭的就是客戶端B。

總結(jié)

以上是生活随笔為你收集整理的分布式锁—3.Redisson的公平锁的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

国产精品欧美一区二区三区不卡 | 99精品免费久久久久久久久日本 | 国产高清一 | 99精品国产在热久久下载 | 999在线精品| 在线观看免费色 | 亚洲欧美日韩精品久久久 | 亚洲网久久 | a视频免费看 | 91看片在线观看 | 一级黄色片在线免费观看 | 国产麻豆电影 | www色av| 日韩免费不卡视频 | 91成人国产 | 一区二区三区日韩精品 | 在线国产黄色 | 91成人精品一区在线播放 | 亚洲天堂网在线视频观看 | 视频福利在线 | 久久毛片网站 | 欧美aaa一级 | 中文字幕欧美日韩va免费视频 | 国产成人一区二区三区久久精品 | 六月丁香久久 | 黄色网在线播放 | 久久精品欧美日韩精品 | 一二三区在线 | 97成人在线| 99免费在线视频观看 | 色婷婷综合久久久中文字幕 | 97精品国产97久久久久久粉红 | 在线不卡中文字幕播放 | 日本高清久久久 | 久久婷婷一区二区三区 | 国产精品免费观看视频 | 国产老熟 | 久久看视频 | 欧美在线观看视频 | 日本爱爱片 | 天天射网| 国产精品久久一区二区无卡 | 精品国产三级a∨在线欧美 免费一级片在线观看 | 成人激情开心网 | 在线有码中文字幕 | 99热手机在线| 国产精品区一区 | 丁香婷婷射 | a视频在线观看 | 开心综合网 | 一区二区三区手机在线观看 | 欧美日韩精品二区第二页 | 日日干日日 | 伊人伊成久久人综合网站 | 国产成人香蕉 | 亚洲精品人人 | 超碰公开在线观看 | 99高清视频有精品视频 | 麻豆视频一区 | 超碰在线资源 | 国产精品ssss在线亚洲 | 日本资源中文字幕在线 | 国产专区日韩专区 | 国内精品久久久久久久久久久 | 99热高清 | 一区二区三区四区久久 | www激情网| 99精品热视频只有精品10 | 久久国产高清 | 91久色蝌蚪| 亚洲h在线播放在线观看h | 欧美日韩二三区 | 人人盈棋牌 | 黄色一级大片在线免费看国产一 | 久久免费久久 | 香蕉视频在线播放 | 国产精品久久久久久久久久久不卡 | 爱色av.com | 涩五月婷婷 | 日韩av在线看 | 久精品视频在线观看 | 69av在线播放 | 在线免费观看av网站 | 亚洲精品久久久久久久不卡四虎 | 天天干视频在线 | 日韩av一区二区三区在线观看 | www..com毛片 | 亚洲精品国偷拍自产在线观看蜜桃 | 日韩欧美在线国产 | 国产高清久久久久 | 欧美一区二区免费在线观看 | 天天干com | 久久天天躁夜夜躁狠狠85麻豆 | 日韩网页 | 一区 二区 精品 | 97av在线视频 | 天天射天天 | 欧美aaa级片 | 91热在线| 免费观看的av网站 | 欧美va天堂va视频va在线 | japanesexxxhd奶水| 97av影院| 美女av免费| 久久综合狠狠综合 | 亚洲欧美国产精品 | 综合激情| 韩国精品福利一区二区三区 | 久久九九影视网 | 亚洲电影久久 | 成人欧美一区二区三区在线观看 | 99视频在线观看视频 | 在线三级av | 日韩av女优视频 | 亚洲精品国产精品国产 | 国产精品区在线观看 | 天天色 天天 | a午夜在线 | 婷婷新五月 | 亚洲精品资源 | 国产精品成人免费 | 成年人电影毛片 | 午夜在线免费视频 | 欧美精品乱码久久久久 | 日韩精品一区二区在线视频 | www国产在线| 精品一区二区在线观看 | 字幕网在线观看 | 中文国产字幕在线观看 | 欧美性生活大片 | 日韩影视精品 | 天天爽天天做 | www.天天射.com | 99热国产在线 | 久久永久视频 | 日一日操一操 | 91免费版在线 | 高清久久久久久 | 97超碰人人爱 | 不卡av免费在线观看 | 福利视频第一页 | 精品电影一区二区 | 91九色成人蝌蚪首页 | 一区二区三区日韩在线 | 国产xxxx做受性欧美88 | 久久精品xxx | 最新久久久 | 日韩在线观看第一页 | 国产亚洲精品成人av久久ww | 欧美日韩在线看 | 韩国av免费观看 | 久久这里只有精品首页 | 久久伊人精品天天 | 国产中文在线播放 | 国产高潮久久 | 日韩亚洲国产精品 | 麻豆精品视频在线观看免费 | 亚洲色图激情文学 | 国产精品1区2区3区 久久免费视频7 | 亚洲国产免费网站 | 91日韩在线视频 | 91最新在线观看 | 国产成人精品午夜在线播放 | 久久av一区二区三区亚洲 | 98福利在线 | 狠狠操夜夜操 | a极黄色片| 高潮久久久久久久久 | av线上看| 丁香五婷 | 日韩欧美xxx| 久久精品a | 亚洲精品视频久久 | av成人免费在线看 | 9999精品| 色综合夜色一区 | 国产黄色精品在线观看 | 最近高清中文字幕在线国语5 | 亚洲做受高潮欧美裸体 | www天天干 | 91av福利视频 | 久久五月婷婷丁香社区 | 在线日韩中文字幕 | 91中文在线 | 国产精品精品国产 | 欧美日韩视频在线播放 | 91精品国产自产在线观看 | 黄色av电影免费观看 | 日本久久久久久科技有限公司 | 日韩在线精品视频 | 国内精品久久久久久久97牛牛 | 日日干日日操 | 99自拍视频在线观看 | 成年人三级网站 | 日韩手机在线观看 | 国产精品一区二 | 懂色av懂色av粉嫩av分享吧 | 亚洲成人家庭影院 | 天天综合五月天 | 91在线免费播放视频 | 91污污| aaa日本高清在线播放免费观看 | 97超碰总站| 国产亚洲婷婷 | 四虎视频 | 丝袜美腿av | 97人人添人澡人人爽超碰动图 | 国产成人久久精品 | 国产二区免费视频 | 久久精品99久久 | 丁香九月婷婷 | 国产日产精品一区二区三区四区的观看方式 | 天天五月天色 | 中文字幕在线看片 | 欧美精品免费在线 | 亚洲五月| 91麻豆精品国产91久久久久久久久 | 狠狠干夜夜 | 日韩精品视频久久 | 91| 亚洲视频在线看 | 国产日产在线观看 | 九九热视频在线播放 | 国产日韩精品一区二区三区 | 久久最新 | 精品久久久久国产 | 天天干夜夜爽 | 日韩免费 | 狠狠的日| 热久久免费国产视频 | 伊人欧美| 91超国产| 免费日韩一区二区三区 | 欧美成人亚洲成人 | 狠狠干网站 | 天堂av在线网站 | 日韩天天综合 | 蜜臀av夜夜澡人人爽人人 | 五月激情天 | 日韩欧美视频免费观看 | 久久国产免费视频 | 欧美日韩在线网站 | 欧美精品久久天天躁 | 九九视频在线播放 | 九九久久在线看 | 久久久久久久久毛片 | 麻豆视频国产在线观看 | 亚洲精品9| 久久久久亚洲国产精品 | www.天天操| 国产精品欧美激情在线观看 | 能在线观看的日韩av | www.狠狠干 | 日本精品一区二区在线观看 | 久久国产精品偷 | 91理论电影 | 欧美日韩在线播放一区 | 精品国产一区二区三区蜜臀 | 午夜精品一区二区三区四区 | 91网址在线| 国产美女无遮挡永久免费 | 亚洲精品乱码久久久久久蜜桃不爽 | 麻豆 91 在线 | 久久久免费精品视频 | 欧美极品一区二区三区 | 久久只精品99品免费久23小说 | 午夜黄网| 成年人在线播放视频 | 日韩av中文字幕在线免费观看 | 精品久久久久久亚洲综合网站 | 96久久欧美麻豆网站 | 国产精品一区二区三区四区在线观看 | 国产免费二区 | 天天射综合网站 | 久久久久久久久久久久久影院 | 精品久久久久久国产91 | 久久精品视频免费 | 天天综合入口 | 激情电影影院 | av超碰在线| 91精品国自产在线观看 | 日韩激情综合 | 四虎海外影库www4hu | 精品专区 | 亚洲激情视频 | 狠狠操狠狠插 | 婷婷五天天在线视频 | 看黄色.com | 日韩电影精品 | 九九热国产视频 | 中文字幕在线播放一区二区 | 国产区欧美 | 国产一级片免费视频 | 伊人五月 | 日韩色高清 | 亚洲国产成人在线观看 | 在线观看视频99 | 精品一二 | 操碰av| 在线看av的网址 | 日本女人逼 | 国产精品一区二区三区四 | 日本中文一级片 | 免费在线观看国产精品 | 亚洲精品视频免费看 | 久久观看免费视频 | 美女黄色网在线播放 | 久久久国产精品一区二区三区 | 麻豆久久久 | 欧美日韩色婷婷 | 成年一级片 | 成 人 黄 色视频免费播放 | 欧美日韩一区久久 | 96精品在线 | 99免费在线观看视频 | 国产精品久久久久久久久久妇女 | 黄色日本片 | 日本爱爱免费视频 | 深夜免费福利网站 | 亚洲欧洲一级 | 9797在线看片亚洲精品 | 在线a亚洲视频播放在线观看 | 激情综合站 | 国产污视频在线观看 | 91亚洲精品视频 | 精品国产诱惑 | 国产在线a| 在线91观看 | 天天视频色版 | 97人人模人人爽人人喊中文字 | 欧美日韩在线免费视频 | 国产精品久久av | 在线看片中文字幕 | 97热久久免费频精品99 | 日韩av黄| 成人午夜电影免费在线观看 | 2021国产在线视频 | 中文字幕在线视频一区 | 国产黄色精品 | 国精产品满18岁在线 | 99精品系列 | 中文字幕精品一区二区三区电影 | 成人黄色在线视频 | 国产中文字幕视频 | 亚洲另类视频在线观看 | 日本不卡视频 | 91久久一区二区 | 91传媒在线观看 | 婷婷天天色 | 亚洲区视频在线观看 | 深爱综合网 | 免费人做人爱www的视 | 麻豆国产网站 | 日韩精品一区二区三区第95 | 最新av在线免费观看 | 99精品免费久久久久久久久 | 91麻豆精品久久久久久 | 国产美女黄网站免费 | 国产精品色 | 成年人黄色免费网站 | 国产日韩欧美自拍 | 色狠狠久久av五月综合 | 亚洲欧洲日韩在线观看 | 男女啪啪免费网站 | 国产精品视频免费在线观看 | 天天操天天操天天操天天 | 天天射天天干天天插 | 干天天 | 日本韩国在线不卡 | 久久国产一区二区 | 91精品国产高清 | 在线亚州 | 一级欧美一级日韩 | 在线看黄网站 | 精壮的侍卫呻吟h | 一区二区三区高清 | www.五月婷婷.com | 99激情网 | 99久国产| 欧美精品久久久久久久久老牛影院 | 国产精品一区二区三区在线看 | 久久伦理电影网 | 国产黄a三级三级三级三级三级 | 亚洲国产操| 免费成人av| 日韩在线视频看看 | 色香com. | 蜜桃av人人夜夜澡人人爽 | 欧美va天堂va视频va在线 | 四虎国产精品免费 | 成人一区二区三区在线 | 欧美最新另类人妖 | 亚洲视频免费视频 | 国产精品视频免费在线观看 | 国产精品亚洲片夜色在线 | avwww在线观看 | 国产不卡网站 | 国产精品久久久久久av | 尤物一区二区三区 | 久久国产精品99久久久久久进口 | 在线观看中文字幕一区二区 | 婷婷国产v亚洲v欧美久久 | 日本激情动作片免费看 | 91一区在线观看 | 99爱爱| 在线观看亚洲国产精品 | 天天天操天天天干 | 日韩高清不卡一区二区三区 | 91在线中字 | 欧美最爽乱淫视频播放 | 国产激情电影综合在线看 | 欧美激情精品久久久 | 亚洲视频资源在线 | 最新91在线视频 | av免费福利 | 亚洲精品小视频在线观看 | 日韩成人免费在线 | 99国产在线 | 久久艹在线 | av天天草| 日韩欧美精品一区二区三区经典 | 免费在线观看av电影 | 久草在线视频新 | 国产视频二区三区 | 中文字幕永久在线 | 中文字幕视频网站 | 四虎4hu永久免费 | 麻豆va一区二区三区久久浪 | 综合色中文 | 国产超碰97| 一区二区三区视频 | 久久社区视频 | 一区二区三区日韩在线观看 | 99久久久国产精品 | 久久国产精品视频免费看 | 999精品网 | 成人黄色在线 | 久久精品视频观看 | 91九色网址| 91麻豆文化传媒在线观看 | 色综合在 | 狠狠干在线播放 | 777奇米四色 | 在线观看中文字幕dvd播放 | 国产激情久久久 | 在线小视频你懂得 | 婷婷丁香七月 | 亚洲色图色 | 涩涩色亚洲一区 | 又爽又黄又无遮挡网站动态图 | 91av手机在线 | 成人日韩av | 日韩av一区二区三区 | 黄色免费观看 | 日日夜夜精品免费观看 | 久久精品波多野结衣 | 久久首页 | 免费一级日韩欧美性大片 | 92av视频 | 天天插综合网 | 欧美另类交在线观看 | 美女网站色在线观看 | a级国产乱理论片在线观看 特级毛片在线观看 | 四虎免费av | 亚洲精品在线电影 | 黄色一级动作片 | 精品视频在线观看 | 久久久久久毛片精品免费不卡 | 免费色视频网站 | 五月综合 | 综合天天色 | 天天干.com | 久久精品超碰 | 99精品视频在线观看免费 | 亚洲精品国产品国语在线 | 特级黄色视频毛片 | 中文字幕亚洲欧美 | 99在线播放 | 婷婷免费视频 | 免费看的毛片 | 天天搞夜夜骑 | 91麻豆精品一区二区三区 | 九九99视频 | 91桃色在线免费观看 | 91视频免费国产 | 国产精品999久久久 久产久精国产品 | 国产视频日本 | 观看免费av | 久久精品久久99 | 亚洲精品777 | 久久国产亚洲精品 | 麻豆国产视频 | 国产一级久久 | www.久艹| 久久久国产一区二区三区四区小说 | 欧美精品在线一区 | 免费在线视频一区二区 | 九九九视频精品 | 亚洲精品视频在线观看网站 | 亚洲国产日韩在线 | 天天躁天天躁天天躁婷 | 69视频永久免费观看 | 国产精品入口麻豆www | 99视频在线观看一区三区 | 一区二区三区在线视频观看58 | 高潮久久久久久久久 | 国产精品综合久久 | 在线视频欧美精品 | 91免费观看视频在线 | 欧美性生活久久 | 精品久久久久久久久久久久久 | 99国产一区二区三精品乱码 | 日日干 天天干 | 成人资源在线 | 亚洲电影久久久 | 亚洲成人免费在线观看 | 国产亚洲永久域名 | 久久国产日韩 | 99精品欧美一区二区 | 91精品国产92久久久久 | 国产精品成人品 | 国产精品理论片在线播放 | 久久成人资源 | 亚洲国产成人精品在线观看 | 亚洲人人av | 2022中文字幕在线观看 | 黄色成人av网址 | 草久久久 | 在线国产激情视频 | 在线你懂 | 在线a视频免费观看 | 国产免费一区二区三区网站免费 | 中国一级片在线 | 亚洲国产精品资源 | 91av在| 久久免费在线观看 | 天天干com | 中文字幕色站 | av中文字幕在线观看网站 | 天天干中文字幕 | 91在线视频观看 | 免费看一级黄色大全 | 一级片色播影院 | 日韩a在线 | 99视频在线免费看 | 久久av影院| 四虎精品成人免费网站 | 成人福利在线 | 在线岛国av | 亚洲精品国产综合久久 | 在线观看免费黄视频 | 在线观看的黄色 | 国产高清不卡一区二区三区 | 久久久精品小视频 | 黄网av在线 | 久草视频在线资源站 | 不卡精品视频 | 我要看黄色一级片 | 视频在线亚洲 | 久久国产福利 | 正在播放国产精品 | 天天干天天操天天射 | 超碰人人干人人 | 狠狠狠狠狠狠狠干 | 久久国产热视频 | 国产成人精品不卡 | 九九热国产视频 | 亚洲日韩欧美一区二区在线 | 欧美日韩精品影院 | 国产精品免费一区二区三区在线观看 | 色婷婷视频在线 | 日韩xxxx视频| 91传媒91久久久 | 亚洲高清在线观看视频 | 国产成人久久精品亚洲 | 天天在线视频色 | av福利在线导航 | 日日夜夜噜噜噜 | 久色婷婷 | 国产亚洲成av片在线观看 | 91黄色在线观看 | 久久综合五月天婷婷伊人 | 免费黄色在线网站 | 91麻豆网| 国产视频在线一区二区 | 久久久蜜桃一区二区 | 亚洲国产中文在线观看 | 91中文字幕在线 | 日韩成人高清在线 | 精品日韩中文字幕 | 欧美久久99 | 97狠狠干| 人成午夜视频 | 亚洲va欧美va人人爽春色影视 | 尤物97国产精品久久精品国产 | 91九色视频在线观看 | 曰本三级在线 | 国产视频精品久久 | 日韩在线国产精品 | 伊人黄色网| 2021国产在线 | 最新中文字幕 | 福利在线看片 | 一二三精品视频 | 在线a人片免费观看视频 | 丁香六月婷婷综合 | 九九九热精品免费视频观看网站 | 国产欧美综合在线观看 | 香蕉在线影院 | 成全在线视频免费观看 | 日韩高清精品一区二区 | 色综合久久天天 | 国产午夜三级一区二区三 | 日本成人免费在线观看 | 日韩av在线一区二区 | 日韩精品一区二区三区在线播放 | 五月婷婷一区二区三区 | 亚洲天堂在线观看完整版 | 国产精品观看视频 | 久久国产精品99久久久久久丝袜 | 四虎成人精品永久免费av | 国产成在线观看免费视频 | 成人av网址大全 | 亚洲精品久久久久999中文字幕 | 亚洲综合欧美激情 | 色91av| 亚洲国产福利视频 | 91国内在线| 亚洲专区欧美专区 | 日韩视频一区二区三区 | 日韩精品视频在线观看网址 | 香蕉视频4aa | 欧美日韩亚洲精品在线 | 久久久黄视频 | 久久久久久久久艹 | 蜜桃麻豆www久久囤产精品 | 日韩免费二区 | 中文字幕在线观看免费观看 | 黄色毛片在线观看 | 国产精品久久久久四虎 | 五月天六月婷 | 天天爱天天舔 | 久久综合中文字幕 | 97在线精品国自产拍中文 | 亚洲国产精品传媒在线观看 | 国产精品视频地址 | 国产高清av在线播放 | 人人干97 | 国产又粗又长的视频 | 又黄又刺激视频 | 国产精品久久一 | 欧美精品久久久久久久久免 | 国产 在线 日韩 | 国产97色在线 | 色视频网站免费观看 | 亚洲精品高清一区二区三区四区 | 国产高清中文字幕 | 亚洲一区二区观看 | 久久a级片 | 国产日韩欧美自拍 | 国产又黄又爽无遮挡 | 国产亚洲精品日韩在线tv黄 | 99re视频在线观看 | 网站你懂的 | 久久亚洲欧美日韩精品专区 | 黄色成年网站 | 精品视频免费在线 | 日韩在线在线 | 免费看日韩 | 午夜国产成人 | 特黄免费av | 久久97精品 | 爱爱av网 | 久久久国产精华液 | 亚洲 欧洲av | av看片网 | 欧美一区二区三区在线视频观看 | 国产精品你懂的在线观看 | 中文在线免费看视频 | 亚洲综合五月 | 国产资源在线播放 | 日日操天天射 | 日韩素人在线观看 | 国产a高清 | 亚洲 欧洲 国产 精品 | 免费男女羞羞的视频网站中文字幕 | 国产只有精品 | 四虎影视久久久 | 亚洲狠狠干 | 国产三级av在线 | 成人h电影| 色七七亚洲影院 | 久草在线观看资源 | 六月激情 | 久久高清视频免费 | 午夜精品电影 | 99精品免费久久久久久日本 | 亚洲精品电影在线 | 国产精彩在线视频 | 91在线欧美 | 精品久久久久一区二区国产 | 午夜精品久久一牛影视 | 欧美最猛性xxxxx免费 | 久久女教师 | 国产亚洲日 | 超碰97国产在线 | 色婷婷av一区二 | www亚洲视频 | 999一区二区三区 | 亚洲国产中文字幕在线视频综合 | 精品国产视频在线 | 中文字幕视频在线播放 | 国产高清免费在线观看 | 精品视频一区在线 | 美女黄色网在线播放 | 日韩影视在线 | 国产一区二区在线免费观看 | 99色国产 | 中文字幕成人在线 | 久草在线欧美 | 97人人超碰在线 | 中文字幕乱码在线播放 | 午夜av影院 | 91看片淫黄大片一级在线观看 | 亚洲免费成人av电影 | 日韩精品一区二区三区不卡 | 国产又粗又长的视频 | 在线观av | 91福利专区 | 久久久蜜桃一区二区 | 五月在线 | 天天躁日日躁狠狠躁 | 一区二区欧美日韩 | 中文字幕在线观看完整版 | 欧美日韩性视频在线 | www成人精品| 狠狠狠色丁香综合久久天下网 | 久久久首页 | 黄色免费观看视频 | 国产精品久久久久久久久久久久午夜 | 色黄www小说 | 天天色.com| 91在线视频精品 | 天堂av在线免费观看 | 久草网视频在线观看 | 久久久91精品国产一区二区精品 | 少妇精品久久久一区二区免费 | 国产精品久久久久久久午夜片 | 国产精品视频线看 | 成人小视频在线 | av片一区 | 狠狠操91 | 欧美色图另类 | 97久久精品午夜一区二区 | 中文字幕123区 | 日韩成人精品 | 黄色特一级片 | 欧美天天射 | 91精品国产欧美一区二区成人 | 最近高清中文字幕在线国语5 | 丁香av | 亚洲男人天堂2018 | 91高清视频| 夜夜躁日日躁狠狠久久88av | 91污视频在线观看 | 日韩中文字幕a | 久草视频在线免费 | 91爱爱免费观看 | 久久香蕉国产 | 国产精品免费一区二区三区 | 国产麻豆视频在线观看 | 四虎影视国产精品免费久久 | 久久99国产精品免费网站 | 中文字幕无吗 | 欧美做受高潮电影o | 一级成人免费视频 | 黄色大片国产 | 国产精品三级视频 | 国产高清亚洲 | 91在线看视频| 免费日韩电影 | 五月婷婷操 | 国产在线探花 | 男女免费视频观看 | 国产一区二区三区高清播放 | 免费看一及片 | 91免费在线 | 亚洲视频精品在线 | 国产精品久久久久久久久久久久 | 精品毛片在线 | 高清在线一区二区 | 欧美日韩三级在线观看 | 日韩免费在线观看 | 81精品国产乱码久久久久久 | 激情丁香婷婷 | 欧美日韩高清免费 | 亚洲欧美国产日韩在线观看 | 成人久久国产 | 四虎国产精品永久在线国在线 | 久久久国产精品免费 | 97人人澡人人爽人人模亚洲 | 亚洲在线看| 国产一区黄色 | 人人草在线视频 | 日韩有码在线播放 | 波多野结衣在线观看一区 | 一区在线电影 | 成人资源站 | www.五月婷婷 | 成人av电影在线播放 | 亚洲伊人av| 日本视频久久久 | 成人一级电影在线观看 | 免费av看片 | 91福利试看 | 在线色亚洲 | 欧美一区二区三区四区夜夜大片 | 丁香高清视频在线看看 | a在线播放 | 成人黄色小说网 | 国产在线精品观看 | 日韩r级在线 | 美女国产 | 久草在线观看视频免费 | 黄色毛片在线 | 亚洲激情一区二区三区 | 久久97久久97精品免视看 | 欧美另类xxx| 久久爱导航 | 色偷偷中文字幕 | a黄色大片 | 欧美高清视频不卡网 | 日韩av中文在线观看 | 亚洲精品在线免费 | 五月香视频在线观看 | 这里有精品在线视频 | 六月激情婷婷 | 欧美精品一二三 | 成人黄色视| 欧美日本高清视频 | 日韩成人精品一区二区三区 | 日韩中文幕 | 亚洲精品乱码久久久久久9色 | 香蕉97视频观看在线观看 | 亚洲精品激情 | 热久久国产精品 | 国产色爽 | 午夜电影av | 久久久精品国产免费观看一区二区 | 国产精品日韩在线观看 | 99精品视频中文字幕 | 一区二区不卡 | japanesexxxhd奶水 国产一区二区在线免费观看 | 国产亚洲精品无 | 国产精品毛片久久 | 中文av资源站 | 狠狠操精品| 精品久久一区 | 欧美aaa大片 | 欧美 日韩 成人 | 日韩精品久久久久久 | 五月天婷婷在线视频 | 91看片淫黄大片一级在线观看 | 在线观看免费视频你懂的 | 国产香蕉av | bbbbb女女女女女bbbbb国产 | 国产综合精品久久 | 久久精品视频免费播放 | 欧美成人视 | 亚洲日本va午夜在线电影 | 99免费看片| 国产视频在线观看一区 | 亚洲四虎在线 | 欧美性生活久久 | 色天天综合久久久久综合片 | 天天色天天骑天天射 | 日韩av手机在线观看 | 国产精品一区二区在线观看免费 | 日本中文字幕在线视频 | 天天色天 | av中文字幕网址 | 亚洲三级黄色 | 国产91精品在线播放 | 黄色网www| 亚洲一级电影视频 | 色之综合网| 五月婷婷色综合 | 亚洲男男gaygay无套 | 国产成人一区二区三区电影 | 亚洲在线激情 | 日韩精品一区二区三区第95 | 欧美成年性 | 808电影 | 亚洲色影爱久久精品 | 亚洲人av免费网站 | 青草视频在线看 | 久久久久9999亚洲精品 | 很黄很色很污的网站 | 成人午夜精品福利免费 | 亚洲国产精品久久久 | 精品三级av | 日本在线观看一区 | 欧美一区二区三区在线播放 | 91在线日韩 | 国产精品精品久久久久久 | 免费国产亚洲视频 | 国产精品18毛片一区二区 | 天天干天天干天天射 | 麻豆果冻剧传媒在线播放 | 精品美女在线视频 | 亚洲欧洲精品视频 | av电影免费在线播放 | 日韩a在线| 久久噜噜少妇网站 | 九九免费在线观看视频 | 日韩精品中文字幕久久臀 | 99免费精品视频 | 亚洲精品成人网 | 国产精品爽爽爽 | 蜜臀久久99静品久久久久久 | 91av视频免费在线观看 | 手机av电影在线 | 日韩av免费观看网站 | 国产黄色成人av | 亚洲欧美婷婷六月色综合 | 国产精品99久久久久久宅男 | 日韩欧美在线观看一区二区 | 久久天| 91亚洲欧美激情 | 日本在线免费看 | 国产最新精品视频 | 日本在线观看视频一区 | 国产精品一区二区三区四区在线观看 | 国产va精品免费观看 | 最新av在线网站 | 亚洲高清av | avwww在线| 午夜性色| 丁香九月婷婷 | 精品欧美小视频在线观看 | 午夜.dj高清免费观看视频 | av电影在线播放 | 欧美少妇影院 | 亚洲高清精品在线 | 成人免费看片98欧美 | 99在线精品免费视频九九视 | 日韩精品视频在线免费观看 | 中文字幕日韩无 | 日韩视频免费播放 | 成人av手机在线 | 久久草av | 99精品国产一区二区三区不卡 | 成年人黄色大片在线 | 在线免费色视频 | 在线一区观看 | 国产亚洲综合精品 | 久久精品系列 | 在线日韩三级 | 国产精品免费在线播放 | 日韩xxxbbb | 欧洲一区精品 | 欧美大片在线观看一区 | 中文字幕第一页在线播放 | 国产又粗又猛又色又黄视频 | 欧美日韩一区二区三区不卡 | 一本色道久久精品 | 午夜黄网| 国产精品18久久久久久首页狼 | 亚洲综合色婷婷 | 国产麻豆果冻传媒在线观看 | 色五丁香| 视频91在线 | 午夜精品一二区 | 日本xxxx裸体xxxx17 | 亚洲综合欧美日韩狠狠色 | 免费看毛片在线 | 天天操天天添天天吹 | 国产福利资源 | 国产在线探花 | 天天综合网久久 | 国产精品一区二区久久精品爱涩 | 日本丶国产丶欧美色综合 | 中文字幕999 | 色干综合 | 欧美一区二区三区在线视频观看 | 美女视频国产 | 婷婷综合亚洲 | 久久亚洲热 | 久99久久| 国产在线观看xxx | 二区在线播放 | 五月激情丁香婷婷 | 欧美性黄网官网 | 日本中文不卡 | 亚洲另类视频 | 狠狠色婷婷丁香六月 | 深爱激情综合 | 日韩中文字幕免费电影 |