日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

基于Abp VNext框架设计 - Masstransit分布式消息

發布時間:2023/12/4 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 基于Abp VNext框架设计 - Masstransit分布式消息 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

abp 通過IDistributedEventBus接口集成自IEventBus實現分布式事件消息的發布訂閱。

IEventBus在什么時機觸發PublishAsync?

  • 當前UnitOfWork完成時,觸發IEventBus的PublishAsync

  • 在沒有事務環境下,同步調用IEventBus的PublishAsync

  • abp 默認實現基于RabbitMq消息隊列Volo.Abp.EventBus.RabbitMQ實現分布式消息的發布與訂閱。

    消息治理核心問題:

  • 生產端如何保證投遞成功的消息不能丟失。

  • Mq自身如何保證消息不丟失。

  • 消費段如何保證消費端的消息不丟失。

  • 基于abp 默認實現的DistributedEventBus不能滿足以下場景:

  • Publisher 生產者無法保證消息一定能投遞到MQ。

  • Consumer 消費端在消息消費時,出現異常時,沒有異常錯誤處理機制(確保消費失敗的消息能重新被消費)。

  • 我們引入Masstransit,來提升abp對消息治理能力。

    Masstransit提供以下開箱即用功能:

  • Publish/Send/Request-Response等幾種消息投遞機制。

  • 多種IOC容器支持。

  • 異常機制。

  • Saga事務管理。

  • 事務活動補償機制(Courier)

  • 消息審計

  • 消息管道處理機制

  • Abp 框架下事件消息集成

  • 使用MassTransit重新實現IDistributedEventBus。

  • 在消費端Consumer傳遞用戶身份信息。

  • 使用Asp.Net Core Web Host 作消費端Consumer宿主。

  • 集成MassTransit

    在Module初始化時,注入MassTransit實例,并啟動。

    Copy/// <summary> /// 配置DistributedEventBus /// </summary> /// <param name="context"></param> /// <param name="configuration"></param> /// <param name="hostingEnvironment"></param> private void ConfigureDistributedEventBus(ServiceConfigurationContext context, IConfiguration configuration, IWebHostEnvironment hostingEnvironment) {var options = context.Services.GetConfiguration().GetSection("Rabbitmq").Get<MassTransitEventBusOptions>();var mqConnectionString = "rabbitmq://" + options.ConnectionString;context.Services.AddMassTransit(mtConfig =>{//inject consumers into IOC from assemblymtConfig.AddConsumers(typeof(AuthCenterEventBusHostModule));mtConfig.AddBus(provider =>{var bus = Bus.Factory.CreateUsingRabbitMq(mqConfig =>{var host = mqConfig.Host(new Uri(mqConnectionString), h =>{h.Username(options.UserName);h.Password(options.Password);});// set special message serializermqConfig.UseBsonSerializer();// integrated existed logger compontentmqConfig.UseExtensionsLogging(provider.GetService<ILoggerFactory>());mqConfig.ReceiveEndpoint(host, "authcenter-queue", q =>{//set rabbitmq prefetch countq.PrefetchCount = 200;//set message retry policyq.UseMessageRetry(r => r.Interval(3, 100));q.Consumer<SmsTokenValidationCreatedEventConsumer>(provider);EndpointConvention.Map<SmsTokenValidationCreatedEvent>(q.InputAddress);});mqConfig.ReceiveEndpoint(host, "user-synchronization", q =>{//set rabbitmq prefetch countq.PrefetchCount = 50;//q.UseRateLimit(100, TimeSpan.FromSeconds(1));//q.UseConcurrencyLimit(2);//set message retry policyq.UseMessageRetry(r => r.Interval(3, 100));q.Consumer<UserSyncEventConsumer>(provider);EndpointConvention.Map<UserSyncEvent>(q.InputAddress);});mqConfig.ConfigureEndpoints(provider);mqConfig.UseAuditingFilter(provider, o =>{o.ReplaceAuditing = true;});});// set authtication middleware for user identitybus.ConnectAuthenticationObservers(provider);return bus;});}); }

    在MassTransit中,使用IBusControl接口?StartAsync?或?StopAsync?來啟動或停止。

    使用IPublishEndpoint重新實現IDistributedEventBus接口,實現與abp分布式事件總線集成。

    Copy public class MassTransitDistributedEventBus : IDistributedEventBus, ISingletonDependency{private readonly IPublishEndpoint _publishEndpoint;//protected IHybridServiceScopeFactory ServiceScopeFactory { get; }protected AbpDistributedEventBusOptions DistributedEventBusOptions { get; }public MassTransitDistributedEventBus(IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions,IPublishEndpoint publishEndpoint){//ServiceScopeFactory = serviceScopeFactory;_publishEndpoint = publishEndpoint;DistributedEventBusOptions = distributedEventBusOptions.Value;//Subscribe(distributedEventBusOptions.Value.Handlers);}/** Not Implementation*/public Task PublishAsync<TEvent>(TEvent eventData)where TEvent : class{return _publishEndpoint.Publish(eventData);}public Task PublishAsync(Type eventType, object eventData){return _publishEndpoint.Publish(eventData, eventType);}}

    到此,我們實現了MassTransit與Abp集成。

    事件消息傳遞User Claims

    在實際業務實現過程中,我們會用消息隊列實現“削峰填谷”的效果。異步消息隊列中傳遞用戶身份信息如何實現呢?

    我們先看看abp在WebApi中,如何確定當前用戶?

    ICurrentUser?提供當前User Claims抽象。而ICurrentUser依賴于ICurrentPrincipalAccessor,在Asp.Net core中利用HttpContext User 來記錄當前用戶身份。

    在MassTransit中,利用IPublishObserver?>?IConsumeObserver?生產者/消費端的觀察者,來實現傳遞已認證的用戶Claims。

    Copy /// <summary>/// 生產者傳遞當前用戶Principal/// </summary>public class AuthPublishObserver : IPublishObserver{private readonly ICurrentPrincipalAccessor _currentPrincipalAccessor;private readonly IClaimsPrincipalFactory _claimsPrincipalFactory;public AuthPublishObserver(ICurrentPrincipalAccessor currentPrincipalAccessor,IClaimsPrincipalFactory claimsPrincipalFactory){_currentPrincipalAccessor = currentPrincipalAccessor;_claimsPrincipalFactory = claimsPrincipalFactory;}public Task PrePublish<T>(PublishContext<T> context) where T : class{var claimsPrincipal = _claimsPrincipalFactory.CreateClaimsPrincipal(_currentPrincipalAccessor.Principal);if (claimsPrincipal != null){context.Headers.SetAuthenticationHeaders(claimsPrincipal);}return TaskUtil.Completed;}public Task PostPublish<T>(PublishContext<T> context) where T : class => TaskUtil.Completed;public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class => TaskUtil.Completed;} Copy/// <summary>/// 消費端從MqMessage Heads 中獲取當前用戶Principal,并賦值給HttpContext/// </summary>public class AuthConsumeObserver : IConsumeObserver{private readonly IHttpContextAccessor _httpContextAccessor;private readonly IServiceScopeFactory _factory;public AuthConsumeObserver(IHttpContextAccessor httpContextAccessor, IServiceScopeFactory factory){_httpContextAccessor = httpContextAccessor;_factory = factory;}public Task PreConsume<T>(ConsumeContext<T> context) where T : class{if (_httpContextAccessor.HttpContext == null){_httpContextAccessor.HttpContext = new DefaultHttpContext{RequestServices = _factory.CreateScope().ServiceProvider};}var abpClaimsPrincipal = context.Headers.TryGetAbpClaimsPrincipal();if (abpClaimsPrincipal != null && abpClaimsPrincipal.IsAuthenticated){var claimsPrincipal = abpClaimsPrincipal.ToClaimsPrincipal();_httpContextAccessor.HttpContext.User = claimsPrincipal;Thread.CurrentPrincipal = claimsPrincipal;}return TaskUtil.Completed;}public Task PostConsume<T>(ConsumeContext<T> context) where T : class{_httpContextAccessor.HttpContext = null;return TaskUtil.Completed;}public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class{_httpContextAccessor.HttpContext = null;return TaskUtil.Completed;}}

    使用Asp.Net Core Web Host 作消費端Consumer宿主

    基于以下幾點原因,我們使用Asp.Net Core Web Host 作為消息端Consumer宿主

  • 部署在Linux環境下,Asp.Net Core Web Host 通常使用守護進程來啟動服務實例,這樣可以保證服務不被中斷。

  • 根據abp vnext DDD 項目分層,最大程度利用Application層應用方法,復用abp vnext 框架機制。

  • MassTransit 深入研究

  • 延遲消息

  • 限流、熔斷降級

  • 批量消費

  • Saga

  • References

  • abp vnext disctributed event bus

  • MassTransit

  • 總結

    以上是生活随笔為你收集整理的基于Abp VNext框架设计 - Masstransit分布式消息的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。