日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > asp.net >内容正文

asp.net

Kafka学习征途:.NET Core操作Kafka

發(fā)布時(shí)間:2024/3/12 asp.net 61 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka学习征途:.NET Core操作Kafka 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

【Kafka】|?總結(jié)/Edison Zhou

1可用的Kafka .NET客戶端

作為一個(gè).NET Developer,自然想要在.NET項(xiàng)目中集成Kafka實(shí)現(xiàn)發(fā)布訂閱功能。那么,目前可用的Kafka客戶端有哪些呢?

目前.NET圈子主流使用的是 Confluent.Kafka

confluent-kafka-dotnet : https://github.com/confluentinc/confluent-kafka-dotnet

其他主流的客戶端還有rdkafka-dotnet項(xiàng)目,但是其已經(jīng)被并入confluent-kakfa-dotnet項(xiàng)目進(jìn)行維護(hù)了。

因此,推薦使用confluent-kafka-dotnet,其配置友好,功能也更全面。

NCC千星項(xiàng)目CAP的Kafka擴(kuò)展包(DotNetCore.CAP.Kafka)內(nèi)部也是基于Confluent.Kafka來(lái)實(shí)現(xiàn)的:

接下來(lái),本文就來(lái)在.NET Core項(xiàng)目下通過(guò)Confluent.Kafka和CAP兩個(gè)主流開(kāi)源項(xiàng)目來(lái)操作一下Kafka,實(shí)現(xiàn)一下發(fā)布訂閱的功能。

2基于Confluent.Kafka的Sample

要完成本文示例,首先得有一個(gè)啟動(dòng)好的Kafka Broker服務(wù)。關(guān)于如何搭建Kafka,請(qǐng)參考上一篇:通過(guò)Docker部署Kafka集群。

安裝相關(guān)組件

在.NET Core項(xiàng)目中新建一個(gè)類(lèi)庫(kù),暫且命名為EDT.Kafka.Core,安裝Confluent.Kafka組件:

PM>Install-Package Confluent.Kafka

編寫(xiě)KafkaService

編寫(xiě)IKafkaService接口:

namespace EDT.Kafka.Core {public interface IKafkaService{Task PublishAsync<T>(string topicName, T message) where T : class;Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class;} }

編寫(xiě)KafkaService實(shí)現(xiàn)類(lèi):

