RedLock 实现分布式锁
并發(fā)是程序開發(fā)中不可避免的問題,根據(jù)系統(tǒng)面向用戶、功能場景的不同,并發(fā)的重視程度會有不同。從程序的角度來說,并發(fā)意味著相同的時間點執(zhí)行了相同的代碼,而有些情況是不被允許的,比如:轉賬、搶購占庫存等,如果沒有做好臨界條件的驗證,會帶來非常嚴重的后果。追根結底是因為并發(fā)引起的數(shù)據(jù)不一致問題,面對并發(fā),我們通常會采用鎖來優(yōu)化。
場景模擬
如下模擬搶購的示例代碼(C#):
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | // 有10個商品庫存 private static int stockCount = 10; public bool Buy() { // 模擬執(zhí)行的邏輯代碼花費的時間 Thread.Sleep(new Random().Next(100,500)); if (stockCount > 0) { stockCount--; return true; } return false; } |
| 1 2 3 4 5 6 7 8 9 10 11 | var test = new Test(); Parallel.For(1, 16, (i) => { var stopwatch = new Stopwatch(); stopwatch.Start(); var data = test.Buy(); stopwatch.Stop(); Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId}, Result:{data}, Time:{stopwatch.ElapsedMilliseconds}"); }); Console.ReadKey(); |
模擬并行調用 Buy 方法 15 次(內部使用的是線程池,所以 ThreadId 會有重復),實際上只有 10 個庫存,返回結果卻顯示 11 個請求都購買成功了。
單機部署模式解決方案
在單機部署模式下,我們只需要加 lock(){} 就可以解決問題:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | // 有10個商品庫存 private static int stockCount = 10; private static object obj = new object(); public bool Buy() { lock (obj) { // 模擬執(zhí)行的邏輯代碼花費的時間 Thread.Sleep(new Random().Next(100, 500)); if (stockCount > 0) { stockCount--; return true; } return false; } } |
從輸出結果中可以看出,確實只有10個請求是顯示購買成功,但同時發(fā)現(xiàn)部分請求的執(zhí)行時間明顯變長,這就是加鎖帶來的最直觀影響,當某個線程獲得鎖之后,在沒有釋放之前,其他線程只能繼續(xù)等待,并發(fā)越高,更多的線程需要等待輪流被處理。
各種語言一般都提供了鎖的實現(xiàn),用法大同小異,語言本身實現(xiàn)的鎖只能作用于當前進程內,所以在單機模式部署的系統(tǒng)中使用基本沒什么問題。
集群部署模式解決方案(分布式鎖)
在集群模式下,系統(tǒng)部署于多臺機器(一個系統(tǒng)運行在多個進程中),語言本身實現(xiàn)的鎖只能確保當前進程內有效(基于內存),多進程就沒辦法共享鎖狀態(tài),這時我們就得考慮采用分布式鎖,分布式鎖可以采用?數(shù)據(jù)庫、ZooKeeper、Redis?等來實現(xiàn),最終都是為了達到在不同的進程、線程內能共享鎖狀態(tài)的目的。
這里將介紹基于 Redis 的?RedLock.net?來解決分布式下的并發(fā)問題,RedLock.net 是 RedLock 分布式鎖算法的 .NET 版實現(xiàn) (大部分語言都有對應的實現(xiàn),查看) ,RedLock 分布式鎖算法是由 Redis 的作者提出。
RedLock 簡介
RedLock 的思想是使用多臺 Redis Master ,節(jié)點完全獨立,節(jié)點間不需要進行數(shù)據(jù)同步,因為 Master-Slave 架構一旦 Master 發(fā)生故障時數(shù)據(jù)沒有復制到 Slave,被選為 Master 的 Slave 就丟掉了鎖,另一個客戶端就可以再次拿到鎖。鎖通過 setNX(原子操作) 命令設置,在有效時間內當獲得鎖的數(shù)量大于 (n/2+1) 代表成功,失敗后需要向所有節(jié)點發(fā)送釋放鎖的消息。
獲取鎖:
| 1 | SET resource_name my_random_value NX PX 30000 |
釋放鎖:
| 1 2 3 4 5 | if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end |
RedLock.net 集成
創(chuàng)建 .NETCore API 項目
Nuget 安裝 RedLock.net
| 1 | Install-Package RedLock.net |
appsettings.json 添加 redis 配置
| 1 2 3 4 | { "RedisUrl": "127.0.0.1:6379", // 多個用,分割 ... } |
添加 ProductService.cs,模擬商品購買
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | // 有10個商品庫存,如果同時啟動多個API服務進行測試,這里改成存數(shù)據(jù)庫或其他方式 private static int stockCount = 10; public async Task<bool> BuyAsync() { // 模擬執(zhí)行的邏輯代碼花費的時間 await Task.Delay(new Random().Next(100, 500)); if (stockCount > 0) { stockCount--; return true; } return false; } |
修改 Startup.cs ,創(chuàng)建 RedLockFactory
定義 RedLockFactory 變量:
| 1 | private RedLockFactory lockFactory; |
添加方法:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | private RedLockFactory GetRedLockFactory() { var redisUrl = Configuration["RedisUrl"]; if (string.IsNullOrEmpty(redisUrl)) { throw new ArgumentException("RedisUrl 不能為空"); } var urls = redisUrl.Split(",").ToList(); var endPoints = new List<RedLockEndPoint>(); foreach (var item in urls) { var arr = item.Split(":"); endPoints.Add(new DnsEndPoint(arr[0], Convert.ToInt32(arr[1]))); } return RedLockFactory.Create(endPoints); } |
在 ConfigureServices 注入 IDistributedLockFactory:
| 1 2 3 | lockFactory = GetRedLockFactory(); services.AddSingleton(typeof(IDistributedLockFactory), lockFactory); services.AddScoped(typeof(ProductService)); |
修改 Configure,應用程序結束時釋放 lockFactory :
| 1 2 3 4 5 6 7 8 9 | public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime) { ... lifetime.ApplicationStopping.Register(() => { lockFactory.Dispose(); }); } |
在 Controller 添加方法 DistributedLockTest
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | private readonly IDistributedLockFactory _distributedLockFactory; private readonly ProductService _productService; public HomeController(IDistributedLockFactory distributedLockFactory, ProductService productService) { _distributedLockFactory = distributedLockFactory; _productService = productService; } [HttpGet] public async Task<bool> DistributedLockTest() { var productId = "id"; // resource 鎖定的對象 // expiryTime 鎖定過期時間,鎖區(qū)域內的邏輯執(zhí)行如果超過過期時間,鎖將被釋放 // waitTime 等待時間,相同的 resource 如果當前的鎖被其他線程占用,最多等待時間 // retryTime 等待時間內,多久嘗試獲取一次 using (var redLock = await _distributedLockFactory.CreateLockAsync(productId, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(20))) { if (redLock.IsAcquired) { var result = await _productService.BuyAsync(); return result; } else { Console.WriteLine($"獲取鎖失敗:{DateTime.Now}"); } } return false; } |
調用接口測試
| 1 2 3 4 5 6 7 8 | Parallel.For(1, 16, (i) => { var stopwatch = new Stopwatch(); stopwatch.Start(); var data = GetAsync($"http://localhost:5000/home/distributedLockTest").Result; stopwatch.Stop(); Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId}, Result:{data}, Time:{stopwatch.ElapsedMilliseconds}"); }); |
關于 RedLock 分布式鎖算法的爭議大家可以參考:
How to do distributed locking
Is Redlock safe?
總結
如果使用鎖,必然對性能上會有一定影響,我們需要根據(jù)實際場景來判斷是真正需要。在指定鎖過期時間時要相對合理,避免出現(xiàn)鎖已過期,但邏輯還沒執(zhí)行完成,這樣就失去了鎖的意義,當然這種情況下我們還可以考慮重入鎖。
最后推薦一下微軟開源的一個基于 Actor 模型的分布式框架?Orleans,也可以達到分布式鎖的效果。
參考鏈接
Distributed locks with Redis
RedLock.net
原文地址:http://beckjin.com/2019/01/06/redLock-net/
.NET社區(qū)新聞,深度好文,歡迎訪問公眾號文章匯總?http://www.csharpkit.com?
總結
以上是生活随笔為你收集整理的RedLock 实现分布式锁的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 基于Kebernetes 构建.NET
- 下一篇: [翻译] NumSharp的数组切片功能