通过Dapr实现一个简单的基于.net的微服务电商系统(十九)——分布式事务之Saga模式...
目錄:
一、通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)
二、通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(二)——通訊框架講解
三、通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(三)——一步一步教你如何擼Dapr
四、通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(四)——一步一步教你如何擼Dapr之訂閱發(fā)布
通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(五)——一步一步教你如何擼Dapr之狀態(tài)管理
通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(六)——一步一步教你如何擼Dapr之Actor服務(wù)
通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(七)——一步一步教你如何擼Dapr之服務(wù)限流
通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(八)——一步一步教你如何擼Dapr之鏈路追蹤
通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(九)——一步一步教你如何擼Dapr之OAuth2授權(quán)
通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(九)——一步一步教你如何擼Dapr之OAuth2授權(quán)-百度版
通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(十)——一步一步教你如何擼Dapr之綁定
通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(十一)——一步一步教你如何擼Dapr之自動擴/縮容
通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(十二)——istio+dapr構(gòu)建多運行時服務(wù)網(wǎng)格
通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(十三)——istio+dapr構(gòu)建多運行時服務(wù)網(wǎng)格之生產(chǎn)環(huán)境部署
通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(十四)——開發(fā)環(huán)境容器調(diào)試小技巧
通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(十五)——集中式接口文檔實現(xiàn)
通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(十六)——dapr+sentinel中間件實現(xiàn)服務(wù)保護(hù)
通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(十七)——服務(wù)保護(hù)之動態(tài)配置與熱重載
通過Dapr實現(xiàn)一個簡單的基于.net的微服務(wù)電商系統(tǒng)(十八)——服務(wù)保護(hù)之多級緩存
附錄:(如果你覺得對你有用,請給個star)
一、電商Demo地址:https://github.com/sd797994/Oxygen-Dapr.EshopSample
二、通訊框架地址:https://github.com/sd797994/Oxygen-Dapr
三、Saga框架地址:https://github.com/sd797994/Oxygen-Saga
一、什么是Saga
? ? ? ? 在領(lǐng)域驅(qū)動設(shè)計中,由于領(lǐng)域邊界的存在,以往的分層設(shè)計中業(yè)務(wù)會按照其固有的領(lǐng)域知識被切分到不同的限界中,并且引入了領(lǐng)域事件這一概念來降低單個業(yè)務(wù)的復(fù)雜度,通過非耦合的事件驅(qū)動來完成復(fù)雜的業(yè)務(wù)。但是事件驅(qū)動帶來了一些新的問題,由于以往一個原子性極強的邏輯被拆散到了一個一個小的領(lǐng)域中,原子性事務(wù)數(shù)據(jù)的強一致性無法被保證。為了解決這個問題,一般會采用事務(wù)補償?shù)姆绞絹泶_保最終一致。
? ? ? ? 當(dāng)我們采用多個本地事務(wù)組合去進(jìn)行業(yè)務(wù)處理時,由于業(yè)務(wù)其本身的復(fù)雜性,往往需要在多個事務(wù)中協(xié)調(diào)。而事件協(xié)調(diào)器(saga)就是一個專門降低其復(fù)雜度的設(shè)計,開發(fā)人員原則上只需要將事件和補償按照一定順序注冊到協(xié)調(diào)處理器中,原則上協(xié)調(diào)器會按照注冊的事件依次執(zhí)行,若出現(xiàn)事件執(zhí)行失敗時,也會按照補償列表進(jìn)行相應(yīng)的回滾。
? ? ? ? 去年曾經(jīng)在一篇文章里聊過Saga,鏈接在此。相比去年的那個小DEMO。我在此基礎(chǔ)上基于Dapr的狀態(tài)管理完成了另外一個開源項目Oxygen-Saga,地址:https://github.com/sd797994/Oxygen-Saga。當(dāng)然這個項目既可以引入到dapr的環(huán)境中完成Saga事務(wù),本身也可以獨立的基于rabbitmq+redis來完成非Dapr環(huán)境下的Saga事務(wù)。今天主要講解一下如何在Dapr環(huán)境下引入Saga來實現(xiàn)一個分布式事務(wù)。
首先我們還是看看目前的一個訂單下單邏輯,可以看到基于Actor服務(wù),我們的請求是在訂單服務(wù)里直連商品服務(wù)Actor完成庫存減扣的,并通過Actor的周期性持久化機制完成事務(wù)落庫,所以實際上是把分布式事務(wù)的復(fù)雜性通過Actor屏蔽掉了,并沒有涉及到真正的分布式事務(wù)。
從代碼層面也可以看到,整個事務(wù)其實是在同一個方法里以同步調(diào)用Actor的方式完成的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | #region 私有遠(yuǎn)程服務(wù)包裝器方法 async Task<List<OrderGoodsSnapshot>> GetGoodsListByIds(IEnumerable<Guid> input) { ????return?(await goodsQueryService.GetGoodsListByIds(new?GetGoodsListByIdsDto(input))).GetData<List<OrderGoodsSnapshot>>(); } async Task<bool> DeductionGoodsStock(CreateOrderDeductionGoodsStockDto input) { ????var?data = input.CopyTo<CreateOrderDeductionGoodsStockDto, DeductionStockDto>(); ????data.ActorId = input.GoodsId.ToString(); ????return?(await goodsActorService.DeductionGoodsStock(data)).GetData<bool>(); } async Task<bool> UnDeductionGoodsStock(CreateOrderDeductionGoodsStockDto input) { ????var?data = input.CopyTo<CreateOrderDeductionGoodsStockDto, DeductionStockDto>(); ????data.ActorId = input.GoodsId.ToString(); ????return?(await goodsActorService.UnDeductionGoodsStock(data)).GetData<bool>(); } #endregion |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | public?async Task<ApiResult> CreateOrder(OrderCreateDto input) { ????var?mockUser = (await accountQueryService.GetMockAccount()).GetData<CurrentUser>(); ????//申明一個創(chuàng)建訂單領(lǐng)域服務(wù)實例,將遠(yuǎn)程rpc調(diào)用作為匿名函數(shù)傳遞進(jìn)去 ????var?createOrderService =?new?CreateOrderService(GetGoodsListByIds, DeductionGoodsStock, UnDeductionGoodsStock); ????return?await ApiResult.Ok("訂單創(chuàng)建成功!").RunAsync(async () => ????{ ????????var?order = await createOrderService.CreateOrder(mockUser.Id, mockUser.UserName, mockUser.Address, mockUser.Tel, input.Items.CopyTo<OrderCreateDto.OrderCreateItemDto, OrderItem>().ToList());//通過訂單服務(wù)創(chuàng)建訂單 ????????repository.Add(order); ????????if?(await?new?CheckOrderCanCreateSpecification(repository).IsSatisfiedBy(order)) ????????????await unitofWork.CommitAsync(); ????????await eventBus.SendEvent(EventTopicDictionary.Order.CreateOrderSucc,?new?OperateOrderSuccessEvent(order, mockUser.UserName));//發(fā)送訂單創(chuàng)建成功事件 ????}, ????//失敗回滾 ????createOrderService.UnCreateOrder); } |
而在Saga方案里就變得不一樣了,我們需要借助Saga的流程管理器在多個實例中流轉(zhuǎn)我們的事務(wù),通過事件訂閱和發(fā)布來最終完成一個分布式事務(wù)。具體的調(diào)用流程如下:
可以看到整個流程完全依賴于SagaManger來提供對應(yīng)的調(diào)用策略,而作為開發(fā)人員只需要為每個策略提供對應(yīng)的委托函數(shù),對應(yīng)的具體流程如下:
一、首先是訂單服務(wù)和商品服務(wù)在系統(tǒng)初始化時需要注冊對應(yīng)的配置文件到本地Saga管理器,并且需要提供對應(yīng)的流程處理委托(包括業(yè)務(wù)事件委托+補償事件委托+異常處理委托)。
二、在客戶發(fā)起一個訂單時,只需要創(chuàng)建一個Saga流程實例,剩下的就交給Saga,它會自動幫我們流轉(zhuǎn)整個業(yè)務(wù)邏輯而無需人工插手。
二、talk is cheap, show me the code
首先我們需要為整個訂單創(chuàng)建流程設(shè)計一組Topic:
接著我們創(chuàng)建一個Saga配置注冊這組topic進(jìn)去,流程比較簡單,第一步是扣除庫存,下一步是創(chuàng)建訂單,補償事件只有一個就是庫存回滾:
再然后我們需要創(chuàng)建這些Topic對應(yīng)的事件處理器:
訂單服務(wù):
商品服務(wù):?
接著我們在各自的服務(wù)里去實現(xiàn)它們:
商品服務(wù):
訂單服務(wù):
然后我們在訂單用例里創(chuàng)建一個Action用于啟動saga流程(注意是第二個方法):
?最后我們在商品和訂單服務(wù)中引入saga組件并注冊事件和異?;卣{(diào)委托(注冊代碼相似,此處僅展示其中一個服務(wù)的):
接著我們修改m站的創(chuàng)建訂單接口,修改為新的Saga流程接口,然后編譯整個項目,啟動并測試一下:
?
?可以看到整個流程被順利的通過Saga管理器流轉(zhuǎn)完畢。接著我們嘗試注入一個故障,看看能否正常的被異常訂閱器捕獲到:
?編譯后再測試一下,我們可以看到事件異常訂閱器里已經(jīng)成功捕獲到了這個異常:
?接著我們在創(chuàng)建訂單注入一個異常,讓商品庫存進(jìn)行回滾操作,這里由于下單后異常商品補償所以在界面上是體現(xiàn)不出來的(當(dāng)然在真實的業(yè)務(wù)場景中一般是下單5秒等待后查詢訂單創(chuàng)建情況,或發(fā)送一條站內(nèi)信告知下單失敗),這里我們通過打印控制臺來演示:
接著我們運行并下單,追蹤商品服務(wù)的日志:
可以看到由于訂單創(chuàng)建失敗,saga觸發(fā)了補償事件并成功執(zhí)行了補償。好了,關(guān)于saga的演示就到這里,可嘗試?yán)∽钚碌纳坛窃创a執(zhí)行即可看到效果。
總結(jié)
以上是生活随笔為你收集整理的通过Dapr实现一个简单的基于.net的微服务电商系统(十九)——分布式事务之Saga模式...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: BeetleX服务网关流量控制
- 下一篇: Windows 11 上大招!正式支持安