.Net Core 集成 Kafka
最近維護(hù)的一個(gè)系統(tǒng)并發(fā)有點(diǎn)高,所以想引入一個(gè)消息隊(duì)列來進(jìn)行削峰。考察了一些產(chǎn)品,最終決定使用kafka來當(dāng)做消息隊(duì)列。以下是關(guān)于kafka的一些知識(shí)的整理筆記。
kafka
kafka 是分布式流式平臺(tái)。它由linkedin開發(fā),后貢獻(xiàn)給了Apache開源組織并成為頂級(jí)開源項(xiàng)目。它可以應(yīng)用在高并發(fā)場(chǎng)景下的日志系統(tǒng),也可以當(dāng)作消息隊(duì)列來使用,也可以當(dāng)作消息服務(wù)對(duì)系統(tǒng)進(jìn)行解耦。
流處理平臺(tái)有以下三種特性:
1.可以讓你發(fā)布和訂閱流式的記錄。這一方面與消息隊(duì)列或者企業(yè)消息系統(tǒng)類似。2.可以儲(chǔ)存流式的記錄,并且有較好的容錯(cuò)性。3.可以在流式記錄產(chǎn)生時(shí)就進(jìn)行處理。
一般它可以應(yīng)用于兩個(gè)場(chǎng)景:
1.構(gòu)造實(shí)時(shí)流數(shù)據(jù)管道,它可以在系統(tǒng)或應(yīng)用之間可靠地獲取數(shù)據(jù)。(相當(dāng)于message queue)2.構(gòu)建實(shí)時(shí)流式應(yīng)用程序,對(duì)這些流數(shù)據(jù)進(jìn)行轉(zhuǎn)換或者影響。(就是流處理,通過kafka stream topic和topic之間內(nèi)部進(jìn)行變化)
broker
kafka中的每個(gè)節(jié)點(diǎn)即每個(gè)服務(wù)器就是一個(gè)broker 。
topic
kafka中的topic是一個(gè)分類的概念,表示一類消息。生產(chǎn)者在生產(chǎn)消息的時(shí)候需要指定topic,消費(fèi)者在消費(fèi)消息的時(shí)候也需要指定topic。
partition
partition是分區(qū)的概念。kafka的一個(gè)topic可以有多個(gè)partition。每個(gè)partition會(huì)分散到不同的broker上,起到負(fù)載均衡的作用。生產(chǎn)者的消息會(huì)通過算法均勻的分散在各個(gè)partition上。
consumer group
kafka的消費(fèi)者有個(gè)組的概念。一個(gè)partition可以被多consumer group訂閱。每個(gè)消息會(huì)廣播到每一個(gè)group中。但是每個(gè)消息只會(huì)被group中的一個(gè)consumer消費(fèi)。相當(dāng)于每個(gè)group,一個(gè)partition只能有一個(gè)consumer訂閱,所以group中的consumer數(shù)量不可以超過topic中partition的數(shù)量。并且消息的消費(fèi)的順序在每個(gè)partition中是保證有序的,但是在多個(gè)partition之間是不保證的,因?yàn)閏onsumer的消費(fèi)速度是有快慢的。
所以如果要用kafka實(shí)現(xiàn)嚴(yán)格的消息隊(duì)列點(diǎn)對(duì)點(diǎn)模式那么我們可以設(shè)置一個(gè)partition并且設(shè)置一個(gè)consumer。如果對(duì)消息消費(fèi)的順序不是那么敏感,那么可以設(shè)置多個(gè)partition來并行消費(fèi)消息,提高吞吐量。
安裝kafka
為了能體驗(yàn)下kafka,我們還是要實(shí)際安裝一下kafka,畢竟空想是沒有用的。現(xiàn)在有了docker,安裝起來也是相當(dāng)?shù)魏唵巍N覀冎恍枰x好docker-compose的yml就行了。
version: '3' services:zookeeper:image: wurstmeister/zookeeperports:- "2181:2181"kafka:image: wurstmeister/kafkadepends_on:- zookeeperports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.0.117KAFKA_CREATE_TOPICS: "test:3:1"KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181我們?cè)趛ml里定義2個(gè)service:
1.zookeeper,kafka的分布式依賴zookeeper,所以我需要先定義它。2.kafka ,kafka的定義有幾個(gè)地方要注意的。
?depends_on:zookeeper 指定kafka依賴zookeeper這個(gè)service,當(dāng)啟動(dòng)kafka的時(shí)候自動(dòng)會(huì)啟動(dòng)zookeeper。?KAFKA_ADVERTISED_HOST_NAME 這里要指定宿主機(jī)的ip?KAFKA_CREATE_TOPICS 這個(gè)變量只是的默認(rèn)創(chuàng)建的topic。"test:3:1"代表創(chuàng)建一個(gè)名為test的topic并且創(chuàng)建3個(gè)分區(qū)1個(gè)復(fù)制。
定義好這些之后我們只需要使用docker-compose命令運(yùn)行它:
sudo docker-compose up -d.net 操作 kafka
安裝好kafka的docker環(huán)境之后,下面演示下如何使用.net操作kafka,進(jìn)行消息的生產(chǎn)與消費(fèi)。
生產(chǎn)者
static async Task Main(string[] args){Console.WriteLine("Hello World Producer!");var config = new ProducerConfig{BootstrapServers = "192.168.0.117:9092",ClientId = Dns.GetHostName(),};using (var producer = new ProducerBuilder<Null, string>(config).Build()){string topic = "test";for (int i = 0; i < 100; i++){var msg = "message " + i;Console.WriteLine($"Send message: value {msg}");var result = await producer.ProduceAsync(topic, new Message<Null, string> { Value = msg });Console.WriteLine($"Result: key {result.Key} value {result.Value} partition:{result.TopicPartition}");Thread.Sleep(500);}}Console.ReadLine();}新建一個(gè)控制臺(tái)項(xiàng)目,從nuget安裝kafka的官方client。
Install-Package Confluent.Kafka代碼非常簡單,使用ProducerBuilder構(gòu)造一個(gè)producer,然后調(diào)用ProduceAsync方法發(fā)送消息。
其中需要注意的是如果你的場(chǎng)景并發(fā)非常之高,官方文檔推薦的方法是Produce而不是ProduceAsync。這是一個(gè)比較迷的地方。按常理使用ProduceAsync應(yīng)該比使用同步方法Produce能獲得更高的并發(fā)才對(duì)。但是文檔確確實(shí)實(shí)說高并發(fā)場(chǎng)景請(qǐng)使用Produce。可能是為了避免ProduceAsync結(jié)果返回的時(shí)候異步線程上下文切換造成的性能開銷。
原文:
消費(fèi)者
static void Main(string[] args){Console.WriteLine("Hello World kafka consumer !");var config = new ConsumerConfig{BootstrapServers = "192.168.0.117:9092",GroupId = "foo",AutoOffsetReset = AutoOffsetReset.Earliest};var cancel = false;using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build()){var topic = "test";consumer.Subscribe(topic);while (!cancel){var consumeResult = consumer.Consume(CancellationToken.None);Console.WriteLine($"Consumer message: { consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}");}consumer.Close();}}消費(fèi)者的演示代碼同樣很簡單。我們需要指定groupId,然后訂閱topic。使用ConsumerBuilder構(gòu)造一個(gè)consumer,然后調(diào)用Consume方法進(jìn)行消費(fèi)就可以。
注意:
這里默認(rèn)是自動(dòng)commit消費(fèi)。你也可以根據(jù)情況手動(dòng)提交commit。
運(yùn)行一下
我們運(yùn)行一個(gè)生產(chǎn)者進(jìn)程,按照500ms的速度生產(chǎn)消息。運(yùn)行三個(gè)consumer進(jìn)行消費(fèi),可以看到消息被均勻的推送到三個(gè)consumer上去。
總結(jié)
以上簡單的介紹了kafka的背景、安裝方法、使用場(chǎng)景。還簡單演示了如何使用.net來操作kafka。它可以當(dāng)作流式計(jì)算平臺(tái)來使用,也可以當(dāng)作傳統(tǒng)的消息隊(duì)列使用。它當(dāng)前非常流行,網(wǎng)上的資料也多如牛毛。官方也提供了簡單易用的.net sdk ,為.net 平臺(tái)集成kafka提供了便利。
關(guān)注我的公眾號(hào)一起玩轉(zhuǎn)技術(shù)
總結(jié)
以上是生活随笔為你收集整理的.Net Core 集成 Kafka的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C# 离线使用nuget
- 下一篇: asp.net ajax控件工具集 Au