Dapr + .NET 实战(五)Actor
什么是Actor模式
Actors 為最低級(jí)別的“計(jì)算單元”
以上解釋來(lái)自官方文檔,看起來(lái)“晦澀難懂”。大白話就是說(shuō)Actors模式是一段需要單線程執(zhí)行的代碼塊。
實(shí)際開(kāi)發(fā)中我們經(jīng)常會(huì)有一些邏輯不能并發(fā)執(zhí)行,我們常用的做法就是加鎖,例如:
lock(obj) {//dosomething... }或者用Redis等中間件,為分布式應(yīng)用加一些分布式鎖。遺憾的是,使用顯式鎖定機(jī)制容易出錯(cuò)。它們很容易導(dǎo)致死鎖,并可能對(duì)性能產(chǎn)生嚴(yán)重影響。Actors模式為單線程邏輯提供了一種更好的選擇。
什么時(shí)候用Actors
需要單線程執(zhí)行,比如需要加lock
邏輯可以被劃分為小的執(zhí)行單元
工作原理
Dapr啟動(dòng)app時(shí),Sidecar調(diào)用Actors獲取配置信息,之后Sidecar將Actors的信息發(fā)送到安置服務(wù)(Placement Service),安置服務(wù)會(huì)將不同的Actor類型根據(jù)其Id和Actor類型分區(qū),并將Actor信息廣播到所有dapr實(shí)例。
?在客戶端調(diào)用某個(gè)Actor時(shí),安置服務(wù)會(huì)根據(jù)其Id和Actor類型,找到其所在的dapr實(shí)例,并執(zhí)行其方法。
調(diào)用Actors方法
POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/method/<method><actorType>:執(zhí)行組件類型。
<actorId>:要調(diào)用的特定參與者的 ID。
<method>:要調(diào)用的方法
計(jì)時(shí)器Timers和提醒器Reminders
Actor可以設(shè)置timer和reminder設(shè)置執(zhí)行Actor的時(shí)間,有點(diǎn)像我們常用的定時(shí)任務(wù)。但是timer和reminder也存在不同。
timer只作用于激活狀態(tài)的Actor。一個(gè)Actor長(zhǎng)期不被調(diào)用,其自己的空閑計(jì)時(shí)器會(huì)逐漸累積,到一定時(shí)間后會(huì)被Dapr銷毀,timer沒(méi)法作用于已銷毀的Actor。
reminder則可以作用于所有狀態(tài)的Actor。主要方式是重置空閑計(jì)時(shí)器,使其處于活躍狀態(tài)
操作timer
POST/PUT http://localhost:3500/v1.0/actors/<actorType>/<actorId>/timers/<name>到期時(shí)間(due time)表示注冊(cè)后 timer 將首次觸發(fā)的時(shí)間。?period?表示timer在此之后觸發(fā)的頻率。到期時(shí)間為0表示立即執(zhí)行。負(fù) due times 和負(fù) periods 都是無(wú)效。
下面的請(qǐng)求體配置了一個(gè) timer,?dueTime?9秒,?period?3秒。這意味著它將在9秒后首次觸發(fā),然后每3秒觸發(fā)一次。
{"dueTime":"0h0m9s0ms","period":"0h0m3s0ms" }下面的請(qǐng)求體配置了一個(gè) timer,?dueTime?0秒,?period?3秒。這意味著它將在注冊(cè)之后立即觸發(fā),然后每3秒觸發(fā)一次。
{"dueTime":"0h0m0s0ms","period":"0h0m3s0ms" }操作reminder
POST/PUT/GET/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>?到期時(shí)間(due time)表示注冊(cè)后 reminders將首次觸發(fā)的時(shí)間。?period?表示在此之后 reminders 將觸發(fā)的頻率。到期時(shí)間為0表示立即執(zhí)行。負(fù) due times 和負(fù) periods 都是無(wú)效。若要注冊(cè)僅觸發(fā)一次的 reminders ,請(qǐng)將 period 設(shè)置為空字符串。
下面的請(qǐng)求體配置了一個(gè) reminders,?dueTime?9秒,?period?3秒。這意味著它將在9秒后首次觸發(fā),然后每3秒觸發(fā)一次。
{"dueTime":"0h0m9s0ms","period":"0h0m3s0ms" }下面的請(qǐng)求體配置了一個(gè) reminders,?dueTime?0秒,?period?3秒。這意味著它將在注冊(cè)之后立即觸發(fā),然后每3秒觸發(fā)一次。
{"dueTime":"0h0m0s0ms","period":"0h0m3s0ms" }下面的請(qǐng)求體配置了一個(gè) reminders,?dueTime?15秒,?period?空字符串。這意味著它將在15秒后首次觸發(fā),之后就不再被觸發(fā)。
{"dueTime":"0h0m15s0ms","period":"" }數(shù)據(jù)持久化
使用 Dapr?狀態(tài)管理構(gòu)建塊保存執(zhí)行組件狀態(tài)。由于執(zhí)行組件可以一輪執(zhí)行多個(gè)狀態(tài)操作,因此狀態(tài)存儲(chǔ)組件必須支持多項(xiàng)事務(wù)。撰寫(xiě)本文時(shí),以下?tīng)顟B(tài)存儲(chǔ)支持多項(xiàng)事務(wù):
Azure Cosmos DB
MongoDB
MySQL
PostgreSQL
Redis
RethinkDB
SQL Server
若要配置要與執(zhí)行組件一起使用的狀態(tài)存儲(chǔ)組件,需要將以下元數(shù)據(jù)附加到狀態(tài)存儲(chǔ)配置:
- name: actorStateStorevalue: "true"win10自承載模式下已默認(rèn)設(shè)置此項(xiàng) C:\Users\<username>\.dapr\components\statestore.yaml
項(xiàng)目實(shí)例
Actor操作
下面將通過(guò)一個(gè)審核流程的例子來(lái)演示。
還是用前面的FrontEnd項(xiàng)目,引入nuget包Dapr.Actors和Dapr.Actors.AspNetCore。
?定義IOrderStatusActor接口,需要繼承自IActor
using Dapr.Actors;using System.Threading.Tasks;namespace FrontEnd.ActorDefine {public interface IOrderStatusActor : IActor{Task<string> Paid(string orderId);Task<string> GetStatus(string orderId);} }定義OrderStatusActor實(shí)現(xiàn)IOrderStatusActor,并繼承自Actor
using Dapr.Actors.Runtime;using System.Threading.Tasks;namespace FrontEnd.ActorDefine {public class OrderStatusActor : Actor, IOrderStatusActor{public OrderStatusActor(ActorHost host) : base(host){}public async Task<string> Paid(string orderId){// change order status to paidawait StateManager.AddOrUpdateStateAsync(orderId, "init", (key, currentStatus) => "paid");return orderId;}public async Task<string> GetStatus(string orderId){return await StateManager.GetStateAsync<string>(orderId);}} }需要注意的是,執(zhí)行組件方法的返回類型必須為 Task 或 Task<T>?。此外,執(zhí)行組件方法最多只能有一個(gè)參數(shù)。返回類型和參數(shù)都必須可 System.Text.Json 序列化。
Actor的api是必需的,因?yàn)?Dapr 挎斗調(diào)用應(yīng)用程序來(lái)承載和與執(zhí)行組件實(shí)例進(jìn)行交互,所以在Startup的Configure中配置
app.UseEndpoints(endpoints =>{endpoints.MapActorsHandlers();// .......});Startup類是用于注冊(cè)特定執(zhí)行組件類型的位置。在ConfigureServices?注冊(cè)?services.AddActors?:
services.AddActors(options =>{options.Actors.RegisterActor<OrderStatusActor>();});為測(cè)試這個(gè)Actor,需要定義一個(gè)接口調(diào)用,新增ActorController
using Dapr.Actors; using Dapr.Actors.Client;using FrontEnd.ActorDefine;using Microsoft.AspNetCore.Mvc;using System.Threading.Tasks;namespace FrontEnd.Controllers {[Route("[controller]")][ApiController]public class ActorController : ControllerBase{[HttpGet("paid/{orderId}")]public async Task<ActionResult> PaidAsync(string orderId){var actorId = new ActorId("myid-"+orderId);var proxy = ActorProxy.Create<IOrderStatusActor>(actorId, "OrderStatusActor");var result = await proxy.Paid(orderId);return Ok(result);}} }ActorProxy.Create?為創(chuàng)建代理實(shí)例。?Create方法采用兩個(gè)參數(shù):標(biāo)識(shí)特定執(zhí)行組件和執(zhí)行組件?ActorId?類型。它還具有一個(gè)泛型類型參數(shù),用于指定執(zhí)行組件類型所實(shí)現(xiàn)的執(zhí)行組件接口。由于服務(wù)器和客戶端應(yīng)用程序都需要使用執(zhí)行組件接口,它們通常存儲(chǔ)在單獨(dú)的共享項(xiàng)目中。
啟動(dòng)FrontEnd
dapr run --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet .\FrontEnd\bin\Debug\net5.0\FrontEnd.dll下面通過(guò)postman測(cè)試下,調(diào)用成功
?查看redis中的數(shù)據(jù)
127.0.0.1:6379> keys *1) "test_topic"2) "frontend||guid"3) "frontend||name"5) "newOrder"6) "frontend||OrderStatusActor||myid-123||123"7) "myapp2||key2"8) "myapp2||key1"9) "deathStarStatus" 10) "myapp||name" 127.0.0.1:6379> hgetall frontend||OrderStatusActor||myid-123||123 1) "data" 2) "\"init\"" 3) "version" 4) "1"可以發(fā)現(xiàn)actor數(shù)據(jù)的命名規(guī)則是appName||ActorName||ActorId||key
同樣可以使用注入的方式創(chuàng)建proxy,ActorController中注入IActorProxyFactory
private readonly IActorProxyFactory _actorProxyFactory;public ActorController(IActorProxyFactory actorProxyFactory){_actorProxyFactory = actorProxyFactory;}新增獲取數(shù)據(jù)接口
[HttpGet("get/{orderId}")]public async Task<ActionResult> GetAsync(string orderId){var proxy = _actorProxyFactory.CreateActorProxy<IOrderStatusActor>(new ActorId("myid-" + orderId),"OrderStatusActor");return Ok(await proxy.GetStatus(orderId));}重新啟動(dòng)FrontEnd
dapr run --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet .\FrontEnd\bin\Debug\net5.0\FrontEnd.dllpostman測(cè)試
?Timer操作
使用Actor基類的?RegisterTimerAsync?方法計(jì)劃計(jì)時(shí)器。在OrderStatusActor類中新增方法
public Task StartTimerAsync(string name, string text){return RegisterTimerAsync(name,nameof(TimerCallbackAsync),Encoding.UTF8.GetBytes(text),TimeSpan.Zero,TimeSpan.FromSeconds(3));}public Task TimerCallbackAsync(byte[] state){var text = Encoding.UTF8.GetString(state);_logger.LogInformation($"Timer fired: {text}");return Task.CompletedTask;}StartTimerAsync方法調(diào)用?RegisterTimerAsync?來(lái)計(jì)劃計(jì)時(shí)器。?RegisterTimerAsync?采用五個(gè)參數(shù):
計(jì)時(shí)器的名稱。
觸發(fā)計(jì)時(shí)器時(shí)要調(diào)用的方法的名稱。
要傳遞給回調(diào)方法的狀態(tài)。
首次調(diào)用回調(diào)方法之前要等待的時(shí)間。
回調(diào)方法調(diào)用之間的時(shí)間間隔。可以指定 以?TimeSpan.FromMilliseconds(-1)?禁用定期信號(hào)。
在OrderStatusActor構(gòu)造方法中調(diào)用StartTimerAsync
StartTimerAsync("test-timer", "this is a test timer").ConfigureAwait(false).GetAwaiter().GetResult();重新啟動(dòng)FrontEnd
dapr run --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet .\FrontEnd\bin\Debug\net5.0\FrontEnd.dll通過(guò)調(diào)用paid接口實(shí)例化一個(gè)Actor,即可開(kāi)啟timer
?查看控制臺(tái),timer觸發(fā)成功
== APP == info: FrontEnd.ActorDefine.OrderStatusActor[0] == APP == Timer fired: this is a test timerTimerCallbackAsync方法以二進(jìn)制形式接收用戶狀態(tài)。在示例中,回調(diào)在將狀態(tài)寫(xiě)入日志之前將狀態(tài)?string?解碼回 。
可以通過(guò)調(diào)用 來(lái)停止計(jì)時(shí)器?UnregisterTimerAsync?:
public Task StopTimerAsync(string name){return UnregisterTimerAsync(name);}Reminder操作
使用Actor基類的 RegisterReminderAsync 方法計(jì)劃計(jì)時(shí)器。在OrderStatusActor類中新增方法
public Task SetReminderAsync(string text){return RegisterReminderAsync("test-reminder",Encoding.UTF8.GetBytes(text),TimeSpan.Zero,TimeSpan.FromSeconds(1));}public Task ReceiveReminderAsync(string reminderName, byte[] state,TimeSpan dueTime, TimeSpan period){if (reminderName == "test-reminder"){var text = Encoding.UTF8.GetString(state);Logger.LogWarning($"reminder fired: {text}");}return Task.CompletedTask;}RegisterReminderAsync方法類似于?RegisterTimerAsync?,但不必顯式指定回調(diào)方法。如上面的示例所示,實(shí)現(xiàn)?IRemindable.ReceiveReminderAsync?以處理觸發(fā)的提醒。
public class OrderStatusActor : Actor, IOrderStatusActor, IRemindableReceiveReminderAsync觸發(fā)提醒時(shí)調(diào)用 方法。它采用 4 個(gè)參數(shù):
提醒的名稱。
注冊(cè)期間提供的用戶狀態(tài)。
注冊(cè)期間提供的調(diào)用到期時(shí)間。
注冊(cè)期間提供的調(diào)用周期。
在OrderStatusActor構(gòu)造方法中調(diào)用SetReminderAsync
SetReminderAsync("this is a test reminder").ConfigureAwait(false).GetAwaiter().GetResult();重新啟動(dòng)FrontEnd
dapr run --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet .\FrontEnd\bin\Debug\net5.0\FrontEnd.dll通過(guò)調(diào)用paid接口實(shí)例化一個(gè)Actor,即可開(kāi)啟reminder
?查看控制臺(tái),reminder觸發(fā)成功
== APP == warn: FrontEnd.ActorDefine.OrderStatusActor[0] == APP == reminder fired: this is a test reminder 相關(guān)文章:Dapr實(shí)戰(zhàn)(一) 基礎(chǔ)概念與環(huán)境搭建
Dapr + .NET Core實(shí)戰(zhàn)(二) 服務(wù)調(diào)用
Dapr + .NET Core實(shí)戰(zhàn)(三)狀態(tài)管理
Dapr + .NET 實(shí)戰(zhàn)(四)發(fā)布和訂閱
總結(jié)
以上是生活随笔為你收集整理的Dapr + .NET 实战(五)Actor的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 如何优化 .NET Core 中的 la
- 下一篇: .NET 排序 Array.SortT