日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

将Abp默认事件总线改造为分布式事件总线

發布時間:2023/12/24 windows 26 coder
生活随笔 收集整理的這篇文章主要介紹了 将Abp默认事件总线改造为分布式事件总线 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

@

目錄
  • 原理
    • 創建分布式事件總線
    • 實現自動訂閱和事件轉發
  • 使用
    • 啟動Redis服務
    • 配置
    • 傳遞Abp默認事件
    • 傳遞自定義事件
  • 項目地址

原理

本地事件總線是通過Ioc容器來實現的。

IEventBus接口定義了事件總線的基本功能,如注冊事件、取消注冊事件、觸發事件等。

Abp.Events.Bus.EventBus是本地事件總線的實現類,其中私有成員ConcurrentDictionary<Type, List<IEventHandlerFactory>> _handlerFactories是事件訂閱表。通過維護事件訂閱表來實現事件處理器的注冊和取消注冊。當對應類型的事件觸發時,通過訂閱表查找所有事件處理器,通過Ioc容器來獲取處理器實例,然后通過反射來調用事件處理器的"HandleEvent"方法。

創建分布式事件總線

首先,我們需要一個分布式事件總線中間件,用來將事件從本地事件總線轉發到分布式事件總線。常用的中間件有RabbitMQ、Kafka、Redis等。

開源社區已經有實現好的庫,本項目參考了 *6216/Abp.RemoteEventBus

這里已經定義好了一個分布式事件總線接口


public interface IDistributedEventBus : IDisposable
{
    void MessageHandle(string topic, string message);

    void Publish(IDistributedEventData eventData);

    void Subscribe(string topic);

    void Unsubscribe(string topic);

    void UnsubscribeAll();
}

為了兼容本地事件總線,我們需要定義一個分布式事件總線接口,繼承自IEventBus接口。


public interface IMultipleEventBus : IDistributedEventBus, IEventBus
{

}


實現自動訂閱和事件轉發

當注冊本地事件時,將訂閱分布式事件,事件Topic為類型的字符串表現形式

public IDisposable Register(Type eventType, IEventHandlerFactory factory)
{
    GetOrCreateHandlerFactories(eventType);
    List<IEventHandlerFactory> currentLists;
    if (_handlerFactories.TryGetValue(eventType, out currentLists))
    {
        lock (currentLists)
        {
            if (currentLists.Count == 0)
            {
                //Register to distributed event
                this.Subscribe(eventType.ToString());
            }
            currentLists.Add(factory);
        }
    }
    return new FactoryUnregistrar(this, eventType, factory);
}

創建TriggerRemote,此方法用于將本地事件參數打包成為分布式事件消息payload,并發布該消息

public void TriggerRemote(Type eventType, object eventSource, IEventData eventData)
{
    var exceptions = new List<Exception>();
    eventData.EventSource = eventSource;
    try
    {
        var payloadDictionary = new Dictionary<string, object>
                {
                    { PayloadKey, eventData }
                };
        var distributedeventData = new DistributedEventData(eventType.ToString(), payloadDictionary);
        Publish(distributedeventData);
    }

    catch (Exception ex)
    {
        exceptions.Add(ex);
    }
    if (exceptions.Any())
    {
        if (exceptions.Count == 1)
        {
            exceptions[0].ReThrow();
        }

        throw new AggregateException("More than one error has occurred while triggering the event: " + eventType, exceptions);
    }
}


當觸發本地事件時,將消息轉發至分布式事件總線。
在Trigger方法中調用TriggerRemote,事件狀態回調和事件異常回調將不會被轉發。

if (!(typeof(DistributedEventBusEvent) == eventType
   || typeof(DistributedEventBusEvent).IsAssignableFrom(eventType)
   || typeof(DistributedEventMessageHandleExceptionData) == eventType
   || typeof(DistributedEventHandleExceptionData) == eventType
    ))
{
    if (typeof(DistributedEventArgs) != eventType)
    {
        TriggerRemote(eventType, eventSource, eventData);

    }
}

