日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

MassTransit中RequestResponse基本使用

發布時間:2023/12/4 46 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MassTransit中RequestResponse基本使用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

MassTransit 是一個自由、開源、輕量級的消息總線基于.Net框架, 用于創建分布式應用程序。方便搭建基于消息的松耦合異步通信的應用程序和服務。MassTransit 在現有消息傳輸上提供了一組廣泛的功能, 從而使開發人員能夠友好地使用基于消息的會話模式異步連接服務。基于消息的通信是實現面向服務的體系結構的可靠和可擴展的方式。

官網地址:http://masstransit-project.com/

發布訂閱模式

這種場景十分常見,發送一個消息(或事件)到消息隊列中,有一個或是多個訂閱方對預期的消息接收處理。

基于需要搭建了兩個WebApi程序,用于模擬發送方和訂閱方,其中的RabbitMQ已預先搭建好了,只在程序中引用包配置下即可。

<PackageReference Include="MassTransit" Version="7.2.0" /> <PackageReference Include="MassTransit.AspNetCore" Version="7.2.0" /> <PackageReference Include="MassTransit.RabbitMQ" Version="7.2.0" />

發布端配置

在Startup中增加MassTransit需要的服務及初始化配置。

  • 對RabbitMQ的連接地址端口、虛擬主機、訪問賬號密碼等系列配置。

  • 對發送方需要發送的消息初始化一個請求客戶端,配置請求信息及推送到MQ的地址。

services.AddMassTransit(x => {x.UsingRabbitMq((context, cfg) =>{cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>{h.Username(Configuration["RabbitmqConfig:Username"]);h.Password(Configuration["RabbitmqConfig:Password"]);});});x.AddRequestClient<ValueEntered>(new Uri(GetServiceAddress("events-valueentered"))); }); services.AddMassTransitHostedService();

為了快速了解,使用Controller在Action中發起對MQ的消息推送

[ApiController] [Route("[controller]")] public class ValueController : ControllerBase {readonly IPublishEndpoint _publishEndpoint;public ValueController(IPublishEndpoint publishEndpoint){_publishEndpoint = publishEndpoint;}[HttpPost]public async Task<ActionResult> Post(string value){await _publishEndpoint.Publish<ValueEntered>(new{Value = value});return Ok();} }

訂閱端配置

訂閱端也創建一個WebApi應用,在Startup中增加MassTransit的服務,使用到的Nuget包和發布端一樣。

  • 對RabbitMQ的連接地址端口、虛擬主機、訪問賬號密碼等系列配置。

  • 為訂閱端增加一個訂閱處理的Handler,即如下的ValueEnteredEventConsumer

  • 增加一個接受點,指定隊列名稱,即發送端發送的隊列名稱,設置該隊列消費處理的Consumer,即ValueEnteredEventConsumer

services.AddMassTransit(x => {x.AddConsumer<ValueEnteredEventConsumer>();x.UsingRabbitMq((context, cfg) =>{cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>{h.Username(Configuration["RabbitmqConfig:Username"]);h.Password(Configuration["RabbitmqConfig:Password"]);});cfg.ReceiveEndpoint("events-valueentered", e =>{e.ConfigureConsumer<ValueEnteredEventConsumer>(context);});}); }); services.AddMassTransitHostedService();

如此一來,通過Postman發送一個請求,經發布端發布一個消息到RabbitMQ,訂閱端偵聽消息,處理消息,一切都很熟悉。

請求響應模式

在發布訂閱的基礎上,改變以往的習慣,當發布一個消息后,等待訂閱方的處理,并將消息推送回RabbitMQ,發送方接受到處理后的消息繼續執行。

請求端

在Startup中新加上一個用于發送消息(CheckOrderStatus)的請求客戶端及指定消息隊列名稱(為每一個消息創建一個單獨的隊列)。

x.AddRequestClient<CheckOrderStatus>(new Uri(GetServiceAddress("events-checkorderstatus")));

增加一個Controller及Action,來請求及獲取處理結果(OrderStatusResult)。

[ApiController] [Route("[controller]")] public class OrderController : ControllerBase {private readonly IRequestClient<CheckOrderStatus> _client;public OrderController(IRequestClient<CheckOrderStatus> client){_client = client;}public async Task<OrderStatusResult> Get(string id){var response = await _client.GetResponse<OrderStatusResult>(new { OrderId = id });return response.Message;} }

響應端

同樣在響應端Startup中對新的消息設置下消息偵聽隊列以及相應的Handler如下的ValueEnteredEventConsumer去消費消息并返回處理結果。

x.AddConsumer<CheckOrderStatusConsumer?>(); x.UsingRabbitMq((context, cfg) => {// ...cfg.ReceiveEndpoint("events-checkorderstatus", e =>{e.ConfigureConsumer<CheckOrderStatusConsumer?>(context);}); });

Consumer中獲取請求參數,執行請求,返回執行結果。

public class CheckOrderStatusConsumer : IConsumer<CheckOrderStatus> {public async Task Consume(ConsumeContext<CheckOrderStatus> context){if (context.Message.OrderId == "9527"){throw new InvalidOperationException("Order not found");}Console.WriteLine($"OrderId:{context.Message.OrderId}");await context.RespondAsync<OrderStatusResult>(new{OrderId = context.Message.OrderId,Timestamp = Guid.NewGuid().ToString(),StatusCode = "1",StatusText = "Close"});} }

這樣一來,當請求端發起一個消息(事件)到RabbitMQ,響應端偵聽并處理完畢返回處理結果到RabbitMQ,請求端依照響應結果繼續執行后續請求。

HTTP方式差異

與以往的Http請求方式有所不同,通過httpClient.PostAsync發送請求,接收端處理并返回結果,而走requestClient發送請求到RabbitMQ,再由RabbitMQ推送到偵聽節點消費并返回結果,如下第一二部分結構。

總結

以上是生活随笔為你收集整理的MassTransit中RequestResponse基本使用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。