namespace EDT.Kafka.Core {public class KafkaService : IKafkaService{public static string KAFKA_SERVERS = "127.0.0.1:9091";public async Task PublishAsync<T>(string topicName, T message) where T : class{var config = new ProducerConfig { BootstrapServers = KAFKA_SERVERS,BatchSize = 16384, // 修改批次大小為16KLingerMs = 20 // 修改等待時(shí)間為20ms};using (var producer = new ProducerBuilder<string, string>(config).Build()){await producer.ProduceAsync(topicName, new Message<string, string>{Key = Guid.NewGuid().ToString(),Value = JsonConvert.SerializeObject(message)}); ;}}public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class{var config = new ConsumerConfig{BootstrapServers = KAFKA_SERVERS,GroupId = "Consumer",EnableAutoCommit = false, // 禁止AutoCommitAcks = Acks.Leader, // 假設(shè)只需要Leader響應(yīng)即可AutoOffsetReset = AutoOffsetReset.Earliest // 從最早的開(kāi)始消費(fèi)起};using?(var?consumer?=?new?ConsumerBuilder<Ignore,?string>(config).Build()){consumer.Subscribe(topics);try{while (true){try{var consumeResult = consumer.Consume(cancellationToken);Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");if (consumeResult.IsPartitionEOF){Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已經(jīng)到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");continue;}T messageResult = null;try{messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message.Value);}catch (Exception ex){var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失敗,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}";Console.WriteLine(errorMessage);messageResult = null;}if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/){messageFunc(messageResult);try{consumer.Commit(consumeResult);}catch (KafkaException e){Console.WriteLine(e.Message);}}}catch (ConsumeException e){Console.WriteLine($"Consume error: {e.Error.Reason}");}}}catch (OperationCanceledException){Console.WriteLine("Closing consumer.");consumer.Close();}}await Task.CompletedTask;}} }

為了方便后續(xù)的演示,在此項(xiàng)目中再創(chuàng)建一個(gè)類(lèi) EventData:

public class EventData {public string TopicName { get; set; }public string Message { get; set; }public DateTime EventTime { get; set; } }

編寫(xiě)Producer

新建一個(gè)Console項(xiàng)目,暫且命名為:EDT.Kafka.Demo.Producer,其主體內(nèi)容如下:

namespace EDT.Kafka.Demo.Producer {public class Program{static async Task Main(string[] args){KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";var kafkaService = new KafkaService();for (int i = 0; i < 50; i++){var eventData = new EventData{TopicName = "testtopic",Message = $"This is a message from Producer, Index : {i + 1}",EventTime = DateTime.Now};await kafkaService.PublishAsync(eventData.TopicName, eventData);}Console.WriteLine("Publish Done!");Console.ReadKey();}} }

編寫(xiě)Consumer

新建一個(gè)Console項(xiàng)目,暫且命名為:EDT.Kafka.Demo.Consumer,其主體內(nèi)容如下:

namespace EDT.Kafka.Demo.Consumer {public class Program{static async Task Main(string[] args){KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";var kafkaService = new KafkaService();var topics = new List<string> { "testtopic" };await kafkaService.SubscribeAsync<EventData>(topics, (eventData) => {Console.WriteLine($" - {eventData.EventTime: yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已處理");});}} }

測(cè)試Pub/Sub效果

將Producer和Consumer兩個(gè)項(xiàng)目都啟動(dòng)起來(lái),可以看到當(dāng)Consumer消費(fèi)完50條消息并一一確認(rèn)之后,Producer這邊就算發(fā)布結(jié)束。

3基于CAP項(xiàng)目的Sample

模擬場(chǎng)景說(shuō)明

假設(shè)我們有兩個(gè)微服務(wù),一個(gè)是Catalog微服務(wù),一個(gè)是Basket微服務(wù),當(dāng)Catalog微服務(wù)產(chǎn)生了Product價(jià)格更新的事件,就會(huì)將其發(fā)布到Kafka,Basket微服務(wù)作為消費(fèi)者就會(huì)訂閱這個(gè)消息然后更新購(gòu)物車(chē)中對(duì)應(yīng)商品的最新價(jià)格。

Catalog API

新建一個(gè)ASP.NET Core WebAPI項(xiàng)目,然后分別安裝以下組件:

PM>Install Package DotNetCore.CAP PM>Install Package DotNetCore.CAP.MongoDB PM>Install Package DotNetCore.CAP.Kafka

在Startup中的ConfigureServices方法中注入CAP:

public void ConfigureServices(IServiceCollection services) {......services.AddCap(x =>{x.UseMongoDB("mongodb://account:password@mongodb-server:27017/products?authSource=admin");x.UseKafka("kafka1:9091,kafka2:9092,kafka3:9093");}); }

新建一個(gè)ProductController,實(shí)現(xiàn)一個(gè)Update產(chǎn)品價(jià)格的接口,在其中通過(guò)CapPublisher完成發(fā)布消息到Kafka:

namespace EDT.Demo.Catalog.API.Controllers {[ApiController][Route("[controller]")]public class ProductController : ControllerBase{private static readonly IList<Product> Products = new List<Product>{new Product { Id = "0001", Name = "電動(dòng)牙刷A", Price = 99.90M, Introduction = "暫無(wú)介紹" },new Product { Id = "0002", Name = "電動(dòng)牙刷B", Price = 199.90M, Introduction = "暫無(wú)介紹" },new Product { Id = "0003", Name = "洗衣機(jī)A", Price = 2999.90M, Introduction = "暫無(wú)介紹" },new Product { Id = "0004", Name = "洗衣機(jī)B", Price = 3999.90M, Introduction = "暫無(wú)介紹" },new Product { Id = "0005", Name = "電視機(jī)A", Price = 1899.90M, Introduction = "暫無(wú)介紹" },};private readonly ICapPublisher _publisher;private readonly IMapper _mapper;public ProductController(ICapPublisher publisher, IMapper mapper){_publisher = publisher;_mapper = mapper;}[HttpGet]public IList<ProductDTO> Get(){return _mapper.Map<IList<ProductDTO>>(Products); ;}[HttpPut]public async Task<IActionResult> UpdatePrice(string id, decimal newPrice){// 業(yè)務(wù)代碼var product = Products.FirstOrDefault(p => p.Id == id);product.Price = newPrice;// 發(fā)布消息await _publisher.PublishAsync("ProductPriceChanged", new ProductDTO { Id = product.Id, Name = product.Name, Price = product.Price});return NoContent();}} }

Basket API

參照Catalog API項(xiàng)目創(chuàng)建ASP.NET Core WebAPI項(xiàng)目,并安裝對(duì)應(yīng)組件,在ConfigureServices方法中注入CAP。

新建一個(gè)BasketController,用于訂閱Kafka對(duì)應(yīng)Topic:ProductPriceChanged 的消息。

namespace EDT.Demo.Basket.API.Controllers {[ApiController][Route("[controller]")]public class BasketController : ControllerBase{private static readonly IList<MyBasketDTO> Baskets = new List<MyBasketDTO>{new MyBasketDTO { UserId = "U001", Catalogs = new List<Catalog>{new Catalog { Product = new ProductDTO { Id = "0001", Name = "電動(dòng)牙刷A", Price = 99.90M }, Count = 2 },new Catalog { Product = new ProductDTO { Id = "0005", Name = "電視機(jī)A", Price = 1899.90M }, Count = 1 },} },new MyBasketDTO { UserId = "U002", Catalogs = new List<Catalog>{new Catalog { Product = new ProductDTO { Id = "0002", Name = "電動(dòng)牙刷B", Price = 199.90M }, Count = 2 },new Catalog { Product = new ProductDTO { Id = "0004", Name = "洗衣機(jī)B", Price = 3999.90M }, Count = 1 },}}};[HttpGet]public IList<MyBasketDTO> Get(){return Baskets;}[NonAction][CapSubscribe("ProductPriceChanged")]public async Task RefreshBasketProductPrice(ProductDTO productDTO){if (productDTO == null)return;foreach (var basket in Baskets){foreach (var catalog in basket.Catalogs){if (catalog.Product.Id == productDTO.Id){catalog.Product.Price = productDTO.Price;break;}}}await Task.CompletedTask;}} }

測(cè)試效果

同時(shí)啟動(dòng)Catalog API 和 Basket API兩個(gè)項(xiàng)目。

首先,通過(guò)Swagger在Basket API中查看所有用戶購(gòu)物車(chē)中的商品的價(jià)格,可以看到,0002的商品是199.9元。

然后,通過(guò)Swagger在Catalog API中更新Id為0002的商品的價(jià)格至499.9元。

最后,通過(guò)Swagger在Basket API中查看所有用戶購(gòu)物車(chē)中的商品的價(jià)格,可以看到,0002的商品已更新至499.9元。

End總結(jié)

本文總結(jié)了.NET Core如何通過(guò)對(duì)應(yīng)客戶端操作Kafka,基于Confluent.Kafka項(xiàng)目和CAP項(xiàng)目可以方便的實(shí)現(xiàn)發(fā)布訂閱的效果。

參考資料

阿星Plus,《.NET Core下使用Kafka》:https://blog.csdn.net/meowv/article/details/108675741

麥比烏斯皇,《.NET使用Kafka小結(jié)》:https://www.cnblogs.com/hsxian/p/12907542.html

Tony,《.NET Core事件總線解決方案:CAP基于Kafka》:https://www.cnblogs.com/Tony100/archive/2019/01/29/10333440.html

極客時(shí)間,胡夕《Kafka核心技術(shù)與實(shí)戰(zhàn)》

B站,尚硅谷《Kafka 3.x入門(mén)到精通教程》

年終總結(jié):Edison的2020年終總結(jié)

數(shù)字化轉(zhuǎn)型:我在傳統(tǒng)企業(yè)做數(shù)字化轉(zhuǎn)型

C#刷題:C#刷劍指Offer算法題系列文章目錄

.NET面試:.NET開(kāi)發(fā)面試知識(shí)體系

.NET大會(huì):2020年中國(guó).NET開(kāi)發(fā)者大會(huì)PDF資料

總結(jié)

以上是生活随笔為你收集整理的Kafka学习征途:.NET Core操作Kafka的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。