在消費端接收到分布式事件消息時,從Topic中解析類型,轉發給本地事件。若此類型在本地事件注冊過,則將消息反序列化為本地事件參數,然后觸發本地事件。
本地事件處理器將觸發最終的處理方法。


public virtual void MessageHandle(string topic, string message)
{
    Logger.Debug($"Receive message on topic {topic}");
    try
    {
        var eventData = _remoteEventSerializer.Deserialize<DistributedEventData>(message);
        var eventArgs = new DistributedEventArgs(eventData, topic, message);
        Trigger(this, new DistributedEventBusHandlingEvent(eventArgs));

        if (!string.IsNullOrEmpty(eventData.Type))
        {
            string pattern = @"(.*?)\[(.*?)\]";
            Match match = Regex.Match(eventData.Type, pattern);
            if (match.Success)
            {

                var type = match.Groups[1].Value;
                var type2 = match.Groups[2].Value;

                var localTriggerType = typeFinder.Find(c => c.FullName == type).FirstOrDefault();
                var genericType = typeFinder.Find(c => c.FullName == type2).FirstOrDefault();

                if (localTriggerType != null && genericType != null)
                {

                    if (localTriggerType.GetTypeInfo().IsGenericType
                        && localTriggerType.GetGenericArguments().Length == 1
                        && !genericType.IsAbstract && !genericType.IsInterface
                        )
                    {
                        var localTriggerGenericType = localTriggerType.GetGenericTypeDefinition().MakeGenericType(genericType);


                        if (eventData.Data.TryGetValue(PayloadKey, out var payload))
                        {
                            var payloadObject = (payload as JObject).ToObject(localTriggerGenericType);
                            Trigger(localTriggerGenericType, this, (IEventData)payloadObject);

                        }
                    }
                }


            }
            else
            {
                var localTriggerType = typeFinder.Find(c => c.FullName == eventData.Type).FirstOrDefault();
                if (localTriggerType != null && !localTriggerType.IsAbstract && !localTriggerType.IsInterface)
                {
                    if (eventData.Data.TryGetValue(PayloadKey, out var payload))
                    {
                        var payloadObject = (payload as JObject).ToObject(localTriggerType);
                        Trigger(localTriggerType, this, (IEventData)payloadObject);

                    }

                }
            }
            Trigger(this, new DistributedEventBusHandledEvent(eventArgs));

        }
    }
    catch (Exception ex)
    {
        Logger.Error("Consume remote message exception", ex);
        Trigger(this, new DistributedEventMessageHandleExceptionData(ex, topic, topic));
    }
}

使用

DistributedEventBus有不同的實現方式,這里以Redis為例

啟動Redis服務

下載Redis并啟動服務,使用默認端口6379

配置

生產者和消費者端都需要配置分布式事件總線

首先引用Abp.DistributedEventBus.Redis,并配置Abp模塊依賴

[DependsOn(typeof(AbpDistributedEventBusRedisModule))]

在PreInitialize方法中配置Redis連接信息

 Configuration.Modules.DistributedEventBus().UseRedis().Configure(setting =>
 {
     setting.Server = "127.0.0.1:6379";
 });

用MultipleEventBus替換Abp默認事件總線

 //todo: 事件總線
 Configuration.ReplaceService(
  typeof(IEventBus),
  () => IocManager.IocContainer.Register(
      Component.For<IEventBus>().ImplementedBy<MultipleEventBus>()
  ));

傳遞Abp默認事件

我們知道在使用倉儲時,Abp會自動觸發一些事件,如創建、更新、刪除等。我們來測試這些事件是否能通過分布式事件總線來傳遞。

定義一個實體類,用于傳遞實體的增刪改事件。


public class Person : FullAuditedEntity<int>
{

