Actor-ES框架:Ray-Handler-消息订阅器编写
消息訂閱器:
Ray是基于Event Sourcing設計的ES/Actor框架,消息發布后需要訂閱處理,訂閱器主要有以下兩類:
CoreHandler消息訂閱器=RabbitSub+SubHandler
ToReadHandler消息訂閱器=RabbitSub+SQLToReadHandler(ToReadHandler的子類)
RabbitSub特性
RabbitSub特性是RabbitMQ消息隊列訂閱器。
RabbitSub特性有兩個構造函數,常用的是這個:
public RabbitSubAttribute(string group, string exchange, string queue, int queueCount = 1)group:通常用于分類。示例中,X-CoreHandler的group是Core,X-ToReadHandler是Read。
exchange:RabbitMQ中的exchange名稱。
queue:RabbitMQ中的queue名稱。
queueCount:消息隊列數。用于消息的負載均衡。
示例:
[RabbitSub("Core", "Account", "account")] public sealed class AccountCoreHandler : SubHandler<string, MessageInfo> {…… }RabbitSub可以單獨使用,用于訂閱消息。
CoreHandler消息訂閱器
Ray中的ESActor通過RaiseEvent方法發布事件,傳遞消息。Ray默認使用RabbitMQ傳遞消息。ESActor發起事件后,CoreHandler訂閱事件,以處理事件。
實現方式是:
繼承SubHandler。
添加RabbitSub特性。 exchange名稱、queue名稱與ESGrain上RabbitPub特性的標識一致。
添加構造函數(必須)。
public AccountCoreHandler(IServiceProvider svProvider) : base(svProvider) { }事件被訂閱后會流轉到Tell方法中,data是要處理的事件。
public override Task Tell(byte[] bytes, IActorOwnMessage<string> data, MessageInfo msg) {switch (data){case AmountTransferEvent value: return Task.WhenAll(task, AmountAddEventHandler(value));default: return task;} }ToReadHandler消息訂閱器
SQLToReadHandler
ESActor發起事件后,X-ToReadHandler訂閱事件,以處理事件。X-ToReadHandler繼承自X-SQLToReadHandler,X-SQLToReadHandler繼承自ToReadHandler。
示例圖:
X-SQLToReadHandler需要使用者繼承PartSubHandler,根據使用的關系型數據庫自己實現。Ray默認提供了PostgreSQL的PSQLToReadHandler。如果使用的是MySQL、SQL Server等其他關系型數據庫,請自定義實現。
X-SQLToReadHandler實現細節:
修改對應關系型數據庫的Integrity Constraint Violation(違反完整性約束)的異常。
可以將實例中PSQLToReadHandler當做X-ToReadHandler模板,修改if (!(t.Exception.InnerException is Npgsql.PostgresException e && e.SqlState == "23505"))即可。
說明:
當X-ToReadHandler訂閱消息,消息有重放的場景,如果該消息已經得到處理,數據庫中已經存在其處理后的結果,這是可能會報Integrity Constraint Violation(違反完整性約束)異常,默認不做處理,其他異常將其拋出,這是這段代碼的作用。
示例模板:
public abstract class PSQLToReadHandler<K> : PartSubHandler<K, MessageInfo> {public PSQLToReadHandler(IServiceProvider svProvider) : base(svProvider){ }public override Task Notice(byte[] data){return base.Notice(data).ContinueWith(t =>{if (t.Exception != null){//根據使用數據庫,修改這個if判斷if (!(t.Exception.InnerException is Npgsql.PostgresException e && e.SqlState == "23505")){throw t.Exception;}}});} }? 2. X-ToReadHandler
X-ToReadHandler訂閱器主要用于訂閱感興趣的消息,將數據寫入到數據庫中。
實現方式是:
實現SQLToReadHandler
ToReadHandler繼承SQLToReadHandler(ToReadHandler的子類)
添加RabbitSub特性。
添加構造函數(必須),在構造函數中注冊關注的事件。
public AccountToReadHandler(IServiceProvider svProvider) : base(svProvider) {Register<AmountAddEvent>();Register<AmountTransferEvent>(); }代碼如下所示:
X-ToReadHandler消息訂閱器與CoreHandler消息訂閱器差異
X-ToReadHandler消息訂閱器使用時,需要在構造函數中注冊關心的事件,而X-CoreHandler中不需要,原因是事件在處理中需要反序列化,X-CoreHandler會對RabbitSub參數指定訂閱的所有的消息反序列化,X-ToReadHandler在此基礎上做了進一步的控制,在訂閱的消息中只對Register的事件處理。這樣做的原因:1.反序列化會消耗一定的性能,進一步控制有助于提高性能;2.Ray提供兩種實現方式,為開發者擴展自定義源碼提供借鑒。
總結:
CoreHandler消息訂閱器=RabbitSub+SubHandler
ToReadHandler消息訂閱器=RabbitSub+SQLToReadHandler(ToReadHandler的子類)
相關文章:
Actor-ES框架:Ray
Actor-ES框架:Ray--事件(Event)編寫說明
Ray框架Q&A
Actor-ES框架:Ray-Handler之CoreHandler編寫
Actor-ES框架:Ray-Handler之ToReadHandler編寫
原文地址:http://www.cnblogs.com/CharlesZHENG/p/8425856.html
.NET社區新聞,深度好文,歡迎訪問公眾號文章匯總 http://www.csharpkit.com
總結
以上是生活随笔為你收集整理的Actor-ES框架:Ray-Handler-消息订阅器编写的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: dotnetcore+vue+eleme
- 下一篇: Actor-ES框架:Actor编写-E