MassTransit中RequestResponse基本使用
MassTransit 是一個自由、開源、輕量級的消息總線基于.Net框架, 用于創(chuàng)建分布式應用程序。方便搭建基于消息的松耦合異步通信的應用程序和服務。MassTransit 在現(xiàn)有消息傳輸上提供了一組廣泛的功能, 從而使開發(fā)人員能夠友好地使用基于消息的會話模式異步連接服務。基于消息的通信是實現(xiàn)面向服務的體系結構的可靠和可擴展的方式。
官網(wǎng)地址:http://masstransit-project.com/
發(fā)布訂閱模式
這種場景十分常見,發(fā)送一個消息(或事件)到消息隊列中,有一個或是多個訂閱方對預期的消息接收處理。
基于需要搭建了兩個WebApi程序,用于模擬發(fā)送方和訂閱方,其中的RabbitMQ已預先搭建好了,只在程序中引用包配置下即可。
<PackageReference Include="MassTransit" Version="7.2.0" /> <PackageReference Include="MassTransit.AspNetCore" Version="7.2.0" /> <PackageReference Include="MassTransit.RabbitMQ" Version="7.2.0" />發(fā)布端配置
在Startup中增加MassTransit需要的服務及初始化配置。
對RabbitMQ的連接地址端口、虛擬主機、訪問賬號密碼等系列配置。
對發(fā)送方需要發(fā)送的消息初始化一個請求客戶端,配置請求信息及推送到MQ的地址。
為了快速了解,使用Controller在Action中發(fā)起對MQ的消息推送
[ApiController] [Route("[controller]")] public class ValueController : ControllerBase {readonly IPublishEndpoint _publishEndpoint;public ValueController(IPublishEndpoint publishEndpoint){_publishEndpoint = publishEndpoint;}[HttpPost]public async Task<ActionResult> Post(string value){await _publishEndpoint.Publish<ValueEntered>(new{Value = value});return Ok();} }訂閱端配置
訂閱端也創(chuàng)建一個WebApi應用,在Startup中增加MassTransit的服務,使用到的Nuget包和發(fā)布端一樣。
對RabbitMQ的連接地址端口、虛擬主機、訪問賬號密碼等系列配置。
為訂閱端增加一個訂閱處理的Handler,即如下的ValueEnteredEventConsumer
增加一個接受點,指定隊列名稱,即發(fā)送端發(fā)送的隊列名稱,設置該隊列消費處理的Consumer,即ValueEnteredEventConsumer
如此一來,通過Postman發(fā)送一個請求,經(jīng)發(fā)布端發(fā)布一個消息到RabbitMQ,訂閱端偵聽消息,處理消息,一切都很熟悉。
請求響應模式
在發(fā)布訂閱的基礎上,改變以往的習慣,當發(fā)布一個消息后,等待訂閱方的處理,并將消息推送回RabbitMQ,發(fā)送方接受到處理后的消息繼續(xù)執(zhí)行。
請求端
在Startup中新加上一個用于發(fā)送消息(CheckOrderStatus)的請求客戶端及指定消息隊列名稱(為每一個消息創(chuàng)建一個單獨的隊列)。
x.AddRequestClient<CheckOrderStatus>(new Uri(GetServiceAddress("events-checkorderstatus")));增加一個Controller及Action,來請求及獲取處理結果(OrderStatusResult)。
[ApiController] [Route("[controller]")] public class OrderController : ControllerBase {private readonly IRequestClient<CheckOrderStatus> _client;public OrderController(IRequestClient<CheckOrderStatus> client){_client = client;}public async Task<OrderStatusResult> Get(string id){var response = await _client.GetResponse<OrderStatusResult>(new { OrderId = id });return response.Message;} }響應端
同樣在響應端Startup中對新的消息設置下消息偵聽隊列以及相應的Handler如下的ValueEnteredEventConsumer去消費消息并返回處理結果。
x.AddConsumer<CheckOrderStatusConsumer?>(); x.UsingRabbitMq((context, cfg) => {// ...cfg.ReceiveEndpoint("events-checkorderstatus", e =>{e.ConfigureConsumer<CheckOrderStatusConsumer?>(context);}); });Consumer中獲取請求參數(shù),執(zhí)行請求,返回執(zhí)行結果。
public class CheckOrderStatusConsumer : IConsumer<CheckOrderStatus> {public async Task Consume(ConsumeContext<CheckOrderStatus> context){if (context.Message.OrderId == "9527"){throw new InvalidOperationException("Order not found");}Console.WriteLine($"OrderId:{context.Message.OrderId}");await context.RespondAsync<OrderStatusResult>(new{OrderId = context.Message.OrderId,Timestamp = Guid.NewGuid().ToString(),StatusCode = "1",StatusText = "Close"});} }這樣一來,當請求端發(fā)起一個消息(事件)到RabbitMQ,響應端偵聽并處理完畢返回處理結果到RabbitMQ,請求端依照響應結果繼續(xù)執(zhí)行后續(xù)請求。
HTTP方式差異
與以往的Http請求方式有所不同,通過httpClient.PostAsync發(fā)送請求,接收端處理并返回結果,而走requestClient發(fā)送請求到RabbitMQ,再由RabbitMQ推送到偵聽節(jié)點消費并返回結果,如下第一二部分結構。
總結
以上是生活随笔為你收集整理的MassTransit中RequestResponse基本使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一些微服务拆分的浅见
- 下一篇: Visual Studio 2022 预