java 单例 读写锁_终极锁实战:单JVM锁+分布式锁
目錄
1.前言
2.單JVM鎖
3.分布式鎖
4.總結(jié)
=========正文分割線=================
1.前言
鎖就像一把鑰匙,需要加鎖的代碼就像一個(gè)房間。出現(xiàn)互斥操作的典型場(chǎng)景:多人同時(shí)想進(jìn)同一個(gè)房間爭(zhēng)搶這個(gè)房間的鑰匙(只有一把),一人搶到鑰匙,其他人都等待這個(gè)人出來(lái)歸還鑰匙,此時(shí)大家再次爭(zhēng)搶鑰匙循環(huán)下去。
作為終極實(shí)戰(zhàn)系列,本篇用java語(yǔ)言分析鎖的原理(源碼剖析)和應(yīng)用(詳細(xì)代碼),根據(jù)鎖的作用范圍分為:JVM鎖和分布式鎖。如理解有誤之處,還請(qǐng)指出。
2.單JVM鎖(進(jìn)程級(jí)別)
程序部署在一臺(tái)服務(wù)器上,當(dāng)容器啟動(dòng)時(shí)(例如tomcat),一臺(tái)JVM就運(yùn)行起來(lái)了。本節(jié)分析的鎖均只能在單JVM下生效。因?yàn)樽罱K鎖定的是某個(gè)對(duì)象,這個(gè)對(duì)象生存在JVM中,自然鎖只能鎖單JVM。這一點(diǎn)很重要。如果你的服務(wù)只部署一個(gè)實(shí)例,那么恭喜你,用以下幾種鎖就可以了。
1.synchronized同步鎖
2.ReentrantLock重入鎖
3.ReadWriteLock讀寫(xiě)鎖
4.StampedLock戳鎖
3.分布式鎖(多服務(wù)節(jié)點(diǎn),多進(jìn)程)
3.1基于數(shù)據(jù)庫(kù)鎖實(shí)現(xiàn)
場(chǎng)景舉例:
賣(mài)商品,先查詢庫(kù)存>0,更新庫(kù)存-1。
1.悲觀鎖:select for update(一致性鎖定讀)
查詢官方文檔如上圖,事務(wù)內(nèi)起作用的行鎖。能夠保證當(dāng)前session事務(wù)所鎖定的行不會(huì)被其他session所修改(這里的修改指更新或者刪除)。對(duì)讀取的記錄加X(jué)鎖,即排它鎖,其他事不能對(duì)上鎖的行加任何鎖。
BEGIN;(確保以下2步驟在一個(gè)事務(wù)中:)
SELECT * FROM tb_product_stock WHERE product_id=1 FOR UPDATE--->product_id有索引,鎖行.加鎖(注:條件字段必須有索引才能鎖行,否則鎖表,且最好用explain查看一下是否使用了索引,因?yàn)橛幸恍?huì)被優(yōu)化掉最終沒(méi)有使用索引)
UPDATE tb_product_stock SET number=number-1 WHERE product_id=1--->更新庫(kù)存-1.解鎖
COMMIT;
2.樂(lè)觀鎖:版本控制,選一個(gè)字段作為版本控制字段,更新前查詢一次,更新時(shí)該字段作為更新條件。不同業(yè)務(wù)場(chǎng)景,版本控制字段,可以0 1控制,也可以+1控制,也可以-1控制,這個(gè)隨意。
BEGIN;(確保以下2步驟在一個(gè)事務(wù)中:)
SELECT number FROM tb_product_stock WHERE product_id=1--》查詢庫(kù)存總數(shù),不加鎖
UPDATE tb_product_stock SET number=number-1 WHERE product_id=1 AND number=第一步查詢到的庫(kù)存數(shù)--》number字段作為版本控制字段
COMMIT;
3.2基于緩存實(shí)現(xiàn)(redis,memcached)
原理:
redisson開(kāi)源jar包,提供了很多功能,其中就包含分布式鎖。是Redis官方推薦的頂級(jí)項(xiàng)目,官網(wǎng)飛機(jī)票
核心org.redisson.api.RLock接口封裝了分布式鎖的獲取和釋放。源碼如下:
1 @Override
2 public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
3 long time = unit.toMillis(waitTime);
4 long current = System.currentTimeMillis();
5 final long threadId = Thread.currentThread().getId();
6 Long ttl = tryAcquire(leaseTime, unit, threadId);//申請(qǐng)鎖,返回還剩余的鎖過(guò)期時(shí)間7 //lock acquired
8 if (ttl == null) {
9 return true;
10 }
11
12 time -= (System.currentTimeMillis() - current);
13 if (time <= 0) {
14 acquireFailed(threadId);
15 return false;
16 }
17
18 current = System.currentTimeMillis();
19 final RFuture subscribeFuture = subscribe(threadId);
20 if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
21 if (!subscribeFuture.cancel(false)) {
22 subscribeFuture.addListener(new FutureListener() {
23 @Override
24 public void operationComplete(Future future) throws Exception {
25 if (subscribeFuture.isSuccess()) {
26 unsubscribe(subscribeFuture, threadId);
27 }
28 }
29 });
30 }
31 acquireFailed(threadId);
32 return false;
33 }
34
35 try {
36 time -= (System.currentTimeMillis() - current);
37 if (time <= 0) {
38 acquireFailed(threadId);
39 return false;
40 }
41
42 while (true) {
43 long currentTime = System.currentTimeMillis();
44 ttl = tryAcquire(leaseTime, unit, threadId);
45 //lock acquired
46 if (ttl == null) {
47 return true;
48 }
49
50 time -= (System.currentTimeMillis() - currentTime);
51 if (time <= 0) {
52 acquireFailed(threadId);
53 return false;
54 }
55
56 //waiting for message
57 currentTime = System.currentTimeMillis();
58 if (ttl >= 0 && ttl < time) {
59 getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
60 } else {
61 getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
62 }
63
64 time -= (System.currentTimeMillis() - currentTime);
65 if (time <= 0) {
66 acquireFailed(threadId);
67 return false;
68 }
69 }
70 } finally {
71 unsubscribe(subscribeFuture, threadId);
72 }
73 //return get(tryLockAsync(waitTime, leaseTime, unit));
74 }
上述方法,調(diào)用加鎖的邏輯就是在tryAcquire(leaseTime, unit, threadId)中,如下圖:
1 private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
2 return get(tryAcquireAsync(leaseTime, unit, threadId));//tryAcquireAsync返回RFutrue
3 }
tryAcquireAsync中commandExecutor.evalWriteAsync就是咱們加鎖核心方法了
1 RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
2 internalLockLeaseTime = unit.toMillis(leaseTime);
3
4 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
5 "if (redis.call('exists', KEYS[1]) == 0) then " +
6 "redis.call('hset', KEYS[1], ARGV[2], 1); " +
7 "redis.call('pexpire', KEYS[1], ARGV[1]); " +
8 "return nil; " +
9 "end; " +
10 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
11 "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
12 "redis.call('pexpire', KEYS[1], ARGV[1]); " +
13 "return nil; " +
14 "end; " +
15 "return redis.call('pttl', KEYS[1]);",
16 Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
17 }
如上圖,已經(jīng)到了redis命令了
加鎖:
KEYS[1] :需要加鎖的key,這里需要是字符串類型。
ARGV[1] :鎖的超時(shí)時(shí)間,防止死鎖
ARGV[2] :鎖的唯一標(biāo)識(shí),(UUID.randomUUID()) + “:” + threadId
1 //檢查是否key已經(jīng)被占用,如果沒(méi)有則設(shè)置超時(shí)時(shí)間和唯一標(biāo)識(shí),初始化value=1
2 if (redis.call('exists', KEYS[1]) == 0)
3 then
4 redis.call('hset', KEYS[1], ARGV[2], 1); //hset key field value 哈希數(shù)據(jù)結(jié)構(gòu)5 redis.call('pexpire', KEYS[1], ARGV[1]); //pexpire key expireTime 設(shè)置有效時(shí)間
6 return nil;
7 end;
8 //如果鎖重入,需要判斷鎖的key field 都一直情況下 value 加一
9 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
10 then
11 redis.call('hincrby', KEYS[1], ARGV[2], 1);//hincrby key filed addValue 加112 redis.call('pexpire', KEYS[1], ARGV[1]);//pexpire key expireTime重新設(shè)置超時(shí)時(shí)間
13 return nil;
14 end;
15 //返回剩余的過(guò)期時(shí)間
16 return redis.call('pttl', KEYS[1]);
以上的方法,當(dāng)返回空是,說(shuō)明獲取到鎖,如果返回一個(gè)long數(shù)值(pttl 命令的返回值),說(shuō)明鎖已被占用,通過(guò)返回剩余時(shí)間,外部可以做一些等待時(shí)間的判斷和調(diào)整。
不再分析解鎖步驟,直接貼上解鎖的redis 命令
解鎖:
– KEYS[1] :需要加鎖的key,這里需要是字符串類型。
– KEYS[2] :redis消息的ChannelName,一個(gè)分布式鎖對(duì)應(yīng)唯一的一個(gè)channelName:“redisson_lock__channel__{” + getName() + “}”
– ARGV[1] :reids消息體,這里只需要一個(gè)字節(jié)的標(biāo)記就可以,主要標(biāo)記redis的key已經(jīng)解鎖,再結(jié)合redis的Subscribe,能喚醒其他訂閱解鎖消息的客戶端線程申請(qǐng)鎖。
– ARGV[2] :鎖的超時(shí)時(shí)間,防止死鎖
– ARGV[3] :鎖的唯一標(biāo)識(shí),(UUID.randomUUID()) + “:” + threadId
1 //如果key已經(jīng)不存在,說(shuō)明已經(jīng)被解鎖,直接發(fā)布(publihs)redis消息
2 if (redis.call('exists', KEYS[1]) == 0)
3 then
4 redis.call('publish', KEYS[2], ARGV[1]);//publish ChannelName message向信道發(fā)送解鎖消息5 return 1;
6 end;
7 //key和field不匹配,說(shuō)明當(dāng)前客戶端線程沒(méi)有持有鎖,不能主動(dòng)解鎖。
8 if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)
9 then
10 return nil;
11 end;
12 //將value減1
13 local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); //hincrby key filed addValue 減1
14 //如果counter>0說(shuō)明鎖在重入,不能刪除key
15 if (counter > 0)
16 then
17 redis.call('pexpire', KEYS[1], ARGV[2]);
18 return 0;
19 else
20 //刪除key并且publish 解鎖消息
21 redis.call('del', KEYS[1]);
22 redis.call('publish', KEYS[2], ARGV[1]);
23 return 1;
24 end;
25 return nil;
特點(diǎn):
邏輯并不復(fù)雜, 實(shí)現(xiàn)了可重入功能, 通過(guò)pub/sub功能來(lái)減少空轉(zhuǎn),性能極高。
實(shí)現(xiàn)了Lock的大部分功能,支持強(qiáng)制解鎖。
實(shí)戰(zhàn):
1.創(chuàng)建客戶端配置類:
這里我們最終只用了一種來(lái)測(cè)試,就是initSingleServerConfig單例模式。
1 package distributed.lock.redis;
2
3 import org.redisson.config.Config;
4
5 /**
6 *7 * @ClassName:RedissionConfig8 * @Description:自定義RedissionConfig初始化方法9 * 支持自定義構(gòu)造:單例模式,集群模式,主從模式,哨兵模式。10 * 注:此處使用spring bean 配置文件保證bean單例,見(jiàn)applicationContext-redis.xml11 * 大家也可以用工廠模式自己維護(hù)單例:本類生成RedissionConfig,再RedissonClient redisson = Redisson.create(config);這樣就可以創(chuàng)建RedissonClient12 *@authordiandian.zhang13 * @date 2017年7月20日下午12:55:5014 */
15 public class RedissionConfig {
16 private RedissionConfig() {
17 }
18
19 public static Config initSingleServerConfig(String redisHost, String redisPort, String redisPassword) {
20 return initSingleServerConfig(redisHost, redisPort, redisPassword, 0);
21 }
22
23 /**
24 *25 * @Description 使用單例模式初始化構(gòu)造Config26 *@paramredisHost27 *@paramredisPort28 *@paramredisPassword29 *@paramredisDatabase redis db 默認(rèn)0 (0~15)有redis.conf配置文件中參數(shù)來(lái)控制數(shù)據(jù)庫(kù)總數(shù):database 16.30 *@return
31 *@authordiandian.zhang32 * @date 2017年7月20日下午12:56:2133 *@sinceJDK1.834 */
35 public static Config initSingleServerConfig(String redisHost, String redisPort, String redisPassword,Integer redisDatabase) {
36 Config config = new Config();
37 config.useSingleServer().setAddress(redisHost + ":" + redisPort)
38 .setPassword(redisPassword)
39 .setDatabase(redisDatabase);//可以不設(shè)置,看業(yè)務(wù)是否需要隔離40 //RedissonClient redisson = Redisson.create(config);
41 return config;
42 }
43
44 /**
45 *46 * @Description 集群模式47 *@parammasterAddress48 *@paramnodeAddressArray49 *@return
50 *@authordiandian.zhang51 * @date 2017年7月20日下午3:29:3252 *@sinceJDK1.853 */
54 public static Config initClusterServerConfig(String masterAddress, String[] nodeAddressArray) {
55 String nodeStr = "";
56 for(String slave:nodeAddressArray){
57 nodeStr +=","+slave;
58 }
59 Config config = new Config();
60 config.useClusterServers()
61 .setScanInterval(2000) //cluster state scan interval in milliseconds
62 .addNodeAddress(nodeStr);
63 return config;
64 }
65
66 /**
67 *68 * @Description 主從模式69 *@parammasterAddress 一主70 *@paramslaveAddressArray 多從71 *@return
72 *@authordiandian.zhang73 * @date 2017年7月20日下午2:29:3874 *@sinceJDK1.875 */
76 public static Config initMasterSlaveServerConfig(String masterAddress, String[] slaveAddressArray) {
77 String slaveStr = "";
78 for(String slave:slaveAddressArray){
79 slaveStr +=","+slave;
80 }
81 Config config = new Config();
82 config.useMasterSlaveServers()
83 .setMasterAddress(masterAddress)//一主
84 .addSlaveAddress(slaveStr);//多從"127.0.0.1:26389", "127.0.0.1:26379"
85 return config;
86 }
87
88 /**
89 *90 * @Description 哨兵模式91 *@parammasterAddress92 *@paramslaveAddressArray93 *@return
94 *@authordiandian.zhang95 * @date 2017年7月20日下午3:01:3596 *@sinceJDK1.897 */
98 public static Config initSentinelServerConfig(String masterAddress, String[] sentinelAddressArray) {
99 String sentinelStr = "";
100 for(String sentinel:sentinelAddressArray){
101 sentinelStr +=","+sentinel;
102 }
103 Config config = new Config();
104 config.useSentinelServers()
105 .setMasterName("mymaster")
106 .addSentinelAddress(sentinelStr);
107 return config;
108 }
109
110
111 }
2.分布式鎖實(shí)現(xiàn)類
1 package distributed.lock.redis;
2
3
4
5 import java.text.SimpleDateFormat;
6 import java.util.Date;
7 import java.util.concurrent.CountDownLatch;
8 import java.util.concurrent.TimeUnit;
9
10 import org.redisson.Redisson;
11 import org.redisson.api.RLock;
12 import org.redisson.api.RedissonClient;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15
16
17 public class RedissonTest {
18 private static final Logger logger = LoggerFactory.getLogger(RedissonTest.class);
19 static SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
20 //這里可自定義多種模式,單例,集群,主從,哨兵模式。為了簡(jiǎn)單這里使用單例模式
21 private static RedissonClient redissonClient = Redisson.create(RedissionConfig.initSingleServerConfig("192.168.50.107", "6379", "password"));
22
23 public static void main(String[] args) {
24 CountDownLatch latch = new CountDownLatch(3);
25 //key
26 String lockKey = "testkey20170802";
27 try {
28 Thread t1 = new Thread(() -> {
29 doWithLock(lockKey,latch);//函數(shù)式編程
30 }, "t1");
31 Thread t2 = new Thread(() -> {
32 doWithLock(lockKey,latch);
33 }, "t2");
34 Thread t3 = new Thread(() -> {
35 doWithLock(lockKey,latch);
36 }, "t3");
37 //啟動(dòng)線程
38 t1.start();
39 t2.start();
40 t3.start();
41 //等待全部完成
42 latch.await();
43 System.out.println("3個(gè)線程都解鎖完畢,關(guān)閉客戶端!");
44 redissonClient.shutdown();
45 } catch (Exception e) {
46 e.printStackTrace();
47 }
48 }
49
50 /**
51 *52 * @Description 線程執(zhí)行函數(shù)體53 *@paramlockKey54 *@authordiandian.zhang55 * @date 2017年8月2日下午3:37:3256 *@sinceJDK1.857 */
58 private static void doWithLock(String lockKey,CountDownLatch latch) {
59 try {
60 System.out.println("進(jìn)入線程="+Thread.currentThread().getName()+":"+time.format(new Date()));
61 //獲取鎖,30秒內(nèi)獲取到返回true,未獲取到返回false,60秒過(guò)后自動(dòng)unLock
62 if (tryLock(lockKey, 30, 60, TimeUnit.SECONDS)) {
63 System.out.println(Thread.currentThread().getName() + " 獲取鎖成功!,執(zhí)行需要加鎖的任務(wù)"+time.format(new Date()));
64 Thread.sleep(2000L);//休息2秒模擬執(zhí)行需要加鎖的任務(wù)65 //獲取鎖超時(shí)
66 }else{
67 System.out.println(Thread.currentThread().getName() + " 獲取鎖超時(shí)!"+time.format(new Date()));
68 }
69 } catch (Exception e) {
70 e.printStackTrace();
71 } finally {
72 try {
73 //釋放鎖
74 unLock(lockKey);
75 latch.countDown();//完成,計(jì)數(shù)器減一
76 } catch (Exception e) {
77 e.printStackTrace();
78 }
79 }
80 }
81
82 /**
83 *84 * @Description 獲取鎖,鎖waitTime時(shí)間內(nèi)獲取到返回true,未獲取到返回false,租賃期leaseTime過(guò)后unLock(除非手動(dòng)釋放鎖)85 *@paramkey86 *@paramwaitTime87 *@paramleaseTime88 *@paramtimeUnit89 *@return
90 *@authordiandian.zhang91 * @date 2017年8月2日下午3:24:0992 *@sinceJDK1.893 */
94 public static boolean tryLock(String key, long waitTime, long leaseTime, TimeUnit timeUnit) {
95 try {
96 //根據(jù)key獲取鎖實(shí)例,非公平鎖
97 RLock lock = redissonClient.getLock(key);
98 //在leaseTime時(shí)間內(nèi)阻塞獲取鎖,獲取鎖后持有鎖直到leaseTime租期結(jié)束(除非手動(dòng)unLock釋放鎖)。
99 return lock.tryLock(waitTime, leaseTime, timeUnit);
100 } catch (Exception e) {
101 logger.error("redis獲取分布式鎖異常;key=" + key + ",waitTime=" + waitTime + ",leaseTime=" + leaseTime +
102 ",timeUnit=" + timeUnit, e);
103 return false;
104 }
105 }
106
107 /**
108 *109 * @Description 釋放鎖110 *@paramkey111 *@authordiandian.zhang112 * @date 2017年8月2日下午3:25:34113 *@sinceJDK1.8114 */
115 public static void unLock(String key) {
116 RLock lock = redissonClient.getLock(key);
117 lock.unlock();
118 System.out.println(Thread.currentThread().getName() + " 釋放鎖"+time.format(new Date()));
119 }
120 }
執(zhí)行結(jié)果如下:
1 進(jìn)入線程=t3:2017-08-02 16:33:19
2 進(jìn)入線程=t1:2017-08-02 16:33:19
3 進(jìn)入線程=t2:2017-08-02 16:33:19
4 t2 獲取鎖成功!,執(zhí)行需要加鎖的任務(wù)2017-08-02 16:33:19--->T2 19秒時(shí)獲取到鎖
5 t2 釋放鎖2017-08-02 16:33:21--->T2任務(wù)完成,21秒時(shí)釋放鎖
6 t1 獲取鎖成功!,執(zhí)行需要加鎖的任務(wù)2017-08-02 16:33:21--->T1 21秒時(shí)獲取到鎖
7 t1 釋放鎖2017-08-02 16:33:23--->T2任務(wù)完成,23秒時(shí)釋放鎖
8 t3 獲取鎖成功!,執(zhí)行需要加鎖的任務(wù)2017-08-02 16:33:23--->T3 23秒時(shí)獲取到鎖
9 t3 釋放鎖2017-08-02 16:33:25--->T2任務(wù)完成,25秒時(shí)釋放鎖
10 3個(gè)線程都解鎖完畢,關(guān)閉客戶端!
如上圖,3個(gè)線程共消耗25-19=6秒,驗(yàn)證通過(guò),確實(shí)互斥鎖住了。
我們用Redis Desktop Manger來(lái)看一下redis中數(shù)據(jù):
1 192.168.50.107:0>hgetall "testkey20170802"--->用key查詢hash所有的值
2 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:30--->T2獲取到鎖field=uuid:線程號(hào)
3 2) 1 --->value=1代表重入次數(shù)為1
4 192.168.50.107:0>hgetall "testkey20170802"--->T2釋放鎖,T1獲取到鎖
5 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:29
6 2) 1
7 192.168.50.107:0>hgetall "testkey20170802"--->T1釋放鎖,T3獲取到鎖
8 1) 159b46b3-8bc5-4447-ad57-c55fdd381384:31
9 2) 1
10 192.168.50.107:0>hgetall "testkey20170802"--->最后一次查詢時(shí),T3釋放鎖,已無(wú)數(shù)據(jù)
2)基于zookeeper實(shí)現(xiàn)
原理:
每個(gè)客戶端(每個(gè)JVM內(nèi)部共用一個(gè)客戶端實(shí)例)對(duì)某個(gè)方法加鎖時(shí),在zookeeper上指定節(jié)點(diǎn)的目錄下,生成一個(gè)唯一的瞬時(shí)有序節(jié)點(diǎn)。判斷是否獲取鎖的方式很簡(jiǎn)單,只需要判斷有序節(jié)點(diǎn)中序號(hào)最小的一個(gè)。當(dāng)釋放鎖的時(shí)候,只需將這個(gè)瞬時(shí)節(jié)點(diǎn)刪除即可。
我們使用apache的Curator組件來(lái)實(shí)現(xiàn),一般使用Client、Framework、Recipes三個(gè)組件。
curator下,InterProcessMutex可重入互斥公平鎖,源碼(curator-recipes-2.4.1.jar)注釋如下:
A re-entrant mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes in all JVMs that use the same lock path will achieve an inter-process critical section. Further, this mutex is "fair" - each user will get the mutex in the order requested (from ZK's point of view)
即一個(gè)在JVM上工作的可重入互斥鎖。使用ZK去持有這把鎖。在所有JVM中的進(jìn)程組,只要使用相同的鎖路徑將會(huì)獲得進(jìn)程間的臨界資源。進(jìn)一步說(shuō),這個(gè)互斥鎖是公平的-因?yàn)槊總€(gè)線程將會(huì)根據(jù)請(qǐng)求順序獲得這個(gè)互斥量(對(duì)于ZK來(lái)說(shuō))
主要方法如下:
1 //構(gòu)造方法
2 public InterProcessMutex(CuratorFramework client, String path)
3 public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
4 //通過(guò)acquire獲得鎖,并提供超時(shí)機(jī)制:
5 public void acquire() throws Exception
6 public boolean acquire(long time, TimeUnit unit) throws Exception
7 //撤銷鎖
8 public void makeRevocable(RevocationListener listener)
9 public void makeRevocable(final RevocationListener listener, Executor executor)
我們主要分析核心獲取鎖acquire方法如下:
1 @Override
2 public boolean acquire(long time, TimeUnit unit) throws Exception
3 {
4 return internalLock(time, unit);5 }
6
7 private boolean internalLock(long time, TimeUnit unit) throws Exception
8 {
9 /*
10 Note on concurrency: a given lockData instance11 can be only acted on by a single thread so locking isn't necessary12 */
13
14 Thread currentThread = Thread.currentThread();
15 //線程安全map:private final ConcurrentMap?? threadData = Maps.newConcurrentMap();
16 LockData lockData =threadData.get(currentThread);
17 if ( lockData != null )
18 {
19 //這里實(shí)現(xiàn)了可重入,如果當(dāng)前線程已經(jīng)獲取鎖,計(jì)數(shù)+1,直接返回true
20 lockData.lockCount.incrementAndGet();
21 return true;
22 }
23 //獲取鎖,核心方法
24 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
25 if ( lockPath != null )
26 { //得到鎖,塞進(jìn)線程安全map
27 LockData newLockData = new LockData(currentThread, lockPath);
28 threadData.put(currentThread, newLockData);
29 return true;
30 }
31
32 return false;
33 }
核心獲取鎖的方法attemptLock源碼如下:
1 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
2 {
3 final long startMillis = System.currentTimeMillis();
4 final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
5 final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
6 int retryCount = 0;
7
8 String ourPath = null;
9 boolean hasTheLock = false;
10 boolean isDone = false;
11 while ( !isDone )
12 {
13 isDone = true;
14
15 try
16 {
17 if ( localLockNodeBytes != null )
18 {
19 ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes);
20 }
21 else
22 { //創(chuàng)建瞬時(shí)節(jié)點(diǎn)(客戶端斷開(kāi)連接時(shí)刪除),節(jié)點(diǎn)名追加自增數(shù)字
23 ourPath =client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);24 }
//自循環(huán)等待時(shí)間,并判斷是否獲取到鎖
25 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
26 }
27 catch ( KeeperException.NoNodeException e )
28 {
29 //gets thrown by StandardLockInternalsDriver when it can't find the lock node30 //this can happen when the session expires, etc. So, if the retry allows, just try it all again
31 if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
32 {
33 isDone = false;
34 }
35 else
36 {
37 throw e;
38 }
39 }
40 }
41 //獲取到鎖返回節(jié)點(diǎn)path
42 if ( hasTheLock )
43 {
44 return ourPath;
45 }
46
47 return null;
48 }
自循環(huán)等待時(shí)間:
1 private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
2 {
3 boolean haveTheLock = false;
4 boolean doDelete = false;
5 try
6 {
7 if ( revocable.get() != null )
8 {
9 client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
10 }
11
12 while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )//如果狀態(tài)是開(kāi)始且未獲取到鎖
13 {
14 List children = getSortedChildren();//獲取父節(jié)點(diǎn)下所有線程的子節(jié)點(diǎn)
15 String sequenceNodeName = ourPath.substring(basePath.length() + 1); //獲取當(dāng)前節(jié)點(diǎn)名稱
16//核心方法:判斷是否獲取到鎖
17 PredicateResults predicateResults =driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
18 if ( predicateResults.getsTheLock() )//獲取到鎖,置true,下一次循環(huán)退出
19 {
20 haveTheLock = true;
21 }
22 else//沒(méi)有索取到鎖
23 {
24 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();//這里路徑是上一次獲取到鎖的持有鎖路徑
25
26 synchronized(this)//強(qiáng)制加鎖
27 {
//讓線程等待,并且watcher當(dāng)前節(jié)點(diǎn),當(dāng)節(jié)點(diǎn)有變化的之后,則notifyAll當(dāng)前等待的線程,讓它再次進(jìn)入來(lái)爭(zhēng)搶鎖
28 Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
29 if ( stat != null )
30 {
31 if ( millisToWait != null )
32 {
33 millisToWait -= (System.currentTimeMillis() - startMillis);
34 startMillis = System.currentTimeMillis();
35 if ( millisToWait <= 0 )
36 {
37 doDelete = true; //等待超時(shí),置狀態(tài)為true,后面會(huì)刪除節(jié)點(diǎn)
38 break;
39 }
40 //等待指定時(shí)間
41 wait(millisToWait);
42 }
43 else
44 { //一直等待
45 wait();
46 }
47 }
48 }
49 //else it may have been deleted (i.e. lock released). Try to acquire again
50 }
51 }
52 }
53 catch ( Exception e )
54 {
55 doDelete = true;
56 throw e;
57 }
58 finally
59 {
60 if ( doDelete )//刪除path
61 {
62 deleteOurPath(ourPath);
63 }
64 }
65 return haveTheLock;
66 }
1 @Override
2 public PredicateResults getsTheLock(CuratorFramework client, List children, String sequenceNodeName, int maxLeases) throws Exception
3 {
4 int ourIndex = children.indexOf(sequenceNodeName);//先根據(jù)子節(jié)點(diǎn)名獲取children(所有子節(jié)點(diǎn)升序集合)中的索引
5 validateOurIndex(sequenceNodeName, ourIndex);//校驗(yàn)如果索引為負(fù)值,即不存在該子節(jié)點(diǎn)
6 //maxLeases允許同時(shí)租賃的數(shù)量,這里源代碼寫(xiě)死了1,但這種設(shè)計(jì)符合將來(lái)拓展,修改maxLeases即可滿足多租賃
7 boolean getsTheLock = ourIndex
8 String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);//獲取到鎖返回null,否則未獲取到鎖,獲取上一次的獲取到鎖的路徑。后面會(huì)監(jiān)視這個(gè)路徑用以喚醒請(qǐng)求線程
9
10 return new PredicateResults(pathToWatch, getsTheLock);
11 }
特點(diǎn):
1.可避免死鎖:zk瞬時(shí)節(jié)點(diǎn)(Ephemeral Nodes)生命周期和session一致,session結(jié)束,節(jié)點(diǎn)自動(dòng)刪除。
2.依賴zk創(chuàng)建節(jié)點(diǎn),涉及文件操作,開(kāi)銷較大。
實(shí)戰(zhàn):
1.創(chuàng)建客戶端client
2.生成互斥鎖InterProcessMutex
3.開(kāi)啟3個(gè)線程去獲取鎖
1 package distributed.lock.zk;
2
3 import java.text.SimpleDateFormat;
4 import java.util.Date;
5 import java.util.concurrent.TimeUnit;
6
7 import org.apache.curator.framework.CuratorFramework;
8 import org.apache.curator.framework.CuratorFrameworkFactory;
9 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
10 import org.apache.curator.retry.ExponentialBackoffRetry;
11 import org.apache.curator.retry.RetryNTimes;
12 import org.jboss.netty.channel.StaticChannelPipeline;
13 import org.omg.CORBA.PRIVATE_MEMBER;
14
15 /**
16 *17 * @ClassName:CuratorDistrLockTest18 * @Description:Curator包實(shí)現(xiàn)zk分布式鎖:利用了zookeeper的臨時(shí)順序節(jié)點(diǎn)特性,一旦客戶端失去連接后,則就會(huì)自動(dòng)清除該節(jié)點(diǎn)。19 *@authordiandian.zhang20 * @date 2017年7月11日下午12:43:4421 */
22
23 public class CuratorDistrLock {
24 private static final String ZK_ADDRESS = "192.168.50.253:2181";//zk
25 private static final String ZK_LOCK_PATH = "/zktest";//path
26 static SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
27
28 public static void main(String[] args) {
29 try {
30 //創(chuàng)建zk客戶端31 //CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,new RetryNTimes(3, 1000));
32 CuratorFramework client = CuratorFrameworkFactory.builder()
33 .connectString(ZK_ADDRESS)
34 .sessionTimeoutMs(5000)
35 .retryPolicy(new ExponentialBackoffRetry(1000, 10))
36 .build();
37 //開(kāi)啟
38 client.start();
39 System.out.println("zk client start successfully!"+time.format(new Date()));
40
41 Thread t1 = new Thread(() -> {
42 doWithLock(client);//函數(shù)式編程
43 }, "t1");
44 Thread t2 = new Thread(() -> {
45 doWithLock(client);
46 }, "t2");
47 Thread t3 = new Thread(() -> {
48 doWithLock(client);
49 }, "t3");
50 //啟動(dòng)線程
51 t1.start();
52 t2.start();
53 t3.start();
54 } catch (Exception e) {
55 e.printStackTrace();
56 }
57 }
58
59 /**
60 *61 * @Description 線程執(zhí)行函數(shù)體62 *@paramclient63 *@paramlock64 *@authordiandian.zhang65 * @date 2017年7月12日下午6:00:5366 *@sinceJDK1.867 */
68 private static void doWithLock(CuratorFramework client) {
69 //依賴ZK生成的可重入互斥公平鎖(按照請(qǐng)求順序)
70 InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
71 try {
72 System.out.println("進(jìn)入線程="+Thread.currentThread().getName()+":"+time.format(new Date()));
73
74 //花20秒時(shí)間嘗試獲取鎖
75 if (lock.acquire(20, TimeUnit.SECONDS)) {
76 System.out.println(Thread.currentThread().getName() + " 獲取鎖成功!,執(zhí)行需要加鎖的任務(wù)"+time.format(new Date()));
77 Thread.sleep(2000L);//休息2秒模擬執(zhí)行需要加鎖的任務(wù)78 //獲取鎖超時(shí)
79 }else{
80 System.out.println(Thread.currentThread().getName() + " 獲取鎖超時(shí)!"+time.format(new Date()));
81 }
82 } catch (Exception e) {
83 e.printStackTrace();
84 } finally {
85 try {
86 //當(dāng)前線程獲取到鎖,那么最后需要釋放鎖(實(shí)際上是刪除節(jié)點(diǎn))
87 if (lock.isAcquiredInThisProcess()) {
88 lock.release();
89 System.out.println(Thread.currentThread().getName() + " 釋放鎖"+time.format(new Date()));
90 }
91 } catch (Exception e) {
92 e.printStackTrace();
93 }
94 }
95 }
96
97 }
執(zhí)行結(jié)果:
zk client start successfully!
進(jìn)入線程=t2:2017-07-13 11:13:23
進(jìn)入線程=t1:2017-07-13 11:13:23
進(jìn)入線程=t3:2017-07-13 11:13:23
t2 獲取鎖成功!,執(zhí)行需要加鎖的任務(wù)2017-07-13 11:13:23----》起始時(shí)間23秒
t2 釋放鎖2017-07-13 11:13:25
t3 獲取鎖成功!,執(zhí)行需要加鎖的任務(wù)2017-07-13 11:13:25----》驗(yàn)證耗時(shí)2秒,T2執(zhí)行完,T3執(zhí)行
t3 釋放鎖2017-07-13 11:13:27
t1 獲取鎖成功!,執(zhí)行需要加鎖的任務(wù)2017-07-13 11:13:27----》驗(yàn)證耗時(shí)2秒,T3執(zhí)行完,T1執(zhí)行
t1 釋放鎖2017-07-13 11:13:29----》驗(yàn)證耗時(shí)2秒,T1執(zhí)行完,3個(gè)任務(wù)共耗時(shí)=29-23=6秒,驗(yàn)證互斥鎖達(dá)到目標(biāo)。
查看zookeeper節(jié)點(diǎn):
1.客戶端連接
zkCli.sh -server 192.168.50.253:2181
2.查看節(jié)點(diǎn)
[zk: 192.168.50.253:2181(CONNECTED) 80] ls /-----》查看根目錄
[dubbo, zktest, zookeeper, test]
[zk: 192.168.50.253:2181(CONNECTED) 81] ls /zktest -----》查看我們創(chuàng)建的子節(jié)點(diǎn)
[_c_034e5f23-abaf-4d4a-856f-c27956db574e-lock-0000000007, _c_63c708f1-2c3c-4e59-9d5b-f0c70c149758-lock-0000000006, _c_1f688cb7-c38c-4ebb-8909-0ba421e484a4-lock-0000000008]
[zk: 192.168.50.253:2181(CONNECTED) 82] ls /zktest-----》任務(wù)執(zhí)行完畢最終釋放了子節(jié)點(diǎn)
[]
4.總結(jié)比較
一級(jí)鎖分類
二級(jí)鎖分類
鎖名稱
特性
是否推薦
單JVM鎖
基于JVM源生synchronized關(guān)鍵字實(shí)現(xiàn)
synchronized同步鎖
適用于低并發(fā)的情況,性能穩(wěn)定。
新手推薦
基于JDK實(shí)現(xiàn),需顯示獲取鎖,釋放鎖
ReentrantLock可重入鎖
適用于低、高并發(fā)的情況,性能較高
需要指定公平、非公平或condition時(shí)使用。
ReentrantReadWriteLock
可重入讀寫(xiě)鎖
適用于讀多寫(xiě)少的情況。性能高。
老司機(jī)推薦
StampedLock戳鎖
JDK8才有,適用于高并發(fā)且讀遠(yuǎn)大于寫(xiě)時(shí),支持樂(lè)觀讀,票據(jù)校驗(yàn)失敗后可升級(jí)悲觀讀鎖,性能極高!
老司機(jī)推薦
分布式鎖
基于數(shù)據(jù)庫(kù)鎖實(shí)現(xiàn)
悲觀鎖:select for update
sql直接使用,但水很深。涉及數(shù)據(jù)庫(kù)ACID原理+隔離級(jí)別+不同數(shù)據(jù)庫(kù)規(guī)范
不推薦
樂(lè)觀鎖:版本控制
自己實(shí)現(xiàn)字段版本控制
新手推薦
基于緩存實(shí)現(xiàn)
org.redisson
性能極高,支持除了分布式鎖外還實(shí)現(xiàn)了分布式對(duì)象、分布式集合等極端強(qiáng)大的功能
老司機(jī)推薦
基于zookeeper實(shí)現(xiàn)
org.apache.curator zookeeper
性能較高,除支持分布式鎖外,還實(shí)現(xiàn)了master選舉、節(jié)點(diǎn)監(jiān)聽(tīng)()、分布式隊(duì)列、Barrier、AtomicLong等計(jì)數(shù)器
老司機(jī)推薦
=====附Redis命令=======
SETNX key value (SET if Not eXists):當(dāng)且僅當(dāng) key 不存在,將 key 的值設(shè)為 value ,并返回1;若給定的 key 已經(jīng)存在,則 SETNX 不做任何動(dòng)作,并返回0。詳見(jiàn):SETNX commond
GETSET key value:將給定 key 的值設(shè)為 value ,并返回 key 的舊值 (old value),當(dāng) key 存在但不是字符串類型時(shí),返回一個(gè)錯(cuò)誤,當(dāng)key不存在時(shí),返回nil。詳見(jiàn):GETSET commond
GET key:返回 key 所關(guān)聯(lián)的字符串值,如果 key 不存在那么返回 nil 。詳見(jiàn):GET Commond
DEL key [KEY …]:刪除給定的一個(gè)或多個(gè) key ,不存在的 key 會(huì)被忽略,返回實(shí)際刪除的key的個(gè)數(shù)(integer)。詳見(jiàn):DEL Commond
HSET key field value:給一個(gè)key 設(shè)置一個(gè){field=value}的組合值,如果key沒(méi)有就直接賦值并返回1,如果field已有,那么就更新value的值,并返回0.詳見(jiàn):HSET Commond
HEXISTS key field:當(dāng)key 中存儲(chǔ)著field的時(shí)候返回1,如果key或者field至少有一個(gè)不存在返回0。詳見(jiàn)HEXISTS Commond
HINCRBY key field increment:將存儲(chǔ)在 key 中的哈希(Hash)對(duì)象中的指定字段 field 的值加上增量 increment。如果鍵 key 不存在,一個(gè)保存了哈希對(duì)象的新建將被創(chuàng)建。如果字段 field 不存在,在進(jìn)行當(dāng)前操作前,其將被創(chuàng)建,且對(duì)應(yīng)的值被置為 0。返回值是增量之后的值。詳見(jiàn):HINCRBY Commond
PEXPIRE key milliseconds:設(shè)置存活時(shí)間,單位是毫秒。expire操作單位是秒。詳見(jiàn):PEXPIRE Commond
PUBLISH channel message:向channel post一個(gè)message內(nèi)容的消息,返回接收消息的客戶端數(shù)。詳見(jiàn)PUBLISH Commond
======參考======
總結(jié)
以上是生活随笔為你收集整理的java 单例 读写锁_终极锁实战:单JVM锁+分布式锁的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 冯雪 手术机器人的应用_未来达芬奇手术机
- 下一篇: oralce load的时候使用触发器会