基于Abp VNext框架设计 - Masstransit分布式消息
abp 通過IDistributedEventBus接口集成自IEventBus實現分布式事件消息的發布訂閱。
IEventBus在什么時機觸發PublishAsync?
當前UnitOfWork完成時,觸發IEventBus的PublishAsync
在沒有事務環境下,同步調用IEventBus的PublishAsync
abp 默認實現基于RabbitMq消息隊列Volo.Abp.EventBus.RabbitMQ實現分布式消息的發布與訂閱。
消息治理核心問題:
生產端如何保證投遞成功的消息不能丟失。
Mq自身如何保證消息不丟失。
消費段如何保證消費端的消息不丟失。
基于abp 默認實現的DistributedEventBus不能滿足以下場景:
Publisher 生產者無法保證消息一定能投遞到MQ。
Consumer 消費端在消息消費時,出現異常時,沒有異常錯誤處理機制(確保消費失敗的消息能重新被消費)。
我們引入Masstransit,來提升abp對消息治理能力。
Masstransit提供以下開箱即用功能:
Publish/Send/Request-Response等幾種消息投遞機制。
多種IOC容器支持。
異常機制。
Saga事務管理。
事務活動補償機制(Courier)
消息審計
消息管道處理機制
Abp 框架下事件消息集成
使用MassTransit重新實現IDistributedEventBus。
在消費端Consumer傳遞用戶身份信息。
使用Asp.Net Core Web Host 作消費端Consumer宿主。
集成MassTransit
在Module初始化時,注入MassTransit實例,并啟動。
Copy/// <summary> /// 配置DistributedEventBus /// </summary> /// <param name="context"></param> /// <param name="configuration"></param> /// <param name="hostingEnvironment"></param> private void ConfigureDistributedEventBus(ServiceConfigurationContext context, IConfiguration configuration, IWebHostEnvironment hostingEnvironment) {var options = context.Services.GetConfiguration().GetSection("Rabbitmq").Get<MassTransitEventBusOptions>();var mqConnectionString = "rabbitmq://" + options.ConnectionString;context.Services.AddMassTransit(mtConfig =>{//inject consumers into IOC from assemblymtConfig.AddConsumers(typeof(AuthCenterEventBusHostModule));mtConfig.AddBus(provider =>{var bus = Bus.Factory.CreateUsingRabbitMq(mqConfig =>{var host = mqConfig.Host(new Uri(mqConnectionString), h =>{h.Username(options.UserName);h.Password(options.Password);});// set special message serializermqConfig.UseBsonSerializer();// integrated existed logger compontentmqConfig.UseExtensionsLogging(provider.GetService<ILoggerFactory>());mqConfig.ReceiveEndpoint(host, "authcenter-queue", q =>{//set rabbitmq prefetch countq.PrefetchCount = 200;//set message retry policyq.UseMessageRetry(r => r.Interval(3, 100));q.Consumer<SmsTokenValidationCreatedEventConsumer>(provider);EndpointConvention.Map<SmsTokenValidationCreatedEvent>(q.InputAddress);});mqConfig.ReceiveEndpoint(host, "user-synchronization", q =>{//set rabbitmq prefetch countq.PrefetchCount = 50;//q.UseRateLimit(100, TimeSpan.FromSeconds(1));//q.UseConcurrencyLimit(2);//set message retry policyq.UseMessageRetry(r => r.Interval(3, 100));q.Consumer<UserSyncEventConsumer>(provider);EndpointConvention.Map<UserSyncEvent>(q.InputAddress);});mqConfig.ConfigureEndpoints(provider);mqConfig.UseAuditingFilter(provider, o =>{o.ReplaceAuditing = true;});});// set authtication middleware for user identitybus.ConnectAuthenticationObservers(provider);return bus;});}); }在MassTransit中,使用IBusControl接口?StartAsync?或?StopAsync?來啟動或停止。
使用IPublishEndpoint重新實現IDistributedEventBus接口,實現與abp分布式事件總線集成。
Copy public class MassTransitDistributedEventBus : IDistributedEventBus, ISingletonDependency{private readonly IPublishEndpoint _publishEndpoint;//protected IHybridServiceScopeFactory ServiceScopeFactory { get; }protected AbpDistributedEventBusOptions DistributedEventBusOptions { get; }public MassTransitDistributedEventBus(IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions,IPublishEndpoint publishEndpoint){//ServiceScopeFactory = serviceScopeFactory;_publishEndpoint = publishEndpoint;DistributedEventBusOptions = distributedEventBusOptions.Value;//Subscribe(distributedEventBusOptions.Value.Handlers);}/** Not Implementation*/public Task PublishAsync<TEvent>(TEvent eventData)where TEvent : class{return _publishEndpoint.Publish(eventData);}public Task PublishAsync(Type eventType, object eventData){return _publishEndpoint.Publish(eventData, eventType);}}到此,我們實現了MassTransit與Abp集成。
事件消息傳遞User Claims
在實際業務實現過程中,我們會用消息隊列實現“削峰填谷”的效果。異步消息隊列中傳遞用戶身份信息如何實現呢?
我們先看看abp在WebApi中,如何確定當前用戶?
ICurrentUser?提供當前User Claims抽象。而ICurrentUser依賴于ICurrentPrincipalAccessor,在Asp.Net core中利用HttpContext User 來記錄當前用戶身份。
在MassTransit中,利用IPublishObserver?>?IConsumeObserver?生產者/消費端的觀察者,來實現傳遞已認證的用戶Claims。
Copy /// <summary>/// 生產者傳遞當前用戶Principal/// </summary>public class AuthPublishObserver : IPublishObserver{private readonly ICurrentPrincipalAccessor _currentPrincipalAccessor;private readonly IClaimsPrincipalFactory _claimsPrincipalFactory;public AuthPublishObserver(ICurrentPrincipalAccessor currentPrincipalAccessor,IClaimsPrincipalFactory claimsPrincipalFactory){_currentPrincipalAccessor = currentPrincipalAccessor;_claimsPrincipalFactory = claimsPrincipalFactory;}public Task PrePublish<T>(PublishContext<T> context) where T : class{var claimsPrincipal = _claimsPrincipalFactory.CreateClaimsPrincipal(_currentPrincipalAccessor.Principal);if (claimsPrincipal != null){context.Headers.SetAuthenticationHeaders(claimsPrincipal);}return TaskUtil.Completed;}public Task PostPublish<T>(PublishContext<T> context) where T : class => TaskUtil.Completed;public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class => TaskUtil.Completed;} Copy/// <summary>/// 消費端從MqMessage Heads 中獲取當前用戶Principal,并賦值給HttpContext/// </summary>public class AuthConsumeObserver : IConsumeObserver{private readonly IHttpContextAccessor _httpContextAccessor;private readonly IServiceScopeFactory _factory;public AuthConsumeObserver(IHttpContextAccessor httpContextAccessor, IServiceScopeFactory factory){_httpContextAccessor = httpContextAccessor;_factory = factory;}public Task PreConsume<T>(ConsumeContext<T> context) where T : class{if (_httpContextAccessor.HttpContext == null){_httpContextAccessor.HttpContext = new DefaultHttpContext{RequestServices = _factory.CreateScope().ServiceProvider};}var abpClaimsPrincipal = context.Headers.TryGetAbpClaimsPrincipal();if (abpClaimsPrincipal != null && abpClaimsPrincipal.IsAuthenticated){var claimsPrincipal = abpClaimsPrincipal.ToClaimsPrincipal();_httpContextAccessor.HttpContext.User = claimsPrincipal;Thread.CurrentPrincipal = claimsPrincipal;}return TaskUtil.Completed;}public Task PostConsume<T>(ConsumeContext<T> context) where T : class{_httpContextAccessor.HttpContext = null;return TaskUtil.Completed;}public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class{_httpContextAccessor.HttpContext = null;return TaskUtil.Completed;}}使用Asp.Net Core Web Host 作消費端Consumer宿主
基于以下幾點原因,我們使用Asp.Net Core Web Host 作為消息端Consumer宿主
部署在Linux環境下,Asp.Net Core Web Host 通常使用守護進程來啟動服務實例,這樣可以保證服務不被中斷。
根據abp vnext DDD 項目分層,最大程度利用Application層應用方法,復用abp vnext 框架機制。
MassTransit 深入研究
延遲消息
限流、熔斷降級
批量消費
Saga
References
abp vnext disctributed event bus
MassTransit
總結
以上是生活随笔為你收集整理的基于Abp VNext框架设计 - Masstransit分布式消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: .NET Core开发实战(第11课:文
- 下一篇: 智能对话引擎:两天快速打造疫情问答机器人