日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

java redisson_Java使用Redisson分布式锁实现原理

發布時間:2024/6/1 java 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java redisson_Java使用Redisson分布式锁实现原理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本篇文章摘自:https://www.jb51.net/article/149353.htm

由于時間有限,暫未驗證 僅先做記錄。有大家注意下哈(會盡快抽時間進行驗證)

1. 基本用法

添加依賴

org.redisson

redisson

3.8.2

Config config = newConfig();

config.useClusterServers()

.setScanInterval(2000) //cluster state scan interval in milliseconds

.addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")

.addNodeAddress("redis://127.0.0.1:7002");

RedissonClient redisson=Redisson.create(config);

RLock lock= redisson.getLock("anyLock");

lock.lock();try{

...

}finally{

lock.unlock();

}

針對上面這段代碼,重點看一下Redisson是如何基于Redis實現分布式鎖的

Redisson中提供的加鎖的方法有很多,但大致類似,此處只看lock()方法

2. 加鎖

可以看到,調用getLock()方法后實際返回一個RedissonLock對象,在RedissonLock對象的lock()方法主要調用tryAcquire()方法

由于leaseTime == -1,于是走tryLockInnerAsync()方法,這個方法才是關鍵

首先,看一下evalWriteAsync方法的定義

由于leaseTime == -1,于是走tryLockInnerAsync()方法,這個方法才是關鍵

首先,看一下evalWriteAsync方法的定義

最后兩個參數分別是keys和params

實際調用是這樣的:

單獨將調用的那一段摘出來看

commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",

Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

結合上面的參數聲明,我們可以知道,這里KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)

假設前面獲取鎖時傳的name是“abc”,假設調用的線程ID是Thread-1,假設成員變量UUID類型的id是6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c

那么KEYS[1]=abc,ARGV[2]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1

因此,這段腳本的意思是

1、判斷有沒有一個叫“abc”的key

2、如果沒有,則在其下設置一個字段為“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”,值為“1”的鍵值對 ,并設置它的過期時間

3、如果存在,則進一步判斷“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”是否存在,若存在,則其值加1,并重新設置過期時間

4、返回“abc”的生存時間(毫秒)

這里用的數據結構是hash,hash的結構是: key 字段1 值1 字段2 值2 。。。

用在鎖這個場景下,key就表示鎖的名稱,也可以理解為臨界資源,字段就表示當前獲得鎖的線程

所有競爭這把鎖的線程都要判斷在這個key下有沒有自己線程的字段,如果沒有則不能獲得鎖,如果有,則相當于重入,字段值加1(次數)

3. 解鎖

protected RFuture unlockInnerAsync(longthreadId) {returncommandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",

Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

我們還是假設name=abc,假設線程ID是Thread-1

同理,我們可以知道

KEYS[1]是getName(),即KEYS[1]=abc

KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{abc}

ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0

ARGV[2]是生存時間

ARGV[3]是getLockName(threadId),即ARGV[3]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1

因此,上面腳本的意思是:

1、判斷是否存在一個叫“abc”的key

2、如果不存在,向Channel中廣播一條消息,廣播的內容是0,并返回1

3、如果存在,進一步判斷字段6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1是否存在

4、若字段不存在,返回空,若字段存在,則字段值減1

5、若減完以后,字段值仍大于0,則返回0

6、減完后,若字段值小于或等于0,則廣播一條消息,廣播內容是0,并返回1;

可以猜測,廣播0表示資源可用,即通知那些等待獲取鎖的線程現在可以獲得鎖了

4. 等待

以上是正常情況下獲取到鎖的情況,那么當無法立即獲取到鎖的時候怎么辦呢?

再回到前面獲取鎖的位置

@Overridepublic void lockInterruptibly(long leaseTime, TimeUnit unit) throwsInterruptedException {long threadId =Thread.currentThread().getId();

Long ttl=tryAcquire(leaseTime, unit, threadId);//lock acquired

if (ttl == null) {return;

}//訂閱

RFuture future =subscribe(threadId);

commandExecutor.syncSubscription(future);try{while (true) {

ttl=tryAcquire(leaseTime, unit, threadId);//lock acquired

if (ttl == null) {break;

}//waiting for message

if (ttl >= 0) {

getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);

}else{

getEntry(threadId).getLatch().acquire();

}

}

}finally{

unsubscribe(future, threadId);

}//get(lockAsync(leaseTime, unit));

}protected static final LockPubSub PUBSUB = newLockPubSub();protected RFuture subscribe(longthreadId) {returnPUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());

}protected void unsubscribe(RFuture future, longthreadId) {

PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());

}

這里會訂閱Channel,當資源可用時可以及時知道,并搶占,防止無效的輪詢而浪費資源

當資源可用用的時候,循環去嘗試獲取鎖,由于多個線程同時去競爭資源,所以這里用了信號量,對于同一個資源只允許一個線程獲得鎖,其它的線程阻塞

5. 小結

6. 其它相關

@感謝原文作者的分享:https://www.jb51.net/article/149353.htm

總結

以上是生活随笔為你收集整理的java redisson_Java使用Redisson分布式锁实现原理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。