MassTransit Get Started-
MassTransit:是一款.NET的分布式應用程序框架(開源、免費)。通過MassTransit,可以輕松創建利用基于消息的、松耦合異步通信的應用程序和服務,以提高可用性,可靠性和可伸縮性。
MassTransit本身定位輕量級的服務總線,并支持多種傳輸方式如:RabbitMQ、Azure Service Bus、ActiveMQ、Amazon SQS、Kafka、Azure Event Hub。消息異常處理:重試配置、重新交付、erro管道、死信管道。分布式事務處理:sagas、Courier。容器支持:.NETcore自身的、autofac、castle windsor等、調度支持:Quartz?、hangfire。更多功能參考官網文檔。
MassTransit目前已經發布到了第7個版本了,7.0版本新增了對Kafka 的支持,構建僅支持.NET Standard 2.0...其他改動不大。MassTransit社區使用也是很活躍的,對于首次接觸的,通過本篇文章(基于rabbitmq)幫你快速入門!
一個應用程序或服務可以使用兩種不同的方法來生產消息,主要區別是sent需要指定具體的端點地址,而pub不需要,下面的代碼會演示這兩種方式。
發布事件(多個接收者)
發送命令(一個接收者)
發布事件(事件消息)
場景假設:在xx項目中,需要與第三方進行交互。比如:訂單發貨之后,把發貨的信息的推送給第三方、把訂單的狀態變化也推送過去。我們分析下需求,系統要求在發貨之后,需要做若干事情。可以解讀為,發貨這個動作已經發生了,需要做的事情不確定。這不是典型的發布訂閱模式嘛!好了,那使用masstransit如何實現呢?
1.創建一個類庫項目定義消息體,命名為contract
public interface OrderShipped{public Guid OrderId { get; set; }//訂單號}2.創建一個api項目作為消息的生產方,命名為Delivery,然后安裝nuget包:
Install-Package MassTransit.AspNetCore Install-Package MassTransit.RabbitMQ在Startup類的ConfigureServices中,添加以下配置
services.AddMassTransit(x =>{x.UsingRabbitMq((context, config) =>{config.Host("rabbitmq://localhost:5672", hostConfig =>{hostConfig.Username("Amq");//填寫你的用戶名hostConfig.Password("mq123");//填寫你的用戶名});});});services.AddMassTransitHostedService();在ValueController中,進行發布消息
readonly IPublishEndpoint _publishEndpoint;public ValuesController(IPublishEndpoint publishEndpoint){_publishEndpoint = publishEndpoint;}/// <summary>/// 測試發布/// </summary>/// <returns></returns>[HttpPost]public async Task<ActionResult> OrderDelivery(){//其他await _publishEndpoint.Publish<OrderShipped>(new{OrderId = Guid.NewGuid()});return Ok();}通過使用IPublishEndpoint實例的Publish方法,發布一個事件。
3.創建一個api項目作為消息的消費方,命名為Listener,然后安裝nuget包:
Install-Package MassTransit.AspNetCore Install-Package MassTransit.RabbitMQ在Startup類的ConfigureServices中,添加以下配置
services.AddMassTransit(x =>{x.AddConsumer<DeliveryExpressNotifyConsumer>();x.UsingRabbitMq((context, config) =>{config.Host("rabbitmq://localhost:5672", hostConfig =>{hostConfig.Username("Amq");hostConfig.Password("mq123");});config.ReceiveEndpoint("Order.Shipped", e =>{e.ConfigureConsumer<DeliveryExpressNotifyConsumer>(context);});});});services.AddMassTransitHostedService();實現接口IConsumer即可完成消費邏輯的過程。
public class DeliveryExpressNotifyConsumer : IConsumer<OrderShipped>{ILogger<DeliveryExpressNotifyConsumer> _logger;public DeliveryExpressNotifyConsumer(ILogger<DeliveryExpressNotifyConsumer> logger){_logger = logger;}public async Task Consume(ConsumeContext<OrderShipped> context){_logger.LogInformation("訂單id: {Value}已發貨,通知第三方", context.Message.OrderId);}} 到此,消息生產方和消費方代碼都已經實現了,運行一下,效果如下發送消息(命令消息)
發送消息適用的場景,常常是一種命令,并且期望消息只被一個接收者或服務實例進行處理。masstransit使用發送消息和發布消息,在消息生產方不同之處,sent消息需要指定目標地址,使用ISendEndpoint的Send方法,消費者代碼一樣的配置。以下代碼演示發送一個創建發貨單的指令消息,比較簡單直接貼出源碼:
1.定義一個消息SubmitShippingOrder
public class SubmitShippingOrder{public Guid OrderId { get; set; }}2.sent消息
readonly IPublishEndpoint _publishEndpoint;readonly ISendEndpointProvider _sendEndpointProvider;public ValuesController(IPublishEndpoint publishEndpoint, ISendEndpointProvider sendEndpointProvider){_publishEndpoint = publishEndpoint;_sendEndpointProvider = sendEndpointProvider;}/// <summary>/// 測試發送/// </summary>/// <returns></returns>[HttpPost]public async Task<ActionResult> Delivery(){var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("queue:Submit.Shipping.Order")); //獲取發送端點await endpoint.Send(new SubmitShippingOrder{OrderId = Guid.NewGuid()});return Ok();}3.消費
public class SubmitShippingOrderConsumer : IConsumer<SubmitShippingOrder>{ILogger<SubmitShippingOrderConsumer> _logger;public SubmitShippingOrderConsumer(ILogger<SubmitShippingOrderConsumer> logger){_logger = logger;}public async Task Consume(ConsumeContext<SubmitShippingOrder> context){_logger.LogInformation("創建發貨單,訂單id: {Value}", context.Message.OrderId);}}運行一下效果:
官網文檔:
http://masstransit-project.com/
https://masstransit-project.com/advanced/courier/
總結
以上是生活随笔為你收集整理的MassTransit Get Started-的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 在腾讯云云函数计算上部署.NET Cor
- 下一篇: 消除代码中的坏味道,编写高质量代码