日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

eShopOnContainers 知多少[5]:EventBus With RabbitMQ

發(fā)布時間:2023/12/4 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 eShopOnContainers 知多少[5]:EventBus With RabbitMQ 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1. 引言

事件總線這個概念對你來說可能很陌生,但提到觀察者(發(fā)布-訂閱)模式,你也許就很熟悉。事件總線是對發(fā)布-訂閱模式的一種實現(xiàn)。它是一種集中式事件處理機制,允許不同的組件之間進行彼此通信而又不需要相互依賴,達到一種解耦的目的。從上圖可知,核心就4個角色:

  • 事件(事件源+事件處理)

  • 事件發(fā)布者

  • 事件訂閱者

  • 事件總線

  • 實現(xiàn)事件總線的關鍵是:

  • 事件總線維護一個事件源與事件處理的映射字典;

  • 通過單例模式,確保事件總線的唯一入口;

  • 利用反射完成事件源與事件處理的初始化綁定;

  • 提供統(tǒng)一的事件注冊、取消注冊和觸發(fā)接口。

  • 以上源于我在事件總線知多少(1)中對于EventBus的分析和簡單總結。基于以上的簡單認知,我們來梳理下eShopOnContainers中EventBus的實現(xiàn)機制。

    2. 高屋建瓴--看類圖

    我們直接以上帝視角,來看下其實現(xiàn)機制,上類圖。

    我們知道事件的本質是:事件源+事件處理。 針對事件源,其定義了 Handle方法用于響應事件。不同之處在于方法參數(shù)的類型: 第一個接受的是一個強類型的 dynamic。 為什么要單獨提供一個事件源為 dynamic可以簡化事件源的構建,更趨于靈活。

    有了事件源和事件處理,接下來就是事件的注冊和訂閱了。為了方便進行訂閱管理,系統(tǒng)提供了額外的一層抽象 InMemoryEventBusSubscriptionsManager就是使用內存進行存儲事件源和事件處理的映射字典。 從類圖中看 SubscriptionInfo,其主要用于表示事件訂閱方的訂閱類型和事件處理的類型。

    我們來近距離看下

  • //InMemoryEventBusSubscriptionsManager.cs

  • //定義的事件名稱和事件訂閱的字典映射(1:N)

  • private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;

  • //保存所有的事件處理類型

  • private readonly List<Type> _eventTypes;

  • //定義事件移除后事件

  • public event EventHandler<string> OnEventRemoved;


  • //構造函數(shù)初始化

  • public InMemoryEventBusSubscriptionsManager()

  • {

  • ? ?_handlers = new Dictionary<string, List<SubscriptionInfo>>();

  • ? ?_eventTypes = new List<Type>();

  • }

  • //添加動態(tài)類型事件訂閱(需要手動指定事件名稱)

  • public void AddDynamicSubscription<TH>(string eventName)

  • ? ?where TH : IDynamicIntegrationEventHandler

  • {

  • ? ?DoAddSubscription(typeof(TH), eventName, isDynamic: true);

  • }

  • //添加強類型事件訂閱(事件名稱為事件源類型)

  • public void AddSubscription<T, TH>()

  • ? ?where T : IntegrationEvent

  • ? ?where TH : IIntegrationEventHandler<T>

  • {

  • ? ?var eventName = GetEventKey<T>();


  • ? ?DoAddSubscription(typeof(TH), eventName, isDynamic: false);


  • ? ?if (!_eventTypes.Contains(typeof(T)))

  • ? ?{

  • ? ? ? ?_eventTypes.Add(typeof(T));

  • ? ?}

  • }

  • //移除動態(tài)類型事件訂閱

  • public void RemoveDynamicSubscription<TH>(string eventName)

  • ? ?where TH : IDynamicIntegrationEventHandler

  • {

  • ? ?var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName);

  • ? ?DoRemoveHandler(eventName, handlerToRemove);

  • }


  • //移除強類型事件訂閱

  • public void RemoveSubscription<T, TH>()

  • ? ?where TH : IIntegrationEventHandler<T>

  • ? ?where T : IntegrationEvent

  • {

  • ? ?var handlerToRemove = FindSubscriptionToRemove<T, TH>();

  • ? ?var eventName = GetEventKey<T>();

  • ? ?DoRemoveHandler(eventName, handlerToRemove);

  • }

  • 添加了這么一層抽象,即符合了單一職責原則,又完成了代碼重用。 IEventBusSubscriptionsManager的依賴,即可完成訂閱管理。 你這里可能會好奇,為什么要暴露一個 EventBusRabbitMQ源碼親密接觸。

    3.3.1. 構造函數(shù)定義

    IRabbitMQPersistentConnection

    以便連接到對應的Broke。


    • 使用空對象模式注入?OnEventRemoved事件,取消隊列的綁定。(這也就回答了上面遺留的問題)

    3.3.2. 事件訂閱的邏輯:

  • public void Publish(IntegrationEvent @event)

  • {

  • ? ?if (!_persistentConnection.IsConnected)

  • ? ?{

  • ? ? ? ?_persistentConnection.TryConnect();

  • ? ?}


  • ? ?var policy = RetryPolicy.Handle<BrokerUnreachableException>()

  • ? ? ? ?.Or<SocketException>()

  • ? ? ? ?.WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>

  • ? ? ? ?{

  • ? ? ? ? ? ?_logger.LogWarning(ex.ToString());

  • ? ? ? ?});


  • ? ?using (var channel = _persistentConnection.CreateModel())

  • ? ?{

  • ? ? ? ?var eventName = @event.GetType()

  • ? ? ? ? ? ?.Name;


  • ? ? ? ?channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");


  • ? ? ? ?var message = JsonConvert.SerializeObject(@event);

  • ? ? ? ?var body = Encoding.UTF8.GetBytes(message);


  • ? ? ? ?policy.Execute(() =>

  • ? ? ? ?{

  • ? ? ? ? ? ?var properties = channel.CreateBasicProperties();

  • ? ? ? ? ? ?properties.DeliveryMode = 2; // persistent


  • ? ? ? ? ? ?channel.BasicPublish(exchange: BROKER_NAME, routingKey: eventName, mandatory:true, basicProperties: properties, body: body);

  • ? ? ? ?});

  • ? ?}

  • }

  • 這里面有以下幾個知識點:

  • 使用Polly,以2的階乘的時間間隔進行重試。(第一次2s后,第二次4s后,第三次8s后...重試)

  • 使用direct全匹配、單播形式的路由機制進行消息分發(fā)

  • 消息主體是格式化的json字符串

  • 指定?mandatory:true告知服務器當根據(jù)指定的routingKey和消息找不到對應的隊列時,直接返回消息給生產者。

  • 3.3.4. 然后看看事件消息的監(jiān)聽

    Received事件委托處理消息接收事件

    調用?

    以上代碼主要包括以下知識點:

    4. EventBus的集成和使用

    以上介紹了EventBus的實現(xiàn)要點,那各個微服務是如何集成呢?

    1. 注冊

    2. 注冊單例模式的 services.AddSingleton<IEventBusSubscriptionsManager,InMemoryEventBusSubscriptionsManager>();

    3. 注冊單例模式的

    完成了以上集成,就可以在代碼中使用事件總線進行事件的發(fā)布和訂閱。

    4. 發(fā)布事件

    若要發(fā)布事件,需要根據(jù)是否需要事件源(參數(shù)傳遞)來決定是否需要申明相應的集成事件,需要則繼承自 IEventBus的實例的

    IIntegrationEventHandler

    IEventBus的實例調用

    TestEvent事件,B服務訂閱該事件,同樣需要在B服務復制定義一個 <code class="prettyprint code-in-text prettyprinted" style="box-sizing: border-box;background: rgb(243, 241, 241);color: rgb(88, 88, 88);line-height: 18px;font-family: consolas, menlo, courier, monospace, " initial="" microsoft="" !important;"="" 0px="">TestEvent


    。 這也是微服務的一個通病,重復代碼。


    5. 最后

    通過一步一步的源碼梳理,我們發(fā)現(xiàn)eShopOnContainers中事件總線的總體實現(xiàn)思路與引言部分的介紹十分契合。所以對于事件總線,不要覺得高深,明確參與的幾個角色以及基本的實現(xiàn)步驟,那么不管是基于RabbitMQ實現(xiàn)也好還是基于Azure Service Bus也好,萬變不離其宗!




  • //定義事件處理

  • public class ProductPriceChangedIntegrationEventHandler : IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>

  • {

  • ? ?public async Task Handle(ProductPriceChangedIntegrationEvent @event)

  • ? ?{

  • ? ? ? ?//do something

  • ? ?}

  • }

  • //事件源的聲明

  • public class ProductPriceChangedIntegrationEvent : IntegrationEvent

  • { ? ? ? ?

  • ? ?public int ProductId { get; private set; }


  • ? ?public decimal NewPrice { get; private set; }


  • ? ?public decimal OldPrice { get; private set; }


  • ? ?public ProductPriceChangedIntegrationEvent(int productId, decimal newPrice, decimal oldPrice)

  • ? ?{

  • ? ? ? ?ProductId = productId;

  • ? ? ? ?NewPrice = newPrice;

  • ? ? ? ?OldPrice = oldPrice;

  • ? ?}

  • }

  • services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>

  • {

  • ? ?var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();

  • ? ?var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();

  • ? ?var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();

  • ? ?var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();


  • ? ?var retryCount = 5;

  • ? ?if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))

  • ? ?{

  • ? ? ? ?retryCount = int.Parse(Configuration["EventBusRetryCount"]);

  • ? ?}


  • ? ?return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);

  • });

  • services.AddSingleton<IRabbitMQPersistentConnection>(sp =>

  • {

  • ? ?var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();

  • ? ?//...

  • ? ?return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);

  • });

  • Json字符串的反序列化

  • 利用依賴注入容器解析集成事件(Integration Event)和事件處理(Event Handler)類型

  • 反射調用具體的事件處理方法

  • private async Task ProcessEvent(string eventName, string message)

  • {

  • ? ?if (_subsManager.HasSubscriptionsForEvent(eventName))

  • ? ?{

  • ? ? ? ?using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))

  • ? ? ? ?{

  • ? ? ? ? ? ?var subscriptions = _subsManager.GetHandlersForEvent(eventName);

  • ? ? ? ? ? ?foreach (var subscription in subscriptions)

  • ? ? ? ? ? ?{

  • ? ? ? ? ? ? ? ?if (subscription.IsDynamic)

  • ? ? ? ? ? ? ? ?{

  • ? ? ? ? ? ? ? ? ? ?var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;

  • ? ? ? ? ? ? ? ? ? ?dynamic eventData = JObject.Parse(message);

  • ? ? ? ? ? ? ? ? ? ?await handler.Handle(eventData);

  • ? ? ? ? ? ? ? ?}

  • ? ? ? ? ? ? ? ?else

  • ? ? ? ? ? ? ? ?{

  • ? ? ? ? ? ? ? ? ? ?var eventType = _subsManager.GetEventTypeByName(eventName);

  • ? ? ? ? ? ? ? ? ? ?var integrationEvent = JsonConvert.DeserializeObject(message, eventType);

  • ? ? ? ? ? ? ? ? ? ?var handler = scope.ResolveOptional(subscription.HandlerType);

  • ? ? ? ? ? ? ? ? ? ?var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);

  • ? ? ? ? ? ? ? ? ? ?await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });

  • ? ? ? ? ? ? ? ?}

  • ? ? ? ? ? ?}

  • ? ? ? ?}

  • ? ?}

  • }

  • 總結

    以上是生活随笔為你收集整理的eShopOnContainers 知多少[5]:EventBus With RabbitMQ的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。