eShopOnContainers 知多少[6]:持久化事件日志
1. 引言
事件總線解決了微服務間如何基于集成事件進行異步通信的問題。然而只有事件總線正常運行,微服務之間基于事件的通信才得以運轉。 而現實情況是,總有這樣或那樣的問題,導致事件總線不穩定或不可用,比如:網絡中斷,系統斷電等等,這都可能導致微服務間的不一致性問題。 那如何解決事件總線故障導致的不一致問題呢?
事件溯源
事件日志挖掘
發件箱模式
2. 問題
既然上面提到了一致性問題,那具體的問題是什么呢,在什么情況才會發生呢?我想我有必要簡單舉例。上代碼:
var oldPrice = item.Price;
item.Price = product.Price;
_context.CatalogItems.Update(item);
var @event = new ProductPriceChangedIntegrationEvent(item.Id, item.Price, oldPrice);
// Commit changes in original transaction
await _context.SaveChangesAsync();
// Publish integration event to the event bus
// (RabbitMQ or a service bus underneath)
_eventBus.Publish(@event);
當產品價格更改后,代碼將數據提交給數據庫,然后發布 ProductPriceChangedIntegrationEvent 事件。 如果服務在數據庫更新后崩潰(奔潰發生在 _context.SaveChangesAsync()代碼執行之后,但又發生在集成事件成功發布前),就會導致本地微服務價格已成功更新,但集成事件未發布的問題。就會導致目錄微服務中定義的價格和顧客購物車中緩存的價格不一致。
3. 分析問題
以上問題的關鍵在于是如何確保兩個獨立的操作的原子性。如果單從單體應用的角度來處理的話,我們完全是可以將他們放到同一個事務中去保證。然而在微服務中,就違背了其高可用的基本要求。因為一旦事件總線處于癱瘓狀態,那么整個目錄微服務就不可用了。這種強制通過事務保證的一致性,就引入了太多的問題依賴。
如果從微服務的角度來看,每個微服務負責各自的業務邏輯,對于目錄微服務來說,它的關注點是產品的更新是否成功。至于借助事件總線通過異步事件實現微服務間的通信,并不是其關注點。這也就是關注點分離。換句話說,產品的更新不應該依賴外部狀態。在這里,外部狀態就是事件總線的可用性。
你可能會說了,既然不允許通過強事務保證一致性,那么如何解決一致性問題呢(好像繞了半天又回到了原點)?
這里就要引入強一致性和最終一致性的概念了。強一致性:也就是事務一致性,將多個操作放到單一事務處理。要么全部成功,要么全部失敗。最終一致性:通過將某些操作的執行延遲到稍后的時間來執行。若前面的操作執行成功,后續操作將延后執行。若前面的操作失敗,后續的操作就不會執行。
到這里,我們實際要解決的問題就明確了:如何確保事件總線能夠正確進行事件轉發?
換句話說:事件總線掛了,但是事件消息不能丟失。只要事件消息不丟,后面我們還有機會挽救(重新發布消息)。
如何保證事件消息不丟失呢?當然是持久化了。
4. 持久化事件源
eShopOnContainers已經考慮了這一點,集成了事件日志用于持久化。我們直接來看類圖:從類圖中看其實現邏輯也很簡單,主要是定義了一個 IntegrationEventLogEntry實體、 EventStateEnum事件狀態枚舉和 IntegrationEventLogContextEF上下文用于事件日志持久化。暴露 IIntegrationEventLogService用于事件狀態的更新。
其他微服務通過在啟動類中注冊 IntegrationEventLogContext即可完成事件日志的集成。
services.AddDbContext<IntegrationEventLogContext>(options =>
{
? ?options.UseSqlServer(configuration["ConnectionString"],
? ? ? ?sqlServerOptionsAction: sqlOptions =>
? ? ? ?{
? ? ? ? ? ?sqlOptions.MigrationsAssembly(typeof(Startup)
? ? ? ? ? ? ? ?.GetTypeInfo().Assembly.GetName().Name);
? ? ? ? ? ?sqlOptions.EnableRetryOnFailure(maxRetryCount: 10,
? ? ? ? ? ? ? ?maxRetryDelay: TimeSpan.FromSeconds(30),
? ? ? ? ? ? ? ?errorNumbersToAdd: null);
? ? ? ?});
});
使用EF進行數據庫遷移后,就會生成 IntergrationEventLog表。如下圖所示:
5. 如何借助事件日志確保高可用
主要分兩步走:
應用程序開始本地數據庫事務,然后更新領域實體狀態,并將集成事件插入集成事件日志表中,最后提交事務來確保領域實體更新和保存事件日志所需的原子性。
發布事件
第一步毋庸置疑,第二步發布事件,我們又有多種實現方式:
在提交事務后立即發布集成事件,并將其標記為已發布。當微服務發生故障時,可以通過遍歷存儲的集成事件(未發布)執行補救措施。
將事件日志表用作一種隊列。使用單獨的線程或進程查詢事件日志表,將事件發布到事件總 線,然后將事件標記為已發布。
這里很顯然第二種方式更為穩妥。而eShopOnContainers出于簡單考慮,采用了第一種方案,具體代碼如下:
using (var transaction = _catalogContext.Database.BeginTransaction())
{
_catalogContext.CatalogItems.Update(catalogItem);
await _catalogContext.SaveChangesAsync();
// Save to EventLog only if product price changed
if(raiseProductPriceChangedEvent)
await
_integrationEventLogService.SaveEventAsync(priceChangedEvent);
transaction.Commit();
}
// Publish the intergation event through the event bus
_eventBus.Publish(priceChangedEvent);
integrationEventLogService.MarkEventAsPublishedAsync( priceChangedEvent);
至此,eShopOnContainers確保事件總線能夠正確轉發消息的解決方案闡述完畢。你可能會問,這對應的是引言中的哪一種方案?都不是,你可以看作其是基于事件日志的簡化版的事件溯源。
6. 僅此而已?
通過持久化事件日志來避免事件發布失敗導致的一致性問題,是一種有效措施。然而消息從發送到接收再到正常消費的過程中,每一個環節都可能故障,所以僅僅在消息發送端使用事件日志只是確保最終一致性的一小步。還有很多問題有待完善:
消息發送成功了,但未被成功接收
消息發送且成功接收,但未被正確消費
消息重復發送,導致多次消費問題
消息被多個微服務訂閱,如何確保每個微服務都成功接收并消費
等等
而這些問題就留給大家思考吧。
總結
以上是生活随笔為你收集整理的eShopOnContainers 知多少[6]:持久化事件日志的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: eShopOnContainers 知多
- 下一篇: IdentityServer4与ocel