redis系列:分布式锁
1 介紹
這篇博文講介紹如何一步步構建一個基于Redis的分布式鎖。會從最原始的版本開始,然后根據問題進行調整,最后完成一個較為合理的分布式鎖。
本篇文章會將分布式鎖的實現分為兩部分,一個是單機環境,另一個是集群環境下的Redis鎖實現。在介紹分布式鎖的實現之前,先來了解下分布式鎖的一些信息。
2 分布式鎖
2.1 什么是分布式鎖?
分布式鎖是控制分布式系統或不同系統之間共同訪問共享資源的一種鎖實現,如果不同的系統或同一個系統的不同主機之間共享了某個資源時,往往需要互斥來防止彼此干擾來保證一致性。
2.2 分布式鎖需要具備哪些條件
2.4 分布式鎖的實現有哪些?
3 單機Redis的分布式鎖
3.1 準備工作
3.1.1 定義常量類
public class LockConstants {public static final String OK = "OK";/** NX|XX, NX -- Only set the key if it does not already exist. XX -- Only set the key if it already exist. **/public static final String NOT_EXIST = "NX";public static final String EXIST = "XX";/** expx EX|PX, expire time units: EX = seconds; PX = milliseconds **/public static final String SECONDS = "EX";public static final String MILLISECONDS = "PX";private LockConstants() {} } 復制代碼3.1.2 定義鎖的抽象類
抽象類RedisLock實現java.util.concurrent包下的Lock接口,然后對一些方法提供默認實現,子類只需實現lock方法和unlock方法即可。代碼如下
public abstract class RedisLock implements Lock {protected Jedis jedis;protected String lockKey;public RedisLock(Jedis jedis,String lockKey) {this(jedis, lockKey);}public void sleepBySencond(int sencond){try {Thread.sleep(sencond*1000);} catch (InterruptedException e) {e.printStackTrace();}}public void lockInterruptibly(){}public Condition newCondition() {return null;}public boolean tryLock() {return false;}public boolean tryLock(long time, TimeUnit unit){return false;}} 復制代碼3.2 最基礎的版本1
先來一個最基礎的版本,代碼如下
public class LockCase1 extends RedisLock {public LockCase1(Jedis jedis, String name) {super(jedis, name);}public void lock() {while(true){String result = jedis.set(lockKey, "value", NOT_EXIST);if(OK.equals(result)){System.out.println(Thread.currentThread().getId()+"加鎖成功!");break;}}}public void unlock() {jedis.del(lockKey);} } 復制代碼LockCase1類提供了lock和unlock方法。
其中lock方法也就是在reids客戶端執行如下命令
而unlock方法就是調用DEL命令將鍵刪除。
好了,方法介紹完了。現在來想想這其中會有什么問題?
假設有兩個客戶端A和B,A獲取到分布式的鎖。A執行了一會,突然A所在的服務器斷電了(或者其他什么的),也就是客戶端A掛了。這時出現一個問題,這個鎖一直存在,且不會被釋放,其他客戶端永遠獲取不到鎖。如下示意圖
可以通過設置過期時間來解決這個問題
3.3 版本2-設置鎖的過期時間
public void lock() {while(true){String result = jedis.set(lockKey, "value", NOT_EXIST,SECONDS,30);if(OK.equals(result)){System.out.println(Thread.currentThread().getId()+"加鎖成功!");break;}} } 復制代碼類似的Redis命令如下
SET lockKey value NX EX 30 復制代碼注:要保證設置過期時間和設置鎖具有原子性
這時又出現一個問題,問題出現的步驟如下
示意圖如下
這時會有兩個問題
先來解決如何保證鎖不會被誤刪除這個問題。
這個問題可以通過設置value為當前客戶端生成的一個隨機字符串,且保證在足夠長的一段時間內在所有客戶端的所有獲取鎖的請求中都是唯一的。
版本2的完整代碼:Github地址
3.4 版本3-設置鎖的value
抽象類RedisLock增加lockValue字段,lockValue字段的默認值為UUID隨機值假設當前線程ID。
public abstract class RedisLock implements Lock {//...protected String lockValue;public RedisLock(Jedis jedis,String lockKey) {this(jedis, lockKey, UUID.randomUUID().toString()+Thread.currentThread().getId());}public RedisLock(Jedis jedis, String lockKey, String lockValue) {this.jedis = jedis;this.lockKey = lockKey;this.lockValue = lockValue;}//... } 復制代碼加鎖代碼
public void lock() {while(true){String result = jedis.set(lockKey, lockValue, NOT_EXIST,SECONDS,30);if(OK.equals(result)){System.out.println(Thread.currentThread().getId()+"加鎖成功!");break;}} } 復制代碼解鎖代碼
public void unlock() {String lockValue = jedis.get(lockKey);if (lockValue.equals(lockValue)){jedis.del(lockKey);} } 復制代碼這時看看加鎖代碼,好像沒有什么問題啊。
再來看看解鎖的代碼,這里的解鎖操作包含三步操作:獲取值、判斷和刪除鎖。這時你有沒有想到在多線程環境下的i++操作?
3.4.1 i++問題
i++操作也可分為三個步驟:讀i的值,進行i+1,設置i的值。
如果兩個線程同時對i進行i++操作,會出現如下情況
在多線程環境下有什么方式可以避免這類情況發生?
解決方式有很多種,例如用AtomicInteger、CAS、synchronized等等。
這些解決方式的目的都是要確保i++ 操作的原子性。那么回過頭來看看解鎖,同理我們也是要確保解鎖的原子性。我們可以利用Redis的lua腳本來實現解鎖操作的原子性。
版本3的完整代碼:Github地址
3.5 版本4-具有原子性的釋放鎖
lua腳本內容如下
if redis.call("get",KEYS[1]) == ARGV[1] thenreturn redis.call("del",KEYS[1]) elsereturn 0 end 復制代碼這段Lua腳本在執行的時候要把的lockValue作為ARGV[1]的值傳進去,把lockKey作為KEYS[1]的值傳進去。現在來看看解鎖的java代碼
public void unlock() {// 使用lua腳本進行原子刪除操作String checkAndDelScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('del', KEYS[1]) " +"else " +"return 0 " +"end";jedis.eval(checkAndDelScript, 1, lockKey, lockValue); } 復制代碼好了,解鎖操作也確保了原子性了,那么是不是單機Redis環境的分布式鎖到此就完成了?
別忘了版本2-設置鎖的過期時間還有一個,過期時間如何保證大于業務執行時間問題沒有解決。
版本4的完整代碼:Github地址
3.6 版本5-確保過期時間大于業務執行時間
抽象類RedisLock增加一個boolean類型的屬性isOpenExpirationRenewal,用來標識是否開啟定時刷新過期時間。
在增加一個scheduleExpirationRenewal方法用于開啟刷新過期時間的線程。
加鎖代碼在獲取鎖成功后將isOpenExpirationRenewal置為true,并且調用scheduleExpirationRenewal方法,開啟刷新過期時間的線程。
public void lock() {while (true) {String result = jedis.set(lockKey, lockValue, NOT_EXIST, SECONDS, 30);if (OK.equals(result)) {System.out.println("線程id:"+Thread.currentThread().getId() + "加鎖成功!時間:"+LocalTime.now());//開啟定時刷新過期時間isOpenExpirationRenewal = true;scheduleExpirationRenewal();break;}System.out.println("線程id:"+Thread.currentThread().getId() + "獲取鎖失敗,休眠10秒!時間:"+LocalTime.now());//休眠10秒sleepBySencond(10);} } 復制代碼解鎖代碼增加一行代碼,將isOpenExpirationRenewal屬性置為false,停止刷新過期時間的線程輪詢。
public void unlock() {//...isOpenExpirationRenewal = false; }復制代碼版本5的完整代碼:Github地址
3.7 測試
測試代碼如下
public void testLockCase5() {//定義線程池ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 10,1, TimeUnit.SECONDS,new SynchronousQueue<>());//添加10個線程獲取鎖for (int i = 0; i < 10; i++) {pool.submit(() -> {try {Jedis jedis = new Jedis("localhost");LockCase5 lock = new LockCase5(jedis, lockName);lock.lock();//模擬業務執行15秒lock.sleepBySencond(15);lock.unlock();} catch (Exception e){e.printStackTrace();}});}//當線程池中的線程數為0時,退出while (pool.getPoolSize() != 0) {} } 復制代碼測試結果
或許到這里基于單機Redis環境的分布式就介紹完了。但是使用java的同學有沒有發現一個鎖的重要特性
那就是鎖的重入,那么分布式鎖的重入該如何實現呢?這里就留一個坑了
4 集群Redis的分布式鎖
在Redis的分布式環境中,Redis 的作者提供了RedLock 的算法來實現一個分布式鎖。
4.1 加鎖
RedLock算法加鎖步驟如下
4.2 解鎖
向所有的Redis實例發送釋放鎖命令即可,不用關心之前有沒有從Redis實例成功獲取到鎖.
關于RedLock算法,還有一個小插曲,就是Martin Kleppmann 和?RedLock 作者 antirez的對RedLock算法的互懟。 官網原話如下
Martin Kleppmann analyzed Redlock here. I disagree with the analysis and posted my reply to his analysis here.
更多關于RedLock算法這里就不在說明,有興趣的可以到官網閱讀相關文章。
5 總結
這篇文章講述了一個基于Redis的分布式鎖的編寫過程及解決問題的思路,但是本篇文章實現的分布式鎖并不適合用于生產環境。java環境有 Redisson 可用于生產環境,但是分布式鎖還是Zookeeper會比較好一些(可以看Martin Kleppmann 和?RedLock的分析)。
Martin Kleppmann對RedLock的分析:martin.kleppmann.com/2016/02/08/…
RedLock 作者 antirez的回應:antirez.com/news/101
整個項目的地址存放在Github上,有需要的可以看看:Github地址
總結
以上是生活随笔為你收集整理的redis系列:分布式锁的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 梦到钱被抢走什么预兆
- 下一篇: node:爬虫爬取网页图片