基于redis分布式锁实现的多线程并发程序(原创)
第一個(gè)版本問(wèn)題:? 局限于單機(jī)版,依賴(lài)于 Jvm的鎖
第二個(gè)版本問(wèn)題:? 極端情況下,解鎖邏輯的問(wèn)題,線程B的鎖,可能會(huì)被線程A解掉,這種情況實(shí)際上是不合理的。
1.由于是客戶(hù)端自己生成過(guò)期時(shí)間,所以需要強(qiáng)制需要分布式下每個(gè)客戶(hù)端的時(shí)間必須同步。
2.當(dāng)鎖過(guò)期的時(shí)候,如果多個(gè)客戶(hù)端同時(shí)執(zhí)行jedis.getSet()方法,那么雖然最終只有一個(gè)客戶(hù)端可以加鎖,但是這個(gè)客戶(hù)端的
鎖的過(guò)期時(shí)間可能被其他客戶(hù)端覆蓋。
3.鎖不具備擁有者標(biāo)識(shí),即任何客戶(hù)端都可以解鎖。
版本一:?http://www.cnblogs.com/xifenglou/p/8807323.html
版本二:?http://www.cnblogs.com/xifenglou/p/8883717.html
所以基于以上問(wèn)題,第三個(gè)版本出來(lái)了,Talk is cheap, show me the code!
import org.springframework.util.StopWatch; import redis.clients.jedis.Jedis;import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executors;/*** 使用RedisTool.tryGetDistributedLock* 實(shí)現(xiàn)分布式鎖* 終極版本*/ public class TicketRunnable3 implements Runnable {private CountDownLatch count;private CyclicBarrier barrier;private static final Integer Lock_Timeout = 10000;private static final String lockKey = "LockKey";private volatile static boolean working = true;public TicketRunnable3(CountDownLatch count, CyclicBarrier barrier) {this.count = count;this.barrier = barrier;}private int num = 20; //總票數(shù) 此處可隨意寫(xiě)一個(gè)數(shù),保證線程能運(yùn)行起來(lái),真正的共享變量不應(yīng)該寫(xiě)死在程序中,應(yīng)該從redis中獲取,這樣模擬多進(jìn)程多線程的并發(fā)訪問(wèn)public void sellTicket(Jedis jedis) {String name = Thread.currentThread().getName();try {boolean getLock = RedisTool.tryGetDistributedLock(jedis,lockKey, name, Lock_Timeout);if (getLock) {if (!working) return;//Do your jobnum = Integer.parseInt(jedis.get("ticket"));if (num > 0) {num--;jedis.set("ticket", num+"");if (num != 0)System.out.println("==============="+Thread.currentThread().getName()+"============= 售出票號(hào)" + (num+1)+", 還剩" + num + "張票--");else {System.out.println("================"+Thread.currentThread().getName()+"================= 售出票號(hào)" + (num+1)+",票已經(jīng)票完!--");working = false;}}} else {//System.out.println();if (!working) return;System.out.println(Thread.currentThread().getName()+" Try to get the Lock, and wait 20 millisecond...");Thread.sleep(10);}} catch (Exception e) {System.out.println(e);} finally {try {if (RedisTool.releaseDistributedLock(jedis, lockKey, name)) {Thread.sleep(30);}} catch(Exception e) {e.printStackTrace();}}} }@Override public void run() {System.out.println(Thread.currentThread().getName()+"到達(dá),等待中...");Jedis jedis = new Jedis("localhost", 6379);try {barrier.await(); //此處阻塞 等所有線程都到位后 一起進(jìn)行搶票if (Thread.currentThread().getName().equals("pool-1-thread-1")) {System.out.println("----------------全部線程準(zhǔn)備就緒,開(kāi)始搶票-----------");} else {Thread.sleep(5);}while (working) {sellTicket(jedis);}count.countDown(); //當(dāng)前線程結(jié)束后,計(jì)數(shù)器-1} catch (Exception e) {e.printStackTrace(); } }/*** * @param args*/ public static void main(String[] args) {int threadNum = 5; //模擬多個(gè)窗口 進(jìn)行售票final CyclicBarrier barrier = new CyclicBarrier(threadNum);final CountDownLatch count = new CountDownLatch(threadNum); //用于統(tǒng)計(jì) 執(zhí)行時(shí)長(zhǎng)StopWatch watch = new StopWatch();watch.start();TicketRunnable3 tickets = new TicketRunnable3(count, barrier);ExecutorService executorService = Executors.newFixedThreadPool(threadNum);//ExecutorService executorService = Executors.newCachedThreadPool();for (int i=0; i<threadNum; i++) { //此處 設(shè)置數(shù)值 受限于 線程池中的數(shù)量executorService.submit(tickets);}try {count.await();executorService.shutdown();watch.stop();System.out.println("耗時(shí):" + watch.getTotalTimeSeconds() + "秒");} catch (InterruptedException e) {e.printStackTrace();} } }import redis.clients.jedis.Jedis; import java.util.Collections;public class RedisTool private static final String LOCK_SUCCESS = "OK";private static final String SET_IF_NOT_EXIST = "NX";private static final String SET_WITH_EXPIRE_TIME = "PX";private static final Long RELEASE_SUCCESS = 1L;/*** 嘗試獲取分布式鎖* @param jedis Redis客戶(hù)端* @param lockKey 鎖* @param requestId 請(qǐng)求標(biāo)識(shí)* @param expireTime 超期時(shí)間* @return 是否獲取成功*/public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);if (LOCK_SUCCESS.equals(result)) {System.out.println("=============="+Thread.currentThread().getName()+"===========獲取到鎖,開(kāi)始工作!");return true;}return false;}/*** 釋放分布式鎖* @param jedis Redis客戶(hù)端* @param lockKey 鎖* @param requestId 請(qǐng)求標(biāo)識(shí)* @return 是否釋放成功*/public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS1[1]) else return 0 end";Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));if (RELEASE_SUCCESS.equals(result)) {System.out.println("==========="+Thread.currentThread().getName()+"=========== 解鎖成功!");return true;}return false;} }解鎖部分,我們將Lua代碼傳到j(luò)edis.eval()方法里,并使參數(shù)KEYS[1]賦值為lockKey,ARGV[1]賦值為requestId。eval()方法是將Lua代碼交給Redis服務(wù)端執(zhí)行。
那么這段Lua代碼的功能是什么呢?其實(shí)很簡(jiǎn)單,首先獲取對(duì)應(yīng)的value值,檢查是否與requestId相等,如果相等則刪除鎖(解鎖)。那么為什么要使用Lua語(yǔ)言來(lái)實(shí)現(xiàn)呢?因?yàn)橐_保上述操作時(shí)原子性的。源于Redis的特性,下面是官網(wǎng)對(duì)eval命令的部分解釋:
簡(jiǎn)單來(lái)說(shuō),就是在eval命令執(zhí)行Lua代碼的時(shí)候,Lua代碼將被當(dāng)成一個(gè)命令去執(zhí)行,并且直到eval命令執(zhí)行完成,Redis才會(huì)執(zhí)行其他命令。
運(yùn)行結(jié)果如下:
針對(duì)?上述代碼,使用兩個(gè)類(lèi)?運(yùn)行,
TicketRunnable3? TicketRunnable4?模擬多進(jìn)程? 多線程場(chǎng)景 ,
場(chǎng)景1:?運(yùn)行時(shí)長(zhǎng) >?過(guò)期時(shí)長(zhǎng)? ??
此時(shí):?鎖自動(dòng)失效,?線程均不用解鎖,即使解鎖也是失敗!
?
代碼及運(yùn)行結(jié)果如下:
import org.springframework.util.StopWatch; import redis.clients.jedis.Jedis;import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;/*** 使用RedisTool.tryGetDistributedLock* 實(shí)現(xiàn) 分布式鎖* 終極版本*/ public class TicketRunnable4 implements Runnable {private CountDownLatch count;private CyclicBarrier barrier;private static final Integer Lock_Timeout = 3000; //過(guò)期時(shí)間 代表3秒后過(guò)期private static final Integer ExecuteTime = 5000; private static final Integer RetryInterval = 20;private static final String lockKey = "LockKey";private valatile static boolean working = true;public TicketRunnable4(CountDownLatch count, CyclicBarrier barrier) {this.count = count;this.barrier = barrier;}private int num = 20; //總票數(shù)public void sellTicket(Jedis jedis) {String name = Thread.currentThread().getName();boolean gotLock = false;try {gotLock = RedisTool.tryGetDistributedLock(jedis, lockKey, name, Lock_Timeout);if (gotLock && working) {// Do your jobnum = Integer.parseInt(jedis.get("ticket"));if (num > 0) {num--;jedis.set("ticket", num+"");if (num != 0) System.out.println("============"+name+"=================== 售出票號(hào)" + (num+1)+",還剩" + num + "張票--");else {System.out.println("============"+name+"=================== 售出票號(hào)" + (num+1)+", 票已經(jīng)售完!--");return;}}if (num == 0) {System.out.println("==============="+name+"==================票已經(jīng)被搶空啦");working = false;}Thread.sleep(ExecuteTime);} else {//System.out.println();//System.out.println(name+" Try to get the Lock, and wait " + RetryInterval + " millisecond...");Thread.sleep(RetryInterval);}} catch(Exception e) {System.out.println(e);} finally{try {if (!gotLock || !working) //未獲取到鎖的線程不用解鎖return;/*** 解鎖成功后sleep,嘗試讓出cpu給其他線程機(jī)會(huì)* 解鎖失敗 說(shuō)明鎖已經(jīng)失效 被其他線程獲取到*/if (RedisTool.releaseDistributedLock(jedis, lockKey, name)) {Thread.sleep(100);}} catch (Exception e) {e.printStackTrace();} }}@Overridepublic void run() {String prefix = "#";String threadName = Thread.currentThread().getName();Thread.currentThread().setName(prefix+threadName);System.out.println(Thread.currentThread().getName() +"到達(dá),等待中...");Jedis jedis = new Jedis("localhost", 6379);try {barrier.await();if (Thread.currentThread().getName().equals(prefix+"pool-1-thread-2")) {System.out.println("-------------------全部線程準(zhǔn)備就緒,開(kāi)始搶票------");} else {Thread.sleep(5);}while(working) {sellTicket(jedis);}count.countDown(); //當(dāng)前線程結(jié)束后,計(jì)數(shù)器-1} catch (Exception e) {e.printStackTrace();}}/*** * @param args*/public static void main(String[] args) {int threadNum = 3; //模擬多個(gè)窗口 進(jìn)行售票final CyclicBarrier = new CyclicBarrier(threadNum);final CountDownLatch count = new CountDownLatch(threadNum); //用于統(tǒng)計(jì) 執(zhí)行時(shí)長(zhǎng)StopWatch watch = new StopWatch();watch.start();TicketRunnable4 tickets = new TicketRunnable4(count, barrier);ExecutorService executorService = Executors.newFixedThreadPool(threadNum);//ExecutorService executorService = Executors.newCachedThreadPool();for (int i=0; i<threadNum; i++) { //此處設(shè)置數(shù)值 受限于 線程池中的數(shù)量executorService.submit(tickets);}try {count.await();executorService.shutdown();watch.stop();System.out.println("耗時(shí):" + watch.getTotalTimeSeconds() + "秒");} catch (InterruptedException e) {e.printStackTrace();}} }TicketRunnable3 售出 10 9 8 6 5 3 2 1 票號(hào)。
TicketRunnable4售出 7 4 兩個(gè)票號(hào)
合計(jì)10張票,模擬結(jié)束!
場(chǎng)景2:?運(yùn)行時(shí)長(zhǎng) < 過(guò)期時(shí)長(zhǎng)? ??
此時(shí):?此時(shí)需要有鎖線程去釋放鎖,這樣多線程再去競(jìng)爭(zhēng)獲取鎖。
修改代碼:
private static final Integer Lock_Timeout = 5000; //將時(shí)間從3秒改為5秒
private static final Integer ExecuteTime = 3000;? //將執(zhí)行時(shí)間5秒改為3秒
運(yùn)行結(jié)果如下:
一個(gè)進(jìn)程售出 10 9 8 6 4 3 1 票號(hào)
另一個(gè)進(jìn)程售出 7 5 2 票號(hào)
此時(shí) 每個(gè)線程完成任務(wù)后,均需要釋放鎖,這樣本地線程或是異地線程 才能獲取到鎖,這樣才能有機(jī)會(huì)進(jìn)行任務(wù)的執(zhí)行!
?
?
總結(jié)
以上是生活随笔為你收集整理的基于redis分布式锁实现的多线程并发程序(原创)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: nginx开启core dump文件
- 下一篇: 如何做可靠的分布式锁,Redlock真的