    public string Name { get; set; }
    public int Age { get; set; }
    public string PhoneNumber { get; set; }

}

在消費者端,定義一個事件處理器,用于處理實體的增刪改事件。


public class RemoteEntityChangedEventHandler :
    IEventHandler<EntityUpdatedEventData<Person>>,
    IEventHandler<EntityCreatedEventData<Person>>,
    IEventHandler<EntityDeletedEventData<Person>>,
    ITransientDependency
{

    void IEventHandler<EntityUpdatedEventData<Person>>.HandleEvent(EntityUpdatedEventData<Person> eventData)
    {
        var person = eventData.Entity;
        Console.WriteLine($"Remote Entity Updated - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");
    }

    void IEventHandler<EntityCreatedEventData<Person>>.HandleEvent(EntityCreatedEventData<Person> eventData)
    {
        var person = eventData.Entity;
        Console.WriteLine($"Remote Entity Created - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");

    }

    void IEventHandler<EntityDeletedEventData<Person>>.HandleEvent(EntityDeletedEventData<Person> eventData)
    {
        var person = eventData.Entity;
        Console.WriteLine($"Remote Entity Deleted - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");

    }
}


在生產者端,用IRepository對實體進行增刪改操作。


var person = new Person()
{

    Name = "John",
    Age = 36,
    PhoneNumber = "18588888888"

};

personRepository.Insert(person);

var person2 = new Person()
{

    Name = "John2",
    Age = 36,
    PhoneNumber = "18588888889"

};
personRepository.Insert(person2);

var persons = personRepository.GetAllList();
foreach (var p in persons)
{
    p.Age += 1;
    personRepository.Update(p);
    Console.WriteLine($"Entity Updated - Name:{p.Name}, Age:{p.Age}, PhoneNumber:{p.PhoneNumber}");

}
foreach (var p in persons)
{
    personRepository.Delete(p);
    Console.WriteLine($"Entity Deleted - Name:{p.Name}, Age:{p.Age}, PhoneNumber:{p.PhoneNumber}");

}


運行程序(同時運行消費者端和生產者端),可以看到消費者端打印出了實體的增刪改事件。

注意:

分布式事件總線在兩個獨立系統間傳遞事件,所以需要定義一個共同的類型對象,用于事件參數的傳遞。
因此消費者端需要引用生產者端的模塊,以便獲取共同的類型對象。

public override Assembly[] GetAdditionalAssemblies()
{
    var clientModuleAssembly = typeof(Person).GetAssembly();
    return [clientModuleAssembly];
}

傳遞自定義事件

定義NotificationEventData,用于傳遞自定義事件。


public class NotificationEventData : EventData
{
    public int Id { get; set; }
    
    public string Title { get; set; }

    public string Message { get; set; }

    public bool IsRead { get; set; }
}

在消費者端,定義一個事件處理器,用于處理自定義事件。

public class NotificationEventHandler :
    IEventHandler<NotificationEventData>,      
    ITransientDependency
{
    
    void IEventHandler<NotificationEventData>.HandleEvent(NotificationEventData eventData)
    {
        Console.WriteLine($"Id: {eventData.Id}");
        Console.WriteLine($"Title: {eventData.Title}");
        Console.WriteLine($"Message: {eventData.Message}");
        Console.WriteLine($"IsRead: {eventData.IsRead}");

    }
}

在生產者端,觸發自定義事件。

var eventBus = IocManager.Instance.Resolve<IEventBus>();


eventBus.Trigger<NotificationEventData>(new NotificationEventData()
{
    Title = "Hi",
    Message = "Customized definition event test!",
    Id = 100,
    IsRead = true,
});

運行程序(同時運行消費者端和生產者端),可以看到消費者端打印出了自定義事件。

項目地址

Github:DistributedEventBus

總結

以上是生活随笔為你收集整理的将Abp默认事件总线改造为分布式事件总线的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。