Kafka学习征途:.NET Core操作Kafka
【Kafka】|?總結(jié)/Edison Zhou
1可用的Kafka .NET客戶端
作為一個(gè).NET Developer,自然想要在.NET項(xiàng)目中集成Kafka實(shí)現(xiàn)發(fā)布訂閱功能。那么,目前可用的Kafka客戶端有哪些呢?
目前.NET圈子主流使用的是 Confluent.Kafka
confluent-kafka-dotnet : https://github.com/confluentinc/confluent-kafka-dotnet
其他主流的客戶端還有rdkafka-dotnet項(xiàng)目,但是其已經(jīng)被并入confluent-kakfa-dotnet項(xiàng)目進(jìn)行維護(hù)了。
因此,推薦使用confluent-kafka-dotnet,其配置友好,功能也更全面。
NCC千星項(xiàng)目CAP的Kafka擴(kuò)展包(DotNetCore.CAP.Kafka)內(nèi)部也是基于Confluent.Kafka來(lái)實(shí)現(xiàn)的:
接下來(lái),本文就來(lái)在.NET Core項(xiàng)目下通過(guò)Confluent.Kafka和CAP兩個(gè)主流開(kāi)源項(xiàng)目來(lái)操作一下Kafka,實(shí)現(xiàn)一下發(fā)布訂閱的功能。
2基于Confluent.Kafka的Sample
要完成本文示例,首先得有一個(gè)啟動(dòng)好的Kafka Broker服務(wù)。關(guān)于如何搭建Kafka,請(qǐng)參考上一篇:通過(guò)Docker部署Kafka集群。
安裝相關(guān)組件
在.NET Core項(xiàng)目中新建一個(gè)類(lèi)庫(kù),暫且命名為EDT.Kafka.Core,安裝Confluent.Kafka組件:
PM>Install-Package Confluent.Kafka編寫(xiě)KafkaService
編寫(xiě)IKafkaService接口:
namespace EDT.Kafka.Core {public interface IKafkaService{Task PublishAsync<T>(string topicName, T message) where T : class;Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class;} }編寫(xiě)KafkaService實(shí)現(xiàn)類(lèi):
namespace EDT.Kafka.Core {public class KafkaService : IKafkaService{public static string KAFKA_SERVERS = "127.0.0.1:9091";public async Task PublishAsync<T>(string topicName, T message) where T : class{var config = new ProducerConfig { BootstrapServers = KAFKA_SERVERS,BatchSize = 16384, // 修改批次大小為16KLingerMs = 20 // 修改等待時(shí)間為20ms};using (var producer = new ProducerBuilder<string, string>(config).Build()){await producer.ProduceAsync(topicName, new Message<string, string>{Key = Guid.NewGuid().ToString(),Value = JsonConvert.SerializeObject(message)}); ;}}public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class{var config = new ConsumerConfig{BootstrapServers = KAFKA_SERVERS,GroupId = "Consumer",EnableAutoCommit = false, // 禁止AutoCommitAcks = Acks.Leader, // 假設(shè)只需要Leader響應(yīng)即可AutoOffsetReset = AutoOffsetReset.Earliest // 從最早的開(kāi)始消費(fèi)起};using?(var?consumer?=?new?ConsumerBuilder<Ignore,?string>(config).Build()){consumer.Subscribe(topics);try{while (true){try{var consumeResult = consumer.Consume(cancellationToken);Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");if (consumeResult.IsPartitionEOF){Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已經(jīng)到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");continue;}T messageResult = null;try{messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message.Value);}catch (Exception ex){var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失敗,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}";Console.WriteLine(errorMessage);messageResult = null;}if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/){messageFunc(messageResult);try{consumer.Commit(consumeResult);}catch (KafkaException e){Console.WriteLine(e.Message);}}}catch (ConsumeException e){Console.WriteLine($"Consume error: {e.Error.Reason}");}}}catch (OperationCanceledException){Console.WriteLine("Closing consumer.");consumer.Close();}}await Task.CompletedTask;}} }為了方便后續(xù)的演示,在此項(xiàng)目中再創(chuàng)建一個(gè)類(lèi) EventData:
public class EventData {public string TopicName { get; set; }public string Message { get; set; }public DateTime EventTime { get; set; } }編寫(xiě)Producer
新建一個(gè)Console項(xiàng)目,暫且命名為:EDT.Kafka.Demo.Producer,其主體內(nèi)容如下:
namespace EDT.Kafka.Demo.Producer {public class Program{static async Task Main(string[] args){KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";var kafkaService = new KafkaService();for (int i = 0; i < 50; i++){var eventData = new EventData{TopicName = "testtopic",Message = $"This is a message from Producer, Index : {i + 1}",EventTime = DateTime.Now};await kafkaService.PublishAsync(eventData.TopicName, eventData);}Console.WriteLine("Publish Done!");Console.ReadKey();}} }編寫(xiě)Consumer
新建一個(gè)Console項(xiàng)目,暫且命名為:EDT.Kafka.Demo.Consumer,其主體內(nèi)容如下:
namespace EDT.Kafka.Demo.Consumer {public class Program{static async Task Main(string[] args){KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";var kafkaService = new KafkaService();var topics = new List<string> { "testtopic" };await kafkaService.SubscribeAsync<EventData>(topics, (eventData) => {Console.WriteLine($" - {eventData.EventTime: yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已處理");});}} }測(cè)試Pub/Sub效果
將Producer和Consumer兩個(gè)項(xiàng)目都啟動(dòng)起來(lái),可以看到當(dāng)Consumer消費(fèi)完50條消息并一一確認(rèn)之后,Producer這邊就算發(fā)布結(jié)束。
3基于CAP項(xiàng)目的Sample
模擬場(chǎng)景說(shuō)明
假設(shè)我們有兩個(gè)微服務(wù),一個(gè)是Catalog微服務(wù),一個(gè)是Basket微服務(wù),當(dāng)Catalog微服務(wù)產(chǎn)生了Product價(jià)格更新的事件,就會(huì)將其發(fā)布到Kafka,Basket微服務(wù)作為消費(fèi)者就會(huì)訂閱這個(gè)消息然后更新購(gòu)物車(chē)中對(duì)應(yīng)商品的最新價(jià)格。
Catalog API
新建一個(gè)ASP.NET Core WebAPI項(xiàng)目,然后分別安裝以下組件:
PM>Install Package DotNetCore.CAP PM>Install Package DotNetCore.CAP.MongoDB PM>Install Package DotNetCore.CAP.Kafka在Startup中的ConfigureServices方法中注入CAP:
public void ConfigureServices(IServiceCollection services) {......services.AddCap(x =>{x.UseMongoDB("mongodb://account:password@mongodb-server:27017/products?authSource=admin");x.UseKafka("kafka1:9091,kafka2:9092,kafka3:9093");}); }新建一個(gè)ProductController,實(shí)現(xiàn)一個(gè)Update產(chǎn)品價(jià)格的接口,在其中通過(guò)CapPublisher完成發(fā)布消息到Kafka:
namespace EDT.Demo.Catalog.API.Controllers {[ApiController][Route("[controller]")]public class ProductController : ControllerBase{private static readonly IList<Product> Products = new List<Product>{new Product { Id = "0001", Name = "電動(dòng)牙刷A", Price = 99.90M, Introduction = "暫無(wú)介紹" },new Product { Id = "0002", Name = "電動(dòng)牙刷B", Price = 199.90M, Introduction = "暫無(wú)介紹" },new Product { Id = "0003", Name = "洗衣機(jī)A", Price = 2999.90M, Introduction = "暫無(wú)介紹" },new Product { Id = "0004", Name = "洗衣機(jī)B", Price = 3999.90M, Introduction = "暫無(wú)介紹" },new Product { Id = "0005", Name = "電視機(jī)A", Price = 1899.90M, Introduction = "暫無(wú)介紹" },};private readonly ICapPublisher _publisher;private readonly IMapper _mapper;public ProductController(ICapPublisher publisher, IMapper mapper){_publisher = publisher;_mapper = mapper;}[HttpGet]public IList<ProductDTO> Get(){return _mapper.Map<IList<ProductDTO>>(Products); ;}[HttpPut]public async Task<IActionResult> UpdatePrice(string id, decimal newPrice){// 業(yè)務(wù)代碼var product = Products.FirstOrDefault(p => p.Id == id);product.Price = newPrice;// 發(fā)布消息await _publisher.PublishAsync("ProductPriceChanged", new ProductDTO { Id = product.Id, Name = product.Name, Price = product.Price});return NoContent();}} }Basket API
參照Catalog API項(xiàng)目創(chuàng)建ASP.NET Core WebAPI項(xiàng)目,并安裝對(duì)應(yīng)組件,在ConfigureServices方法中注入CAP。
新建一個(gè)BasketController,用于訂閱Kafka對(duì)應(yīng)Topic:ProductPriceChanged 的消息。
namespace EDT.Demo.Basket.API.Controllers {[ApiController][Route("[controller]")]public class BasketController : ControllerBase{private static readonly IList<MyBasketDTO> Baskets = new List<MyBasketDTO>{new MyBasketDTO { UserId = "U001", Catalogs = new List<Catalog>{new Catalog { Product = new ProductDTO { Id = "0001", Name = "電動(dòng)牙刷A", Price = 99.90M }, Count = 2 },new Catalog { Product = new ProductDTO { Id = "0005", Name = "電視機(jī)A", Price = 1899.90M }, Count = 1 },} },new MyBasketDTO { UserId = "U002", Catalogs = new List<Catalog>{new Catalog { Product = new ProductDTO { Id = "0002", Name = "電動(dòng)牙刷B", Price = 199.90M }, Count = 2 },new Catalog { Product = new ProductDTO { Id = "0004", Name = "洗衣機(jī)B", Price = 3999.90M }, Count = 1 },}}};[HttpGet]public IList<MyBasketDTO> Get(){return Baskets;}[NonAction][CapSubscribe("ProductPriceChanged")]public async Task RefreshBasketProductPrice(ProductDTO productDTO){if (productDTO == null)return;foreach (var basket in Baskets){foreach (var catalog in basket.Catalogs){if (catalog.Product.Id == productDTO.Id){catalog.Product.Price = productDTO.Price;break;}}}await Task.CompletedTask;}} }測(cè)試效果
同時(shí)啟動(dòng)Catalog API 和 Basket API兩個(gè)項(xiàng)目。
首先,通過(guò)Swagger在Basket API中查看所有用戶購(gòu)物車(chē)中的商品的價(jià)格,可以看到,0002的商品是199.9元。
然后,通過(guò)Swagger在Catalog API中更新Id為0002的商品的價(jià)格至499.9元。
最后,通過(guò)Swagger在Basket API中查看所有用戶購(gòu)物車(chē)中的商品的價(jià)格,可以看到,0002的商品已更新至499.9元。
End總結(jié)
本文總結(jié)了.NET Core如何通過(guò)對(duì)應(yīng)客戶端操作Kafka,基于Confluent.Kafka項(xiàng)目和CAP項(xiàng)目可以方便的實(shí)現(xiàn)發(fā)布訂閱的效果。
參考資料
阿星Plus,《.NET Core下使用Kafka》:https://blog.csdn.net/meowv/article/details/108675741
麥比烏斯皇,《.NET使用Kafka小結(jié)》:https://www.cnblogs.com/hsxian/p/12907542.html
Tony,《.NET Core事件總線解決方案:CAP基于Kafka》:https://www.cnblogs.com/Tony100/archive/2019/01/29/10333440.html
極客時(shí)間,胡夕《Kafka核心技術(shù)與實(shí)戰(zhàn)》
B站,尚硅谷《Kafka 3.x入門(mén)到精通教程》
年終總結(jié):Edison的2020年終總結(jié)
數(shù)字化轉(zhuǎn)型:我在傳統(tǒng)企業(yè)做數(shù)字化轉(zhuǎn)型
C#刷題:C#刷劍指Offer算法題系列文章目錄
.NET面試:.NET開(kāi)發(fā)面試知識(shí)體系
.NET大會(huì):2020年中國(guó).NET開(kāi)發(fā)者大會(huì)PDF資料
總結(jié)
以上是生活随笔為你收集整理的Kafka学习征途:.NET Core操作Kafka的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 微机原理与接口技术(一)
- 下一篇: ASP.NET网站建设基本常用代码[转载