通过Dapr实现一个简单的基于.net的微服务电商系统(六)——一步一步教你如何撸Dapr之Actor服务...
我個人認為Actor應該是Dapr里比較重頭的部分也是Dapr一直在講的所謂“stateful applications”真正具體的一個實現(個人認為),上一章講到有狀態服務可能很多同學看到后的第一反應是“不就是個分布式緩存嗎”。那今天就講講Actor,看看這個東西到底能不能算得上有狀態服務,同時由于篇幅有限,這里只會快速的過一遍Actor相關的概念,著重還是代碼層面的實現。
目錄:
一、通過Dapr實現一個簡單的基于.net的微服務電商系統
二、通過Dapr實現一個簡單的基于.net的微服務電商系統(二)——通訊框架講解
三、通過Dapr實現一個簡單的基于.net的微服務電商系統(三)——一步一步教你如何擼Dapr
四、通過Dapr實現一個簡單的基于.net的微服務電商系統(四)——一步一步教你如何擼Dapr之訂閱發布
通過Dapr實現一個簡單的基于.net的微服務電商系統(五)——一步一步教你如何擼Dapr之狀態管理
附錄:(如果你覺得對你有用,請給個star)
一、電商Demo地址:https://github.com/sd797994/Oxygen-Dapr.EshopSample
二、通訊框架地址:https://github.com/sd797994/Oxygen-Dapr
最早我接觸到Actor應該是微軟的Orleans框架(熟悉Actor或者Orleans的同學這一大段可以直接跳過),百度Actor關鍵詞一大堆“通用并發編程模型”可能讓人云里霧里的,其實它并不是一個特別復雜的概念。什么是并發編程?這個概念大家應該很熟悉了,現在主流的web服務器(如.netcore的kestrel或者dotnetty)幾乎都是支持并行訪問的,通過線程池充分調度操作系統的多線程來并行完成任務。在傳統的多線程模式中如果多個線程同時訪問某個數據并對其進行非冪等操作,往往是線程不安全的。
在單應用時代我們可以很方便的通過lock關鍵字或者semaphore信號量或者concurrent線程安全集合或者Interlocked這樣的CAS原子操作去規避多線程訪問導致的數據不安全,亦或者直接采用以數據庫事務為基礎的樂觀 or 悲觀事務來實現,而一旦我們的應用由于吞吐瓶頸需要以集群的方式部署時或者分布式部署后對數據庫也進行了拆分后,上面的那些方案都會失效或者會導致高昂的成本(比如數據庫分布式事務協調機制)。這個時候往往需要引入一些分布式組件比如zookeeper或者redis鎖來解決。這也是分布式系統比較常用的數據一致性方案。而actor則是提出了一個新的在分布式環境下解決多線程污染數據的思路。
actor概念相對比較復雜這里就不展開了,簡單粗暴的來理解就是在內存里為每一個actor對象維護了一個消息隊列,當任意的請求不管該請求是來自于其他進程的線程亦或是當前進程的線程,都會將請求寫入該消息隊列,而Actor對象會監聽該隊列,當收到消息后Actor會處理該請求,在請求處理期間,外部線程會被阻塞在消息隊列中,并且新的請求也會入隊等待,直到actor對象完成操作后從隊列里取出下一個請求處理直到整個隊列為空。同時每一個actor對象在其臨界區內的內存是私有的,并不會被其他線程共享,從而就實現了內存安全。這樣當我們客戶端發起數個請求訪問一個或多個Actor對象時每個請求都會進入對應的Actor對象的消息隊列(術語叫Mailboxs)并等待actor消費。同時Dapr框架會確保同一個Actor對象在同一時間在整個分布式系統中只會被激活一個實例!從而確保了你無論從分布式系統的任意角落訪問某個Actor對象(user?id=1),總能得到唯一的一個實例
Dapr框架會確保你的Actor實例永遠能夠被訪問到(正確激活),哪怕對象在長時間未被訪問后系統回收休眠亦或者在未處理的異常導致其崩潰后
正確使用Actor唯一的要求就只有一條,由于Actor是一個內存并發模型所以不要在并發訪問Actor時去做任意的可能的IO阻塞(比如讀取數據庫)!
開始擼碼,首先我們做一個RPC服務,看看多線程訪問下的數據會是什么個情況,再對比一下Actor模式!在RPC層我們創建一個接口,代表產品服務,其有兩個方法對應讀取產品以及減扣庫存
? 接著我們在servicesample層實現一下這個服務(這里直接創建一個靜態變量模擬多線程下訪問共享內存數據的場景)
? 接著我們在clientsample發起對著兩個服務的RPC調用
? 現在我們通過并發測試統計jmter對其進行并發測試,并發1000個線程去減100個庫存,最后我們通過postman去訪問get方法看看結果是什么
?
減庫存前
?
?
?
?并行訪問1000次
可以看到由于沒有并發控制,我們的庫存被扣負了。現在我們開始對其進行Actor改造。首先我們將接口繼承iactorservice并申明服務的方法為actor(這一步的目的是為類型生成actor代理)
[RemoteService("servicesample", "product")]public interface IProductService : IActorService{[RemoteFunc(FuncType.Actor)]Task<ProductOutput> Get(ProductInput input);[RemoteFunc(FuncType.Actor)]Task<ProductOutput> ReduceStock(ProductInput input);}接著我們讓入參類繼承一個基類,這個基類需要派生類重寫其Actorid字段。原因是Actor是通過全局唯一標識符在內部被標識的,訪問相同標識會被路由到同一個actor。
public class ProductInput : ActorSendDto{public int PorductId { get; set; }public int ReduceStock { get; set; }public override string ActorId { get; set; }}接下來我們改造一下clientsample的調用方法,這里修改的部分不多,只是把代理生成的方式替換了一下
public async Task<dynamic> GetProduct(){var actorService = serviceProxyFactory.CreateActorProxy<IProductService>();return await actorService.Get(new ProductInput() { ActorId = "1", PorductId = 1 });}public async Task<dynamic> ProductReduceStock(){var actorService = serviceProxyFactory.CreateActorProxy<IProductService>();return await actorService.ReduceStock(new ProductInput() { ActorId = "1", PorductId = 1, ReduceStock = 1 });}接著我們對servicesample進行改造,首先我們需要在hostbuilder里替換掉默認的OxygenStartup,OxygenActorStartup會幫我們掃描類型生成對應的actor代理(其他代碼無變化,略)
.ConfigureWebHostDefaults(webhostbuilder => {//注冊成為oxygen服務節點webhostbuilder.StartOxygenServer<OxygenActorStartup>((config) => {config.Port = 80;config.PubSubCompentName = "pubsub";config.StateStoreCompentName = "statestore";config.TracingHeaders = "Authentication";});})接著我們需要將之前的商品持久化PO類繼承一個基類ActorStateModel,該基類會強制派生類重寫兩個屬性AutoSave和ReminderSeconds,前者代表是否自動持久化(調用Actor SDK的Statemanage持久化到中間件,第二個代表如果開啟持久化,是瞬時持久化還是由Actor的Timer按照周期持久化,這里的設計有點類似于redis aof模式下的always和everysec,前者(ReminderSeconds=0)采用每一次變更同步一次,性能損耗較大,后者采用每n(取決于ReminderSeconds設置)秒通過timer異步同步一次,同時我在Actor代理中添加了版本管理,并不會導致你的ReminderSeconds設置了周期同步后到時間就會請求你的同步委托,而是檢測到版本變化后才會請求),這里我測試就直接開啟自動同步并使用always模式
public class ProductPo : ActorStateModel{public int Id { get; set; }public string Name { get; set; }public int Stock { get; set; }public override bool AutoSave { get; set; } = true;public override int ReminderSeconds => 0;}最后我們對ProductService進行改造,如下:
public class ProductService : BaseActorService<ProductPo>, IProductService{static int visitCount = 0;static ProductPo ProductPoInstance;public async Task<ProductOutput> Get(ProductInput input){ActorData ??= new ProductPo() { Id = 1, Name = "小白菜", Stock = 100 };return new ProductOutput() { Message = $"第{visitCount}次請求成功,當前庫存剩余{ActorData.Stock}" };}public async Task<ProductOutput> ReduceStock(ProductInput input){Interlocked.Increment(ref visitCount);await Task.Delay(new Random(Guid.NewGuid().GetHashCode()).Next(20, 50));//模擬數據庫耗時ActorData ??= new ProductPo() { Id = 1, Name = "小白菜", Stock = 100 };if (ActorData.Stock >= input.ReduceStock){await Task.Delay(new Random(Guid.NewGuid().GetHashCode()).Next(50, 100));//模擬數據庫耗時ActorData.Stock -= input.ReduceStock;}return new ProductOutput() { Message = $"第{visitCount}次請求成功,當前庫存剩余{ActorData.Stock}" };}public override async Task SaveData(ProductPo model, ILifetimeScope scope){Console.WriteLine("同步請求被調用了,此處可以進行數據庫持久化!");await Task.CompletedTask;}}可以看到我的服務繼承了一個基類BaseActorService,并需要傳遞一個類型為ActorStateModel的泛型,這樣在我的服務里不再通過IO去拉取ProductPoInstance,而是直接使用ActorData這個泛型實例進行各種操作即可,所以我刪除掉了對應的數據庫模擬耗時(避免actor隊列訪問阻塞),最后你必須重寫BaseActorService的SaveData方法,該方法就是上文提到的同步委托,當我們開啟AutoSave時,ReminderSeconds=0會在actor被調用操作完成后激活該委托,ReminderSeconds>0時會被定時器定期根據actor對比版本后判斷是否需要激活。同時無論哪種方式我都在actor代理內部維護了一個channel異步隊列通過異步訂閱發布的方式實現非阻塞式的actor持久化而不用擔心持久化導致的io阻塞問題。SaveData入參返回的一個ILifetimeScope容器可以很方便的獲取到你的repository或者直接獲取ef的上下文進行對應的數據庫持久化操作(這里需要注意一下,Actor持久化有兩層意思,第一層意思是Actor sdk會自帶一個StateManager,當Component開啟actor支持后,可以通過StateManager將actor對象寫入中間件,而這里提供的SaveData是我封裝的一個通過訂閱發布異步調用的委托,方便開發人員持久化到數據庫用的,非actor原生自帶的設計)。
最后我們需要擴展我們的Component,需要開啟Actor持久化支持,編輯文件后用kubectl apply -f x.yaml即可:
apiVersion: dapr.io/v1alpha1 kind: Component metadata:name: statestore spec:type: state.redisversion: v1metadata:- name: actorStateStorevalue: "true"- name: redisHostvalue: redis.infrastructure.svc.cluster.local:6379- name: keyPrefixvalue: none接下來我們看看通過jmter重新請求后的情況
?
可以看到Actor確實解決了并發訪問安全的問題,同時也能看到我們的委托被正確的調用了。
總結一下,Actor確實通過其特殊的設計模式解決了并發訪問數據安全的問題,同時也帶來了一些問題諸如需要特定框架支持,諸如Actor行為內不能阻塞等等限制,不過相比其帶來的無鎖對象訪問來講,這點限制都是可以克服的,至少在特定場景下比如搶票、發紅包等等有一定并發同時又需要確保數據一致的場景,Actor算是一個可選方案。至于更多的場景探索則需要同學們自己去摸索了,今天的分享就到這里。下期不出意外的話我們會分享一下Dapr的服務限流
相關文章:
Dapr能否引領云原生中間件的未來?
云原生 | 阿里巴巴的Dapr實踐與探索
Dapr | 云原生的抽象與實現
Dapr 可視化指南
Dapr 知多少 | 分布式應用運行時
Dapr 正式發布 1.0
Dapr 交通流量控制示例
Dapr是如何簡化微服務的開發和部署
微軟開源微服務運行時Dapr,賦能云原生應用開發
YARP實現Dapr服務調用的反向代理
Dapr微服務應用開發系列0:概述
Dapr微服務應用開發系列1:環境配置
Dapr微服務應用開發系列2:Hello World與SDK初接觸
Dapr微服務應用開發系列3:服務調用構件塊
Dapr微服務應用開發系列4:狀態管理構件塊
Dapr微服務應用開發系列5:發布訂閱構建塊
Windows環境下Dapr入門
云原生 | .NET 5 with Dapr 初體驗
通過Dapr實現一個簡單的基于.net的微服務電商系統
通過Dapr實現一個簡單的基于.net的微服務電商系統(二)——通訊框架講解
通過Dapr實現一個簡單的基于.net的微服務電商系統(三)——一步一步教你如何擼Dapr
通過Dapr實現一個簡單的基于.net的微服務電商系統(四)——一步一步教你如何擼Dapr之訂閱發布
通過Dapr實現一個簡單的基于.net的微服務電商系統(五)——一步一步教你如何擼Dapr之狀態管理
總結
以上是生活随笔為你收集整理的通过Dapr实现一个简单的基于.net的微服务电商系统(六)——一步一步教你如何撸Dapr之Actor服务...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: .NET WebSocket 核心原理初
- 下一篇: java信息管理系统总结_java实现科