在Asp.Net Core中集成Kafka
在我們的業(yè)務(wù)中,我們通常需要在自己的業(yè)務(wù)子系統(tǒng)之間相互發(fā)送消息,一端去發(fā)送消息另一端去消費當(dāng)前消息,這就涉及到使用消息隊列MQ的一些內(nèi)容,消息隊列成熟的框架有多種,這里你可以讀這篇文章來了解這些MQ的不同,這篇文章的主要目的是用來系統(tǒng)講述如何在Asp.Net Core中使用Kafka,整篇文章將介紹如何寫消息發(fā)送方代碼、消費方代碼、配套的工具的使用,希望讀完這篇文章之后對整個消息的運行機制有一定的理解,在這里通過一張圖來簡要了解一下消息隊列中的一些概念。
圖一 Kafka消息隊列
一 安裝NUGET包
在寫代碼之前首先要做的就是安裝nuget包了,我們這里使用的是Confluent.Kafka 1.0.0-RC4版本,具體項目要根據(jù)具體的時間來確定引用包的版本,這些包可能更新比較快。
圖二 引用Kafka包依賴
二?消息發(fā)送方(Producer)
1 在項目中添加所有觸發(fā)事件的接口 IIntegrationEvent,后面所有的觸發(fā)事件都是繼承自這個接口。
| /// <summary>/// 集成事件的接口定義/// </summary>public?interface?IIntegrationEvent {string?Key {?get;?set; }} |
2 定義Kafka生產(chǎn)者
| /// <summary>/// Kafka 生產(chǎn)者的 Domain Service/// </summary>public?class?KafkaProducer : DomainService {private?readonly?IConfiguration _config;private?readonly?ILogger<KafkaProducer> _logger;public?KafkaProducer(IConfiguration config,ILogger<KafkaProducer> logger) {_config = config;_logger = logger;}/// <summary>/// 發(fā)送事件/// </summary>/// <param name="event"></param>public?void?Produce(IIntegrationEvent @event) {var?topic = _config.GetValue<string>($"Kafka:Topics:{@event.GetType().Name}");var?producerConfig =?new?ProducerConfig {BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),MessageTimeoutMs = _config.GetValue<int>("Kafka:MessageTimeoutMs")};var?builder =?new?ProducerBuilder<string,?string>(producerConfig);using?(var?producer = builder.Build()) {try?{var?json = JsonConvert.SerializeObject(@event);var?dr = producer.ProduceAsync(topic,?new?Message<string,?string> { Key = @event.Key, Value = json }).GetAwaiter().GetResult();_logger.LogDebug("發(fā)送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);}?catch?(ProduceException<string,?string> ex) {_logger.LogError(ex,?"發(fā)送事件到 {0} 失敗,原因 {1} ", topic, ex.Error.Reason);}}}} |
在這里我們的Producer根據(jù)業(yè)務(wù)的需要定義在領(lǐng)域服務(wù)中,這里面最關(guān)鍵的就是Produce方法了,該方法的參數(shù)是繼承自IIntegrationEvent 接口的各種各樣事件,在這個方法中,我們獲取配置在appsetting.json中配置的各種Topic以及Kafka服務(wù)器的地址,具體的配置如下方截圖所示。
圖三 配置服務(wù)器地址以及各種Topic
通過當(dāng)前配置我們就知道我們的消息要發(fā)往何處,然后我們就可以創(chuàng)建一個producer來將我們的事件(實際上是定義的數(shù)據(jù)結(jié)構(gòu))序列化成Json,然后通過異步的方式發(fā)送出去,這里需要注意我們創(chuàng)建的Producer要放在一個using塊中,這樣在創(chuàng)建完成并發(fā)送消息之后就會釋放當(dāng)前生產(chǎn)者。這里如果發(fā)送失敗會在當(dāng)前日志中記錄發(fā)送的值以及錯誤的原因從而便于進行調(diào)試。這里舉出其中的一個事件RepairContractFinishedEvent為例來說明。
| /// <summary>/// 維修合同完成的事件/// </summary>public?class?RepairContractFinishedEvent : IIntegrationEvent {public?RepairContract RepairContract {?get;?set; }//一個維修合同會對應(yīng)多個調(diào)整單public?List<RepairContractAdjust> RepairContractAdjusts {?get;?set; }public?string?Key {?get;?set; }} |
這個里面RepairContract以及List集合都是我們定義的一種數(shù)據(jù)結(jié)構(gòu)。
最后我們來看看在具體的領(lǐng)域?qū)又形覀冊撊绾斡|發(fā)此事件的,這里我們也定義了一個叫做IRepairContractEventManager接口的領(lǐng)域服務(wù),并在里面定義了一個叫做Finished的接口,然后在RepairContractEventManager中實現(xiàn)該方法。
| public?class?RepairContractEventManager : DomainService, IRepairContractEventManager {private?readonly?KafkaProducer _producer;private?readonly?IRepository<RepairContract, Guid> _repairContractRepository;private?readonly?IRepository<RepairContractAdjust, Guid> _repairContractAdjustRepository;public?RepairContractEventManager(KafkaProducer producer,IRepository<RepairContract, Guid> repairContractRepository,IRepository<RepairContractAdjust, Guid> repairContractAdjustRepository) {_producer = producer;_repairContractRepository = repairContractRepository;_repairContractAdjustRepository = repairContractAdjustRepository;}public?void?Finished(Guid repairContractId) {var?repairContract = _repairContractRepository.GetAll().Include(c => c.RepairContractWorkItems).ThenInclude(w => w.Materials).SingleOrDefaultAsync(c => c.Id == repairContractId).GetAwaiter().GetResult();var?repairContractAdjusts = _repairContractAdjustRepository.GetAll().Include(a => a.WorkItems).ThenInclude(w => w.Materials).Where(a => a.RepairContractId == repairContractId).ToListAsync().GetAwaiter().GetResult();var?@event?=?new?RepairContractFinishedEvent {Key = repairContract?.Code,RepairContract = repairContract,RepairContractAdjusts = repairContractAdjusts};_producer.Produce(@event);}} |
這段代碼就是組裝RepairContractFinishedEvent的具體實現(xiàn)過程,然后調(diào)用我們之前創(chuàng)建的KafkaProducer對象然后將消息發(fā)送出去,這樣在需要觸發(fā)當(dāng)前RepairContractFinishedEvent 的地方來注入IRepairContractEventManager接口,然后調(diào)對應(yīng)的Finished方法,這樣就完成了整個消息的發(fā)送的過程了。
三 查看消息的發(fā)送
在發(fā)送完消息后我們可以到Kafka 集群 Control Center中查找我們發(fā)送的所有消息。選擇其中的一條消息,雙擊,然后選擇INSPECT來查看發(fā)送的消息
圖四 Kafka Control Center中查看發(fā)送消息
四 消息的接收方(Consumer)
在正確創(chuàng)建消息的發(fā)送方后緊接著就是定義消息的接收方了,消息的接收方顧名思義就是消費剛才消息的一方,這里的步驟和發(fā)送類似,但是也有很大的不同,消息的消費方核心是一個后臺服務(wù),并且在單獨的線程中監(jiān)聽來自發(fā)送方的消息,并進行消費,這里我們先定義一個叫做KafkaConsumerHostedService的基類,我們具體來看看代碼。
| /// <summary>/// Kafka 消費者的后臺服務(wù)基礎(chǔ)類/// </summary>/// <typeparam name="T">事件類型</typeparam>public?abstract?class?KafkaConsumerHostedService<T> : BackgroundService?where?T : IIntegrationEvent {protected?readonly?IServiceProvider _services;protected?readonly?IConfiguration _config;protected?readonly?ILogger<KafkaConsumerHostedService<T>> _logger;public?KafkaConsumerHostedService(IServiceProvider services, IConfiguration config, ILogger<KafkaConsumerHostedService<T>> logger) {_services = services;_config = config;_logger = logger;}/// <summary>/// 消費該事件,比如調(diào)用 Application Service 持久化數(shù)據(jù)等/// </summary>/// <param name="event">事件內(nèi)容</param>protected?abstract?void?DoWork(T @event);/// <summary>/// 構(gòu)造 Kafka 消費者實例,監(jiān)聽指定 Topic,獲得最新的事件/// </summary>/// <param name="stoppingToken">終止標(biāo)識</param>/// <returns></returns>protected?override?async Task ExecuteAsync(CancellationToken stoppingToken) {await Task.Factory.StartNew(() => {var?topic = _config.GetValue<string>($"Kafka:Topics:{typeof(T).Name}");var?consumerConfig =?new?ConsumerConfig {BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),AutoOffsetReset = AutoOffsetReset.Earliest,GroupId = _config.GetValue<string>("Application:Name"),EnableAutoCommit =?true,};var?builder =?new?ConsumerBuilder<string,?string>(consumerConfig);using?(var?consumer = builder.Build()) {consumer.Subscribe(topic);while?(!stoppingToken.IsCancellationRequested) {try?{var?result = consumer.Consume(stoppingToken);var?@event?= JsonConvert.DeserializeObject<T>(result.Value);DoWork(@event);//consumer.StoreOffset(result);}?catch?(OperationCanceledException ex) {consumer.Close();_logger.LogDebug(ex,?"Kafka 消費者結(jié)束,退出后臺線程");}?catch?(AbpValidationException ex) {_logger.LogError(ex, $"Kafka {GetValidationErrorNarrative(ex)}");}?catch?(ConsumeException ex) {_logger.LogError(ex,?"Kafka 消費者產(chǎn)生異常");}?catch?(KafkaException ex) {_logger.LogError(ex,?"Kafka 產(chǎn)生異常");}?catch?(ValidationException ex) {_logger.LogError(ex,?"Kafka 消息驗證失敗");}?catch?(Exception ex) {_logger.LogError(ex,?"Kafka 捕獲意外異常");}}}}, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);}private?string?GetValidationErrorNarrative(AbpValidationException validationException) {var?detailBuilder =?new?StringBuilder();detailBuilder.AppendLine("驗證過程中檢測到以下錯誤");foreach?(var?validationResult?in?validationException.ValidationErrors) {detailBuilder.AppendFormat(" - {0}", validationResult.ErrorMessage);detailBuilder.AppendLine();}return?detailBuilder.ToString();}} |
這段代碼中我們會創(chuàng)建一個consumer,這里我們會在一個While循環(huán)中去訂閱特定Topic消息,這里的BootstrapServers是和發(fā)送方保持一致,并且也是在當(dāng)前應(yīng)用程序中的appsetting.json中進行配置的,而且這里的consumer.Consume方法是一個阻塞式方法,當(dāng)發(fā)送方發(fā)送特定事件后,這里會接收到同樣名稱的Topic的消息,然后將接收到的Json數(shù)據(jù)進行反序列化,然后交由后面的DoWork方法進行處理。這里還是以之前生成者發(fā)送的RepairContractFinished事件為例,這里也需要定義一個RepairContractFinishedEventHandler來處理生產(chǎn)者發(fā)送的消息。
| public?class?RepairContractFinishedEventHandler : KafkaConsumerHostedService<RepairContractFinishedEvent> {public?RepairContractFinishedEventHandler(IServiceProvider services,IConfiguration config, ILogger<KafkaConsumerHostedService<RepairContractFinishedEvent>> logger):?base(services, config, logger) {}/// <summary>/// 調(diào)用 Application Service,新增或更新維修合同及關(guān)聯(lián)實體/// </summary>/// <param name="event">待消費的事件</param>protected?override?void?DoWork(RepairContractFinishedEvent @event) {using?(var?scope = _services.CreateScope()) {var?service = scope.ServiceProvider.GetRequiredService<IRepairContractAppService>();service.AddOrUpdateRepairContract(@event.RepairContract, @event.RepairContractAdjusts);}}} |
這里需要特別注意的是在這里我么也需要定義一個繼承自IIntegrationEvent接口的事件,這里也是定義一種數(shù)據(jù)結(jié)構(gòu),并且這里的數(shù)據(jù)結(jié)構(gòu)和生成者定義的要保持一致,否則消費方在反序列化的時候會丟失不能夠匹配的信息。
| public?class?RepairContractFinishedEvent : IIntegrationEvent {public?RepairContractDto RepairContract {?get;?set; }public?List<RepairContractAdjustDto> RepairContractAdjusts {?get;?set; }public?string?Key {?get;?set; }} |
另外在DoWork方法中我們也需要注意代碼也需要用using包裹,從而在消費方消費完后釋放掉當(dāng)前的應(yīng)用服務(wù)。最后需要注意的就是我們的每一個Handle都是一個后臺服務(wù),我們需要在Asp.Net Core的Startup的ConfigureServices進行配置,從而將當(dāng)前的后臺服務(wù)添加到Asp.Net Core依賴注入容器中。
| /// <summary>/// 注冊集成事件的處理器/// </summary>/// <param name="services"></param>private?void?AddIntegrationEventHandlers(IServiceCollection services) {services.AddHostedService<RepairContractFinishedEventHandler>();services.AddHostedService<ProductTransferDataEventHandler>();services.AddHostedService<PartUpdateEventHandler>();services.AddHostedService<VehicleSoldFinishedEventHandler>();services.AddHostedService<AddOrUpdateDealerEventHandler>();services.AddHostedService<AddOrUpdateProductCategoryEventHandler>();services.AddHostedService<CustomerFinishedEventHandler>();services.AddHostedService<VehicleSoldUpdateStatusEventHandler>();services.AddHostedService<AddCustomerEventHandler>();} |
最后我們也看看我們的appsetting.json的配置文件關(guān)于kafka的配置。
| "Kafka": {"BootstrapServers":?"127.0.0.1:9092","MessageTimeoutMs": 5000,"Topics": {"RepairContractFinishedEvent":?"repair-contract-finished","AddOrUpdateProductCategoryEvent":?"add-update-product-category","AddOrUpdateDealerEvent":?"add-update-dealer","ClaimApproveEvent":?"claim-approve","ProductTransferDataEvent":?"product-update","PartUpdateEvent":?"part-update","VehicleSoldFinishedEvent":?"vehiclesold-finished","CustomerFinishedEvent":?"customer-update","VehicleInformationUpdateStatusEvent":?"add-update-vehicle-info","AddCustomerEvent":?"add-customer"}}, |
這里需要注意的是發(fā)送方和接收方必須保證Topic一致,并且配置的服務(wù)器名稱端口保持一致,這樣才能夠保證消息的準(zhǔn)確發(fā)送和接收。最后對于服務(wù)端,這里推薦一個VSCode的插件kafka,能夠創(chuàng)建并發(fā)送消息,這樣就方便我們來發(fā)送我們需要的數(shù)據(jù)了,這里同樣需要我們先建立一個.kafka的文件,然后配置Kafka服務(wù)的地址和端口號。
圖五 利用VSCode Kafka插件發(fā)送消息
原文地址:https://www.cnblogs.com/seekdream/p/10757541.html
.NET社區(qū)新聞,深度好文,歡迎訪問公眾號文章匯總?http://www.csharpkit.com?
總結(jié)
以上是生活随笔為你收集整理的在Asp.Net Core中集成Kafka的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 请给你的短信验证码接口加上SSL双向验证
- 下一篇: 分享一个.NET平台开源免费跨平台的大数