《ASP.NET Core 微服务实战》-- 读书笔记(第6章)
第 6 章 事件溯源與 CQRS
在本章,我們來了解一下隨著云平臺一同出現(xiàn)的設(shè)計模式
我們先探討事件溯源和命令查詢職責(zé)分離(CQRS)背后的動機與哲學(xué)
事件溯源簡介
事實由事件溯源而來
我們大腦就是一種事件溯源系統(tǒng),接收感官多種形式刺激,大腦負(fù)責(zé)對這些刺激進行合適排序,大約每隔幾百毫秒,對刺激構(gòu)成的流進行運算,而運算的結(jié)果,就是我們所說的事實
事件溯源的定義
傳統(tǒng)應(yīng)用中,狀態(tài)由一系列零散的數(shù)據(jù)所管理,如果客戶端向我們發(fā)送 PUT 或 POST 請求,狀態(tài)就會改變
這種方式很好地給出了系統(tǒng)當(dāng)前狀態(tài),卻不能指示在當(dāng)前狀態(tài)之前,系統(tǒng)是如何變化的
事件溯源可以解決這個問題,因為它把狀態(tài)管理的職責(zé)與接收導(dǎo)致狀態(tài)變更的刺激的職責(zé)區(qū)分開來
基于事件溯源的系統(tǒng)需要滿足一系列要求
有序:有序事件流
冪等:等價多個有序事件流的操作結(jié)果相同
獨立:不依賴外部信息
過去式:事件發(fā)生在過去
流行的區(qū)塊鏈技術(shù)的基礎(chǔ)就是發(fā)生在特定私有資源上的安全、可信的事件序列
擁抱最終一致性
一種我們每天都在用的最終一致性的應(yīng)用,就是社區(qū)網(wǎng)絡(luò)應(yīng)用
有時你從一個設(shè)備發(fā)出的評論要花幾分鐘才能展示在朋友的瀏覽器或者其他設(shè)備上
這是因為,應(yīng)用的架構(gòu)人員做了妥協(xié):通過放棄同步操作的即時一致性,在可接受的范圍內(nèi)增加一定的反饋延遲,就能讓應(yīng)用支持巨大的規(guī)模與流量
CQRS 模式
如果把我們討論的模式直接套用到系統(tǒng)中,很快會發(fā)現(xiàn)系統(tǒng)必須對輸入命令和查詢加以區(qū)分,這也被稱為命令查詢職責(zé)分離(CQRS)
我們用一個例子來說明這種模式的實際應(yīng)用
租戶通過一個門戶網(wǎng)站查看用電情況,每當(dāng)用戶刷新門戶頁面時,就調(diào)用某種數(shù)據(jù)服務(wù)并請求,匯總一段時間內(nèi)所有度量事件
但這種對于云規(guī)模的現(xiàn)代軟件開發(fā)來說是不可接受的,如果將計算職責(zé)推卸給數(shù)據(jù)庫,很快會造成數(shù)據(jù)庫瓶頸
掌握了大多數(shù)客戶的使用模式,讓我們能夠利用事件溯源來構(gòu)建一個合理的 CQRS 實現(xiàn)。
事件處理器每次收到新事件時重新計算已緩存的度量總和
利用這種機制,在查詢時,門戶上的用戶所期望的結(jié)果已經(jīng)存在于數(shù)據(jù)庫或者緩存中
不需要復(fù)制的計算,也沒有臨時的聚合與繁雜的匯總,只需要一個簡單的查詢
事件溯源于 CQRS 實戰(zhàn)--附件的團隊成員
接下來要開發(fā)的新版實例中,我們將檢測成員彼此相距一個較小距離的時刻
系統(tǒng)將支持對這些接近的結(jié)果予以響應(yīng)
例如我們可能希望向附近的團隊成員的移動設(shè)備發(fā)送推送通知,以提醒他們可以約見對方
為了實現(xiàn)這一功能,我們把系統(tǒng)職責(zé)劃分為以下四個組件:
位置報送服務(wù)(命令)
事件處理器(對事件進行溯源)
事實服務(wù)(查詢)
位置接近監(jiān)控器(對事件進行溯源)
位置報送服務(wù)
收到新報送的位置后,執(zhí)行下列操作:
驗證上報數(shù)據(jù)
將命令轉(zhuǎn)換為事件
生成事件,并用消息隊列發(fā)送出去
GitHub 鏈接:https://github.com/microservices-aspnetcore/es-locationreporter
創(chuàng)建位置報送控制器
using System; using Microsoft.AspNetCore.Mvc; using StatlerWaldorfCorp.LocationReporter.Events; using StatlerWaldorfCorp.LocationReporter.Models; using StatlerWaldorfCorp.LocationReporter.Services;namespace StatlerWaldorfCorp.LocationReporter.Controllers {[Route("/api/members/{memberId}/locationreports")]public class LocationReportsController : Controller{private ICommandEventConverter converter;private IEventEmitter eventEmitter;private ITeamServiceClient teamServiceClient;public LocationReportsController(ICommandEventConverter converter,IEventEmitter eventEmitter,ITeamServiceClient teamServiceClient) {this.converter = converter;this.eventEmitter = eventEmitter;this.teamServiceClient = teamServiceClient;}[HttpPost]public ActionResult PostLocationReport(Guid memberId, [FromBody]LocationReport locationReport){MemberLocationRecordedEvent locationRecordedEvent = converter.CommandToEvent(locationReport);locationRecordedEvent.TeamID = teamServiceClient.GetTeamForMember(locationReport.MemberID);eventEmitter.EmitLocationRecordedEvent(locationRecordedEvent);return this.Created($"/api/members/{memberId}/locationreports/{locationReport.ReportID}", locationReport);}} }創(chuàng)建 AMQP 事件生成器
using System; using System.Linq; using System.Text; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using RabbitMQ.Client; using StatlerWaldorfCorp.LocationReporter.Models;namespace StatlerWaldorfCorp.LocationReporter.Events {public class AMQPEventEmitter : IEventEmitter{private readonly ILogger logger;private AMQPOptions rabbitOptions;private ConnectionFactory connectionFactory;public AMQPEventEmitter(ILogger<AMQPEventEmitter> logger,IOptions<AMQPOptions> amqpOptions){this.logger = logger;this.rabbitOptions = amqpOptions.Value;connectionFactory = new ConnectionFactory();connectionFactory.UserName = rabbitOptions.Username;connectionFactory.Password = rabbitOptions.Password;connectionFactory.VirtualHost = rabbitOptions.VirtualHost;connectionFactory.HostName = rabbitOptions.HostName;connectionFactory.Uri = rabbitOptions.Uri;logger.LogInformation("AMQP Event Emitter configured with URI {0}", rabbitOptions.Uri);}public const string QUEUE_LOCATIONRECORDED = "memberlocationrecorded";public void EmitLocationRecordedEvent(MemberLocationRecordedEvent locationRecordedEvent){using (IConnection conn = connectionFactory.CreateConnection()) {using (IModel channel = conn.CreateModel()) {channel.QueueDeclare(queue: QUEUE_LOCATIONRECORDED,durable: false,exclusive: false,autoDelete: false,arguments: null);string jsonPayload = locationRecordedEvent.toJson();var body = Encoding.UTF8.GetBytes(jsonPayload);channel.BasicPublish(exchange: "",routingKey: QUEUE_LOCATIONRECORDED,basicProperties: null,body: body);}}}} }配置并啟動服務(wù)
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using System; using Microsoft.Extensions.Logging; using System.Linq; using StatlerWaldorfCorp.LocationReporter.Models; using StatlerWaldorfCorp.LocationReporter.Events; using StatlerWaldorfCorp.LocationReporter.Services;namespace StatlerWaldorfCorp.LocationReporter {public class Startup{public Startup(IHostingEnvironment env, ILoggerFactory loggerFactory){loggerFactory.AddConsole();loggerFactory.AddDebug();var builder = new ConfigurationBuilder().SetBasePath(env.ContentRootPath).AddJsonFile("appsettings.json", optional: false, reloadOnChange: false).AddEnvironmentVariables();Configuration = builder.Build();}public IConfigurationRoot Configuration { get; }public void ConfigureServices(IServiceCollection services){services.AddMvc();services.AddOptions();services.Configure<AMQPOptions>(Configuration.GetSection("amqp"));services.Configure<TeamServiceOptions>(Configuration.GetSection("teamservice"));services.AddSingleton(typeof(IEventEmitter), typeof(AMQPEventEmitter));services.AddSingleton(typeof(ICommandEventConverter), typeof(CommandEventConverter));services.AddSingleton(typeof(ITeamServiceClient), typeof(HttpTeamServiceClient));}public void Configure(IApplicationBuilder app,IHostingEnvironment env,ILoggerFactory loggerFactory,ITeamServiceClient teamServiceClient,IEventEmitter eventEmitter){// Asked for instances of singletons during Startup// to force initialization early.app.UseMvc();}} }對 Configure 的兩次調(diào)用讓配置子系統(tǒng)把分別從 amqp 和 teamservice 節(jié)加載的配置選項以依賴注入的方式提供出來
這些配置可以由 appsettings.json 文件提供,也可以用環(huán)境變量覆蓋
{"amqp": {"username": "guest","password": "guest","hostname": "localhost","uri": "amqp://localhost:5672/","virtualhost": "/"},"teamservice": {"url": "http://localhost:5001"} }消費團隊服務(wù)
using System; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System.Linq; using System.Net.Http; using System.Net.Http.Headers; using Newtonsoft.Json; using StatlerWaldorfCorp.LocationReporter.Models;namespace StatlerWaldorfCorp.LocationReporter.Services {public class HttpTeamServiceClient : ITeamServiceClient{private readonly ILogger logger;private HttpClient httpClient;public HttpTeamServiceClient(IOptions<TeamServiceOptions> serviceOptions,ILogger<HttpTeamServiceClient> logger){this.logger = logger;var url = serviceOptions.Value.Url;logger.LogInformation("Team Service HTTP client using URL {0}", url);httpClient = new HttpClient();httpClient.BaseAddress = new Uri(url);}public Guid GetTeamForMember(Guid memberId){httpClient.DefaultRequestHeaders.Accept.Clear();httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));HttpResponseMessage response = httpClient.GetAsync(String.Format("/members/{0}/team", memberId)).Result;TeamIDResponse teamIdResponse;if (response.IsSuccessStatusCode) {string json = response.Content.ReadAsStringAsync().Result;teamIdResponse = JsonConvert.DeserializeObject<TeamIDResponse>(json);return teamIdResponse.TeamID;}else {return Guid.Empty;}}}public class TeamIDResponse{public Guid TeamID { get; set; }} }這個例子中,我們使用 .Result 屬性在等待異步方法響應(yīng)期間強行阻塞了線程
在生產(chǎn)級質(zhì)量的代碼里,很可能對此進行重構(gòu),確保在服務(wù)邊界之內(nèi)整個調(diào)用鏈都傳遞異步結(jié)果
運行位置報送服務(wù)
RabbitMQ 已經(jīng)啟動運行,默認(rèn)的配置也指向了本地的 RabbitMQ 實例
此時可以使用以下方式啟動位置報送服務(wù)
(確保位于 src/StatlerWaldorfCorp.LocationReporter 子目錄中)
$ dotnet restore $ dotnet build $ dotnet run --server.urls=http://0.0.0.0:9090服務(wù)運行后,只要向服務(wù)提交請求,就可以體驗其功能了
$ curl -X POST -d \ '{"reportID":"...", \ "origin":"...", "latitude":10, "longtitude":20, \ "memberID":"..."}' \ http://...le2 \ /locationreports提交完成后,應(yīng)該能從服務(wù)獲得一個 HTTP 201 響應(yīng)
事件處理器
它的職責(zé)是消費來自流的事件,并執(zhí)行合適的操作
為確保代碼整潔、可測試,我們把事件處理的職責(zé)劃分為如下部分:
訂閱隊列并從事件流中獲取新的消息
將消息寫入事件存儲
處理事件流(檢測附近的隊友)
作為流的處理結(jié)果,生成新的消息并發(fā)送到隊列
作為流的處理結(jié)果,向事實服務(wù)的服務(wù)器 / 緩存提交狀態(tài)變更情況
GitHub 鏈接:https://github.com/microservices-aspnetcore/es-eventprocessor
檢測附近隊友的基于 GPS 工具類的檢測器
using System.Collections.Generic; using StatlerWaldorfCorp.EventProcessor.Location; using System.Linq; using System;namespace StatlerWaldorfCorp.EventProcessor.Events {public class ProximityDetector{/** This method assumes that the memberLocations collection only* applies to members applicable for proximity detection. In other words,* non-team-mates must be filtered out before using this method.* distance threshold is in Kilometers.*/public ICollection<ProximityDetectedEvent> DetectProximityEvents(MemberLocationRecordedEvent memberLocationEvent,ICollection<MemberLocation> memberLocations,double distanceThreshold){GpsUtility gpsUtility = new GpsUtility();GpsCoordinate sourceCoordinate = new GpsCoordinate() {Latitude = memberLocationEvent.Latitude,Longitude = memberLocationEvent.Longitude};return memberLocations.Where(ml => ml.MemberID != memberLocationEvent.MemberID &&gpsUtility.DistanceBetweenPoints(sourceCoordinate, ml.Location) < distanceThreshold).Select( ml => {return new ProximityDetectedEvent() {SourceMemberID = memberLocationEvent.MemberID,TargetMemberID = ml.MemberID,TeamID = memberLocationEvent.TeamID,DetectionTime = DateTime.UtcNow.Ticks,SourceMemberLocation = sourceCoordinate,TargetMemberLocation = ml.Location,MemberDistance = gpsUtility.DistanceBetweenPoints(sourceCoordinate, ml.Location)};}).ToList();}} }接著我們就可以用這個方法的結(jié)果來產(chǎn)生對應(yīng)的額外效果,例如可能需要發(fā)出一個 ProximityDetectorEvent 事件,并將事件寫入事件存儲
作為主體的事件處理器代碼
using System; using System.Collections.Generic; using Microsoft.Extensions.Logging; using StatlerWaldorfCorp.EventProcessor.Location; using StatlerWaldorfCorp.EventProcessor.Queues;namespace StatlerWaldorfCorp.EventProcessor.Events {public class MemberLocationEventProcessor : IEventProcessor{private ILogger logger;private IEventSubscriber subscriber;private IEventEmitter eventEmitter;private ProximityDetector proximityDetector;private ILocationCache locationCache;public MemberLocationEventProcessor(ILogger<MemberLocationEventProcessor> logger,IEventSubscriber eventSubscriber,IEventEmitter eventEmitter,ILocationCache locationCache){this.logger = logger;this.subscriber = eventSubscriber;this.eventEmitter = eventEmitter;this.proximityDetector = new ProximityDetector();this.locationCache = locationCache;this.subscriber.MemberLocationRecordedEventReceived += (mlre) => {var memberLocations = locationCache.GetMemberLocations(mlre.TeamID);ICollection<ProximityDetectedEvent> proximityEvents =proximityDetector.DetectProximityEvents(mlre, memberLocations, 30.0f);foreach (var proximityEvent in proximityEvents) {eventEmitter.EmitProximityDetectedEvent(proximityEvent);}locationCache.Put(mlre.TeamID, new MemberLocation { MemberID = mlre.MemberID, Location = new GpsCoordinate {Latitude = mlre.Latitude, Longitude = mlre.Longitude} });};}public void Start(){this.subscriber.Subscribe();}public void Stop(){this.subscriber.Unsubscribe();}} }事件處理服務(wù)唯一的額外職責(zé)是需要將收到的每個事件都寫入事件存儲
這樣做到原因有很多,包括向其他服務(wù)提供可供搜索的歷史記錄
如果緩存崩潰、數(shù)據(jù)丟失、事件存儲也可用于重建事實緩存
請記住,緩存在架構(gòu)里僅提供便利性,我們不應(yīng)該在緩存中存儲任何無法從其他位置重建的數(shù)據(jù)
我們要給服務(wù)里每一個團隊創(chuàng)建一個 Redis 哈希(hash)
在哈希中,把團隊成員的位置經(jīng)序列化得到的 JSON 正文存儲為字段(團隊成員的 ID 用作鍵)
這樣就能輕松地并發(fā)更新多個團隊成員地位置而不會覆蓋數(shù)據(jù),同時也很容易查詢給定的任意團隊的位置列表,因為團隊就是一個個哈希
事實服務(wù)
事實服務(wù)負(fù)責(zé)維護每個團隊成員的位置,不過這些位置只代表最近從一些應(yīng)用那里收到的位置
關(guān)于事實服務(wù)的這類服務(wù),有兩條重要的提醒需要記住:
事實服務(wù)并不是事件存儲
事實服務(wù)是不可依賴服務(wù)
位置接近監(jiān)控器
位置接近監(jiān)控器的代碼包括
基本的微服務(wù)結(jié)構(gòu)
一個隊列消費端,訂閱 ProximityDetectedEvent 事件到達的消息
調(diào)用一些第三方或云上的服務(wù)來發(fā)送推動通知
運行示例項目
下面列出運行本章示例的依賴項:
RabbitMQ 服務(wù)器
Redis 服務(wù)器
所有依賴項都啟動運行后,可從 GitHub 拉取 es-locationreporter 和 es-eventprocessor 兩個服務(wù)的代碼
此外需要一份 teamservice 服務(wù)
請確保獲取的是 master 分支,因為在測試期間只需要用到內(nèi)存存儲
要啟動團隊服務(wù),在命令行中轉(zhuǎn)到 src/StatlerWaldorfCorp.TeamService 目錄并運行以下命令
$ dotnet run --server.urls=http://0.0.0.:5001要啟動位置報送服務(wù),在命令行中轉(zhuǎn)到 src/StatlerWaldorfCorp.LocationReporter 目錄下并運行以下命令
$ dotnet run --server.urls=http://0.0.0:5002啟動事件處理器(從 src/StatlerWaldorfCorp.EventProcessor 目錄運行)
$ dotnet run --server.urls=http://0.0.0.:5003可用下列步驟端到端地檢驗整個事件溯源/CQRS系統(tǒng):
(1)向 http://localhost:5001/teams 發(fā)送一個 POST 請求,創(chuàng)建一個新團隊
(2)向 http://localhost:5001/teams/
/members 發(fā)送一個 POST 請求,往團隊中添加一個成員(3)向 http://localhost:5002/api/members/
/locationreports 發(fā)送一個 POST 請求,報送團隊成員位置(4)觀察由報送的位置轉(zhuǎn)換而成、被放到對應(yīng)隊列中的 MemberLocationReportedEvent 事件
(5)再重復(fù)幾次第 3 步,添加一些相距較遠的位置,確保不會觸發(fā)并被檢測到位置接近事件
(6)重復(fù)第 2 步,往第一名測試成員所在團隊添加一名新成員
(7)為第二名成員再次重復(fù)第 3 步,添加一個于第一名成員最近的位置相距幾公里以內(nèi)的位置
(8)現(xiàn)在應(yīng)該能夠在 proximitydetected 隊列中看到一條新消息
(9)可用直接查詢 Redis 緩存,也可以利用事實服務(wù)來查看各團隊成員最新的位置狀態(tài)
手動操作幾次后,大多數(shù)團隊會花些時間把這一過程自動化
借助 docker compose 之類的工具,或者創(chuàng)建 Kubernetes 部署,或者其他容器編排環(huán)境,可自動將所有服務(wù)部署到集成測試環(huán)境
接著用腳本發(fā)送 REST 請求
待測試運行完成后,斷言出現(xiàn)了正確的接近檢測的次數(shù),值也是正確的
總結(jié)
以上是生活随笔為你收集整理的《ASP.NET Core 微服务实战》-- 读书笔记(第6章)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C#如何安全、高效地玩转任何种类的内存之
- 下一篇: asp.net ajax控件工具集 Au