如何在ASP.NET Core中使用Azure Service Bus Queue
原文:USING AZURE SERVICE BUS QUEUES WITH ASP.NET CORE SERVICES
作者:damienbod[1]?
譯文:如何在ASP.NET Core中使用Azure Service Bus Queue
地址:https://www.cnblogs.com/lwqlun/p/10760227.html
作者:Lamond Lu
源代碼:https://github.com/lamondlu/AzureServiceBusMessaging
本文展示了如何使用Azure Service Bus Queue, 實現2個ASP.NET Core Api應用之間的消息傳輸。
配置Azure Service Bus Queue
你可以從官網文檔中了解到如何配置一個Azure Service Bus Queue.
https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal
這里我們使用Queue或者Topic來實現消息傳輸。Queue是一種消息傳輸類型,一旦一個消息被一個消費者接收了,該消息就會從Queue中被移除。
與Queue不同,Topic提供的是一對多的通訊方式。
架構圖
整個應用的實現如下:
?Api 1負責發送消息?Api 2負責監聽Azure Service Bus,并處理接收到的消息
實現一個Service Bus Queue
這里我們首先需要引入Microsoft.Azure.ServiceBus[2] ?程序集。Microsoft.Azure.ServiceBus[3]是Azure Service Bus的客戶端庫。針對Service Bus的連接字符串我們保存在項目的User Secret中。當部署項目的時候,我們可以使用Azure Key Valut來設置這個Secret值。
在Visual Studio中,右鍵點擊API1, API2項目屬性,選擇Manage User Secrets就可以管理當前項目使用的所有私密信息。
為了發送向Azure Service Bus Queue發送消息,我們需要創建一個SendMessage方法,并接收一個消息參數。這里我們創建了一個我們自己的消息內容類型MyPayload, 將當前該MyPayload對象序列化成Json字符串, 添加到一個Message對象中。
namespace ServiceBusMessaging{ public class ServiceBusSender { private readonly QueueClient _queueClient; private readonly IConfiguration _configuration; private const string QUEUE_NAME = "simplequeue";
public ServiceBusSender(IConfiguration configuration) { _configuration = configuration; _queueClient = new QueueClient( _configuration .GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME); }
public async Task SendMessage(MyPayload payload) { string data = JsonConvert.SerializeObject(payload); Message message = new Message(Encoding.UTF8.GetBytes(data));
await _queueClient.SendAsync(message); } }}
在API 1和API 2中,我們需要將ServiceBusSender注冊到應用程序的IOC容器中。這里為了測試方便,我們同時注冊Swagger服務。
services.AddScoped<ServiceBusSender>();
services.AddSwaggerGen(c => { c.SwaggerDoc("v1", new Info { Version = "v1", Title = "Payload View API", }); });}
接下來,我們就可以在控制器中通過構造函數注入的方式使用這個服務了。
在API1中,我們創建一個POST方法,這個方法會將API接收到Payload對象發送到Azure Service Bus Queue中。
data.Add(request);
// Send this to the bus for the other services await _serviceBusSender.SendMessage(new MyPayload { Goals = request.Goals, Name = request.Name, Delete = false });
return Ok(request);}
從Queue中獲取消息
為了監聽Azure Service Bus Queue, 并處理接收到的消息,我們創建了一個新類ServiceBusConsumer,ServiceBusConsumer實現了IServiceBusConsumer接口。
Queue的連接字符串是使用IConfiguration讀取的。RegisterOnMessageHandlerAndReceiveMessages方法負責注冊消息處理程序ProcessMessagesAsync處理消息。ProcessMessagesAsync方法會將得到的消息轉換成對象,并調用IProcessData接口完成最終的消息處理。
namespace ServiceBusMessaging{ public interface IServiceBusConsumer { void RegisterOnMessageHandlerAndReceiveMessages(); Task CloseQueueAsync(); }
public class ServiceBusConsumer : IServiceBusConsumer { private readonly IProcessData _processData; private readonly IConfiguration _configuration; private readonly QueueClient _queueClient; private const string QUEUE_NAME = "simplequeue"; private readonly ILogger _logger;
public ServiceBusConsumer(IProcessData processData, IConfiguration configuration, ILogger<ServiceBusConsumer> logger) { _processData = processData; _configuration = configuration; _logger = logger; _queueClient = new QueueClient( _configuration.GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME); }
public void RegisterOnMessageHandlerAndReceiveMessages() { var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 1, AutoComplete = false };
_queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions); }
private async Task ProcessMessagesAsync(Message message, CancellationToken token) { var myPayload = JsonConvert.DeserializeObject<MyPayload>(Encoding.UTF8.GetString(message.Body)); _processData.Process(myPayload); await _queueClient.CompleteAsync(message.SystemProperties.LockToken); }
private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) { _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception"); var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
_logger.LogDebug($"- Endpoint: {context.Endpoint}"); _logger.LogDebug($"- Entity Path: {context.EntityPath}"); _logger.LogDebug($"- Executing Action: {context.Action}");
return Task.CompletedTask; }
public async Task CloseQueueAsync() { await _queueClient.CloseAsync(); } }}
其中IProcessData接口存在于類庫項目ServiceBusMessaging中,它是用來處理消息的。
在Api 2中,我們創建一個ProcessData類,它實現了IProcessData接口。
這里為了簡單測試,我們創建了一個靜態類DataServiceSimi,其中存放了API2中所有保存Payload對象。同時,我們還創建了一個新的控制器ViewPayloadMessagesController,在其中添加了一個GET Action,并返回了靜態類DataServiceSimi中的所有數據。
最后我們還需要將ProcessData注冊到API2的IOC容器中。
services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>(); services.AddTransient<IProcessData, ProcessData>();}
最終效果
現在我們分別啟用2個Api項目,并在Api 1的Swagger文檔界面,調用POST請求,添加一個Payload
操作完成之后,我們訪問Api 2的/api/ViewPayloadMessages, 獲得結果如下,Api 1發出的消息出現在了Api 2的結果集中,這說明Api 2從Azure Service Bus Queue中獲取了消息,并保存在了自己的靜態類DataServiceSimi中。
References
[1]?damienbod:?https://damienbod.com/author/damienbod/
[2]?Microsoft.Azure.ServiceBus:?https://www.nuget.org/packages/Microsoft.Azure.ServiceBus
[3]?Microsoft.Azure.ServiceBus:?https://www.nuget.org/packages/Microsoft.Azure.ServiceBus
.NET社區新聞,深度好文,歡迎訪問公眾號文章匯總?http://www.csharpkit.com?
總結
以上是生活随笔為你收集整理的如何在ASP.NET Core中使用Azure Service Bus Queue的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 包治百病 | 如何将一个.NET Cor
- 下一篇: eShopOnContainers 是一