日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

MassTransit中RequestResponse基本使用

發(fā)布時間:2023/12/4 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MassTransit中RequestResponse基本使用 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

MassTransit 是一個自由、開源、輕量級的消息總線基于.Net框架, 用于創(chuàng)建分布式應用程序。方便搭建基于消息的松耦合異步通信的應用程序和服務。MassTransit 在現(xiàn)有消息傳輸上提供了一組廣泛的功能, 從而使開發(fā)人員能夠友好地使用基于消息的會話模式異步連接服務。基于消息的通信是實現(xiàn)面向服務的體系結構的可靠和可擴展的方式。

官網(wǎng)地址:http://masstransit-project.com/

發(fā)布訂閱模式

這種場景十分常見,發(fā)送一個消息(或事件)到消息隊列中,有一個或是多個訂閱方對預期的消息接收處理。

基于需要搭建了兩個WebApi程序,用于模擬發(fā)送方和訂閱方,其中的RabbitMQ已預先搭建好了,只在程序中引用包配置下即可。

<PackageReference Include="MassTransit" Version="7.2.0" /> <PackageReference Include="MassTransit.AspNetCore" Version="7.2.0" /> <PackageReference Include="MassTransit.RabbitMQ" Version="7.2.0" />

發(fā)布端配置

在Startup中增加MassTransit需要的服務及初始化配置。

  • 對RabbitMQ的連接地址端口、虛擬主機、訪問賬號密碼等系列配置。

  • 對發(fā)送方需要發(fā)送的消息初始化一個請求客戶端,配置請求信息及推送到MQ的地址。

services.AddMassTransit(x => {x.UsingRabbitMq((context, cfg) =>{cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>{h.Username(Configuration["RabbitmqConfig:Username"]);h.Password(Configuration["RabbitmqConfig:Password"]);});});x.AddRequestClient<ValueEntered>(new Uri(GetServiceAddress("events-valueentered"))); }); services.AddMassTransitHostedService();

為了快速了解,使用Controller在Action中發(fā)起對MQ的消息推送

[ApiController] [Route("[controller]")] public class ValueController : ControllerBase {readonly IPublishEndpoint _publishEndpoint;public ValueController(IPublishEndpoint publishEndpoint){_publishEndpoint = publishEndpoint;}[HttpPost]public async Task<ActionResult> Post(string value){await _publishEndpoint.Publish<ValueEntered>(new{Value = value});return Ok();} }

訂閱端配置

訂閱端也創(chuàng)建一個WebApi應用,在Startup中增加MassTransit的服務,使用到的Nuget包和發(fā)布端一樣。

  • 對RabbitMQ的連接地址端口、虛擬主機、訪問賬號密碼等系列配置。

  • 為訂閱端增加一個訂閱處理的Handler,即如下的ValueEnteredEventConsumer

  • 增加一個接受點,指定隊列名稱,即發(fā)送端發(fā)送的隊列名稱,設置該隊列消費處理的Consumer,即ValueEnteredEventConsumer

services.AddMassTransit(x => {x.AddConsumer<ValueEnteredEventConsumer>();x.UsingRabbitMq((context, cfg) =>{cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>{h.Username(Configuration["RabbitmqConfig:Username"]);h.Password(Configuration["RabbitmqConfig:Password"]);});cfg.ReceiveEndpoint("events-valueentered", e =>{e.ConfigureConsumer<ValueEnteredEventConsumer>(context);});}); }); services.AddMassTransitHostedService();

如此一來,通過Postman發(fā)送一個請求,經(jīng)發(fā)布端發(fā)布一個消息到RabbitMQ,訂閱端偵聽消息,處理消息,一切都很熟悉。

請求響應模式

在發(fā)布訂閱的基礎上,改變以往的習慣,當發(fā)布一個消息后,等待訂閱方的處理,并將消息推送回RabbitMQ,發(fā)送方接受到處理后的消息繼續(xù)執(zhí)行。

請求端

在Startup中新加上一個用于發(fā)送消息(CheckOrderStatus)的請求客戶端及指定消息隊列名稱(為每一個消息創(chuàng)建一個單獨的隊列)。

x.AddRequestClient<CheckOrderStatus>(new Uri(GetServiceAddress("events-checkorderstatus")));

增加一個Controller及Action,來請求及獲取處理結果(OrderStatusResult)。

[ApiController] [Route("[controller]")] public class OrderController : ControllerBase {private readonly IRequestClient<CheckOrderStatus> _client;public OrderController(IRequestClient<CheckOrderStatus> client){_client = client;}public async Task<OrderStatusResult> Get(string id){var response = await _client.GetResponse<OrderStatusResult>(new { OrderId = id });return response.Message;} }

響應端

同樣在響應端Startup中對新的消息設置下消息偵聽隊列以及相應的Handler如下的ValueEnteredEventConsumer去消費消息并返回處理結果。

x.AddConsumer<CheckOrderStatusConsumer?>(); x.UsingRabbitMq((context, cfg) => {// ...cfg.ReceiveEndpoint("events-checkorderstatus", e =>{e.ConfigureConsumer<CheckOrderStatusConsumer?>(context);}); });

Consumer中獲取請求參數(shù),執(zhí)行請求,返回執(zhí)行結果。

public class CheckOrderStatusConsumer : IConsumer<CheckOrderStatus> {public async Task Consume(ConsumeContext<CheckOrderStatus> context){if (context.Message.OrderId == "9527"){throw new InvalidOperationException("Order not found");}Console.WriteLine($"OrderId:{context.Message.OrderId}");await context.RespondAsync<OrderStatusResult>(new{OrderId = context.Message.OrderId,Timestamp = Guid.NewGuid().ToString(),StatusCode = "1",StatusText = "Close"});} }

這樣一來,當請求端發(fā)起一個消息(事件)到RabbitMQ,響應端偵聽并處理完畢返回處理結果到RabbitMQ,請求端依照響應結果繼續(xù)執(zhí)行后續(xù)請求。

HTTP方式差異

與以往的Http請求方式有所不同,通過httpClient.PostAsync發(fā)送請求,接收端處理并返回結果,而走requestClient發(fā)送請求到RabbitMQ,再由RabbitMQ推送到偵聽節(jié)點消費并返回結果,如下第一二部分結構。

總結

以上是生活随笔為你收集整理的MassTransit中RequestResponse基本使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。