前兩個版本的代碼 都或多或少存在一定的問題,雖然可能微乎其微,但是程序需要嚴謹再嚴謹,
第一個版本問題: 局限于單機版,依賴于 Jvm的鎖
第二個版本問題: 極端情況下,解鎖邏輯的問題,線程B的鎖,可能會被線程A解掉,這種情況實際上是不合理的。
1. 由于是客戶端自己生成過期時間,所以需要強制要求分布式下每個客戶端的時間必須同步。
2. 當鎖過期的時候,如果多個客戶端同時執行jedis.getSet()方法,那么雖然最終只有一個客戶端可以加鎖,
但是這個客戶端的鎖的過期時間可能被其他客戶端覆蓋。
3. 鎖不具備擁有者標識,即任何客戶端都可以解鎖。
版本一: http://www.cnblogs.com/xifenglou/p/8807323.html
版本二: http://www.cnblogs.com/xifenglou/p/8883717.html所以基于以上問題,第三個版本出來了,
Talk is cheap, show me the code!import org.springframework.util.StopWatch;
import redis.clients.jedis.Jedis;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 使用RedisTool.tryGetDistributedLock* 實現 分布式鎖* 終極版本*/
public class TicketRunnable3 implements Runnable {private CountDownLatch count;private CyclicBarrier barrier;private static final Integer Lock_Timeout = 10000;private static final String lockKey = "LockKey";private volatile static boolean working = true;public TicketRunnable3(CountDownLatch count, CyclicBarrier barrier) {this.count = count;this.barrier = barrier;}private int num = 20; // 總票數 此處可隨意 寫一個數,保證線程能運行起來,真正的共享變量不應該寫死在程序中, 應該從redis中獲取,這樣模擬多進程多線程的并發訪問public void sellTicket(Jedis jedis) {String name = Thread.currentThread().getName();try{boolean getLock = RedisTool.tryGetDistributedLock(jedis,lockKey, name,Lock_Timeout);if( getLock){if(!working)return;// Do your jobnum = Integer.parseInt(jedis.get("ticket"));if (num > 0) {num--;jedis.set("ticket",num+"");if(num!=0)System.out.println("================"+Thread.currentThread().getName()+"================= 售出票號" + (num+1)+",還剩" + num + "張票--" );else {System.out.println("================"+Thread.currentThread().getName()+"================= 售出票號" + (num+1)+",票已經票完!--");working = false;}}}else{//System.out.println();if(!working)return;System.out.println(Thread.currentThread().getName()+" Try to get the Lock,and wait 20 millisecond....");Thread.sleep(10);}}catch(Exception e){System.out.println(e);}finally {try {if(RedisTool.releaseDistributedLock(jedis,lockKey,name)){Thread.sleep(30);}}catch (Exception e ) {e.printStackTrace();}}}@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+"到達,等待中...");Jedis jedis = new Jedis("localhost", 6379);try{barrier.await(); // 此處阻塞 等所有線程都到位后 一起進行搶票if(Thread.currentThread().getName().equals("pool-1-thread-1")){System.out.println("-----------------全部線程準備就緒,開始搶票------------------");}else {Thread.sleep(5);}while (working) {sellTicket(jedis);}count.countDown(); //當前線程結束后,計數器-1}catch (Exception e){e.printStackTrace();}}/**** @param args*/public static void main(String[] args) {int threadNum = 5; //模擬多個窗口 進行售票final CyclicBarrier barrier = new CyclicBarrier(threadNum);final CountDownLatch count = new CountDownLatch(threadNum); // 用于統計 執行時長StopWatch watch = new StopWatch();watch.start();TicketRunnable3 tickets = new TicketRunnable3(count,barrier);ExecutorService executorService = Executors.newFixedThreadPool(threadNum);//ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < threadNum; i++) { //此處 設置數值 受限于 線程池中的數量executorService.submit(tickets);}try {count.await();executorService.shutdown();watch.stop();System.out.println("耗 時:" + watch.getTotalTimeSeconds() + "秒");} catch (InterruptedException e) {e.printStackTrace();}}
}
import redis.clients.jedis.Jedis;
import java.util.Collections;public class RedisTool {private static final String LOCK_SUCCESS = "OK";private static final String SET_IF_NOT_EXIST = "NX";private static final String SET_WITH_EXPIRE_TIME = "PX";private static final Long RELEASE_SUCCESS = 1L;/** * 嘗試獲取分布式鎖* @param jedis Redis客戶端* @param lockKey 鎖* @param requestId 請求標識* @param expireTime 超期時間* @return 是否獲取成功*/public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);if (LOCK_SUCCESS.equals(result)) {System.out.println("=============="+Thread.currentThread().getName()+"=============== 獲取到鎖,開始工作!");return true;}return false;}/*** 釋放分布式鎖* @param jedis Redis客戶端* @param lockKey 鎖* @param requestId 請求標識* @return 是否釋放成功*/public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));if (RELEASE_SUCCESS.equals(result)) {System.out.println("=============="+Thread.currentThread().getName()+"=============== 解鎖成功!");return true;}return false;}}
解鎖部分,我們將Lua代碼傳到jedis.eval()方法里,并使參數KEYS[1]賦值為lockKey,ARGV[1]賦值為requestId。eval()方法是將Lua代碼交給Redis服務端執行。
那么這段Lua代碼的功能是什么呢?其實很簡單,首先獲取鎖對應的value值,檢查是否與requestId相等,如果相等則刪除鎖(解鎖)。那么為什么要使用Lua語言來實現呢?因為要確保上述操作是原子性的。源于Redis的特性,下面是官網對eval命令的部分解釋:
簡單來說,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,并且直到eval命令執行完成,Redis才會執行其他命令。
?
?
運行結果如下:
?
?歡迎留言,期待更深層次的探討!
?
針對?上述代碼,使用兩個類?運行,
TicketRunnable3? TicketRunnable4?模擬多進程? 多線程場景 ,
場景1:?運行時長 >?過期時長? ??
此時:?鎖自動失效,?線程均不用解鎖,即使解鎖也是失敗!
代碼及運行結果如下:
import org.springframework.util.StopWatch;
import redis.clients.jedis.Jedis;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 使用RedisTool.tryGetDistributedLock* 實現 分布式鎖* 終極版本*/
public class TicketRunnable4 implements Runnable {private CountDownLatch count;private CyclicBarrier barrier;private static final Integer Lock_Timeout = 3000; // 過期時間 代表 3秒后過期private static final Integer ExecuteTime = 5000;private static final Integer RetryInterval = 20;private static final String lockKey = "LockKey";private volatile static boolean working = true;public TicketRunnable4(CountDownLatch count, CyclicBarrier barrier) {this.count = count;this.barrier = barrier;}private int num = 20; // 總票數public void sellTicket(Jedis jedis) {String name = Thread.currentThread().getName();boolean gotLock = false;try{gotLock = RedisTool.tryGetDistributedLock(jedis,lockKey, name,Lock_Timeout);if( gotLock && working){// Do your jobnum = Integer.parseInt(jedis.get("ticket"));if (num > 0) {num--;jedis.set("ticket",num+"");if(num!=0)System.out.println("=============="+name+"=============== 售出票號" + (num+1)+",還剩" + num + "張票--" );else {System.out.println("=============="+name+"=============== 售出票號" + (num+1)+",票已經票完!--");return;}}if(num == 0){System.out.println("=============="+name+"============票已經被搶空啦");working = false;}Thread.sleep(ExecuteTime);}else{//System.out.println();//System.out.println(name+" Try to get the Lock,and wait "+RetryInterval+" millisecond....");Thread.sleep(RetryInterval);}}catch(Exception e){System.out.println(e);}finally {try {if(!gotLock||!working) //未獲取到鎖的線程不用解鎖return;/*** 解鎖成功后 sleep, 嘗試讓出cpu給其他線程機會* 解鎖失敗 說明鎖已經失效 被其他線程獲取到*/if(RedisTool.releaseDistributedLock(jedis,lockKey,name)){Thread.sleep(100);}}catch (Exception e ) {e.printStackTrace();}}}@Overridepublic void run() {String prefix = "#";String threadName = Thread.currentThread().getName();Thread.currentThread().setName(prefix+threadName);System.out.println(Thread.currentThread().getName()+"到達,等待中...");Jedis jedis = new Jedis("localhost", 6379);try{barrier.await(); // 此處阻塞 等所有線程都到位后 一起進行搶票if(Thread.currentThread().getName().equals(prefix+"pool-1-thread-2")){System.out.println("-----------------全部線程準備就緒,開始搶票------------------");}else {Thread.sleep(5);}while (working) {sellTicket(jedis);}count.countDown(); //當前線程結束后,計數器-1}catch (Exception e){e.printStackTrace();}}/**** @param args*/public static void main(String[] args) {int threadNum = 3; //模擬多個窗口 進行售票final CyclicBarrier barrier = new CyclicBarrier(threadNum);final CountDownLatch count = new CountDownLatch(threadNum); // 用于統計 執行時長StopWatch watch = new StopWatch();watch.start();TicketRunnable4 tickets = new TicketRunnable4(count,barrier);ExecutorService executorService = Executors.newFixedThreadPool(threadNum);//ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < threadNum; i++) { //此處 設置數值 受限于 線程池中的數量executorService.submit(tickets);}try {count.await();executorService.shutdown();watch.stop();System.out.println("耗 時:" + watch.getTotalTimeSeconds() + "秒");} catch (InterruptedException e) {e.printStackTrace();}}
}
TicketRunnable3?售出 10 9 8 6 5 3 2 1?票號。
TicketRunnable4售出? 7 4? 兩個票號
合計10張票,模擬結束!
場景2:?運行時長 < 過期時長? ??
此時:?此時需要有鎖線程去釋放鎖,這樣多線程再去競爭獲取鎖。
?
修改代碼:
?
private static final Integer Lock_Timeout = 5000; // 將時間從3秒改為5秒
private static final Integer ExecuteTime = 3000; // 將執行時間5秒改為3秒運行結果如下:
?
一個進程售出 10 9 8 6 4 3 1 票號
另一進程售出 7 5 2 票號
此時 每個線程完成任務后,均需要釋放鎖,這樣本地線程或是異地線程 才能獲取到鎖,這樣才能有機會進行任務的執行!
總結
以上是生活随笔為你收集整理的基于redis分布式锁实现的多线程并发程序的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。