kafka使用_Kafka介绍与使用
最近在研究kafka,覺得需要輸出點東西才能更好的吸收,遂總結與大家分享,話不多說。
一、先上思維導圖:
二、再上kafka整體架構圖:
2.1、Producer:消息生產者,就是向kafka broker發消息的客戶端。
2.2、Consumer :消息消費者,向kafka broker取消息的客戶端
2.3、Topic :每條發布到kafka集群的消息都有一個類別,這個類別被稱為主題Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存于一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存于何處)。
2.4、Consumer Group (CG):這是kafka用來實現一個topic消息的廣播(發給所有的consumer)和單播(發給任意一個consumer)的手段。一個topic可以有多個CG。topic的消息會復制(不是真的復制,是概念上的)到所有的CG,但每個partition只會把消息發給該CG中的一個consumer。如果需要實現廣播,只要每個consumer有一個獨立的CG就可以了。要實現單播只要所有的consumer在同一個CG。用CG還可以將consumer進行自由的分組而不需要多次發送消息到不同的topic。
2.5、Broker :一臺kafka服務器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。
2.6、Partition:為了實現擴展性,一個非常大的topic可以分布到多個broker(即服務器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的整體(多個partition間)的順序。
2.7、Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。當然the first offset就是00000000000.kafka。
三、部分小點請看導圖
四、Kafka集群部署 (提前備好ZK集群環境)
4.1、下載安裝包
http://kafka.apache.org/downloads 或者在linux中使用wget命令下載安裝包 wget http://mirrors.hust.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz |
4.2、解壓安裝包
| tar -zxvf /root/mysoftpackage/kafka_2.13-2.5.0.tgz -C /root/apps/ |
4.3、創建軟鏈接
如后續配置環境變量后,升級版本啥的不用再重新配置環境變量。 cd /root/apps/ ln -s kafka_2.13-2.5.0 kafka |
4.4、修改配置文件
cp /root/apps/kafka/config/server.properties?
/root/apps/kafka/config/server.properties.bak
vi /root/apps/kafka/config/server.properties
修改以下配置:
# Broker的全局唯一編號,集群內不重復即可 broker.id=0 #kafka運行日志存放的路徑 log.dirs=/root/kafkadata/logs #kafka依賴的ZK集群 zookeeper.connect=hdp-node-01:2181,hdp-node-02:2181,hdp-node-03:2181 |
vi /root/apps/kafka/config/producer.properties
修改以下配置:
| bootstrap.servers=hdp-node-01:9092,hdp-node-02:9092,hdp-node-03:9092 |
vi /root/apps/kafka/config/consumer.properties
修改以下配置:
| bootstrap.servers=hdp-node-01:9092,hdp-node-02:9092,hdp-node-03:9092 |
4.5、分發安裝包
scp -r /root/apps/kafka_2.13-2.5.0 hdp-node-02:/root/apps scp -r /root/apps/kafka_2.13-2.5.0 hdp-node-03:/root/apps 然后分別在各機器上創建軟連 cd /root/apps/ ln -s kafka_2.13-2.5.0 kafka |
4.6、再次修改配置文件(重要)
| 依次修改各服務器上配置文件的的broker.id,分別是0,1,2不得重復。 |
4.7、環境變量配置
vi /etc/profile export KAFKA_HOME=/root/apps/kafka export PATH=$PATH:$KAFKA_HOME/bin 刷新下系統環境變量 source /etc/profile |
4.8、守護進程啟動集群
依次在各節點上啟動kafka kafka-server-start.sh -daemon /root/apps/kafka/config/server.properties |
4.9、編寫腳本批量啟動集群kafka服務(kafkaBatchStart.sh)
| #!/bin/bash for i in 1 2 3 do ssh hdp-node-0$i "source /etc/profile;/root/apps/kafka/bin/kafka-server-start.sh -daemon /root/apps/kafka/config/server.properties" done |
五、基本管理操作Shell命令
5.1、查看當前服務器中的所有topic
kafka-topics.sh --list --zookeeper?hdp-node-01:2181 |
5.2、創建topic
replication-facto:副本數、partition:分區數 kafka-topics.sh --create --zookeeper hdp-node-01:2181 --replication-factor 3 --partitions 3 --topic goodsMq |
5.3、刪除topic
| kafka-topics.sh --delete --zookeeper hdp-node-01:2181 --topic goodsMq |
5.4、通過shell命令發送消息
kafka-console-producer.sh --broker-list hdp-node-01:9092 --topic goodsMq 或 kafka-console-producer.sh --bootstrap-server hdp-node-01:9092,hdp-node-02:9092 --topic goodsMq |
5.5、通過shell消費消息
--from-beginning:指定偏移量從頭開始消費 kafka-console-consumer.sh --bootstrap-server hdp-node-01:9092,hdp-node-02:9092 --topic goodsMq --from-beginning |
5.6、查看某個Topic的詳情
kafka-topics.sh --topic goodsMq --describe --zookeeper hdp-node-01:2181,hdp-node-02:2181 |
六、Java簡單代碼示例
6.1、引入pom依賴
| <dependency><groupId>org.apache.kafkagroupId><artifactId>kafka-clientsartifactId><version>2.5.0version>dependency |
6.2、消息生產者
public static void main(String[] args) {???//指定當前kafka?producer生產的數據的目的地???String?topicName?=?"orderMq"; // 讀取配置文件 Properties props = new Properties(); //指定kafka服務器地址 如果是集群可以指定多個 但是就算只指定一個他也會去集群環境下尋找其他的節點地址 props.setProperty("bootstrap.servers","hdp-node-01:9092,hdp-node-02:9092,hdp-node-03:9092"); //key序列化器 props.setProperty("key.serializer", StringSerializer.class.getName()); //value序列化器 props.setProperty("value.serializer",StringSerializer.class.getName()); //通過配置文件,創建生產者 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); //生產數據 for (int messageNo = 1; messageNo < 100; messageNo++) { //調用producer的send方法發送數據 ProducerRecord record = new ProducerRecord<String, String>(topicName, messageNo + "", "appid-" + UUID.randomUUID() + "-測試"); //發送記錄 producer.send(record); } producer.close(); System.out.println("done!!!");}6.3、消息消費者
public static void main(String[] args) throws Exception{ Properties properties = new Properties(); properties.setProperty("bootstrap.servers","hdp-node-01:9092,hdp-node-02:9092,hdp-node-03:9092"); properties.setProperty("key.deserializer", StringDeserializer.class.getName()); properties.setProperty("value.deserializer",StringDeserializer.class.getName()); properties.setProperty("group.id","test-consumer-group"); KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties); consumer.subscribe(Collections.singletonList("orderMq")); while (true){ ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord<String, String> record : poll) { System.out.println(record.key() + "=" + record.value()); } }}七、思考
7.1、Kafka為什么效率高吞吐量大
1)、硬盤的索引功能,二分查找法。 分區:找到相應的leader分區負責讀寫操作; 分段:根據文件segment的命名可以確認要查找的offset或timestamp在哪個文件中; 稀疏索引:快速確定要找的offset在哪個內存地址的附近。 2)、通過Partition實現并行處理 3)、I/O優化: 3.1)、磁盤的順序寫入(600MB/S) 3.2)、充分利用操作系統文件讀取緩存(PageCache) ? ? ? ?Kafka 的數據并不是實時的寫入硬盤,它充分利用了現代操作系統分頁存儲來利用內存提高 I/O 效率。再通過mmap(Memory Mapped Files)內存映射文件零拷貝的方式,它的工作原理是直接利用操作系統的 Page 來實現文件到物理內存的直接映射,完成映射之后你對物理內存的操作會被同步到硬盤上(操作系統在適當的時候)。通過 mmap,進程像讀寫硬盤一樣讀寫內存(當然是虛擬機內存),也不必關心內存的大小,有虛擬內存為我們兜底。 ? ? ? ?使用這種方式可以獲取很大的 I/O 提升,省去了用戶空間到內核空間復制的開銷。但也有一個很明顯的缺陷——不可靠,寫到 mmap 中的數據并沒有被真正的寫到硬盤,操作系統會在程序主動調用 Flush 的時候才把數據真正的寫到硬盤。 ? ? ? ?Kafka 提供了一個參數 producer.type 來控制是不是主動 Flush: 如果Kafka 寫入到 mmap 之后就立即 Flush,然后再返回 Producer 叫同步 (Sync)。如果 Kafka 寫入 mmap 之后立即返回 Producer 不調用 Flush 叫異步 (Async)。 3.3.)、基于 Sendfile 實現零拷貝(Zero Copy)方式讀取磁盤數據 傳統模式下,當需要對一個文件進行傳輸的時候,其具體流程細節如下: a、調用 Read 函數,文件數據被 Copy 到內核緩沖區。 b、Read 函數返回,文件數據從內核緩沖區 Copy 到用戶緩沖區 c、Write 函數調用,將文件數據從用戶緩沖區 Copy 到內核與 Socket 相關的緩沖區。 d、數據從 Socket 緩沖區 Copy 到相關協議引擎。 以上細節是傳統 Read/Write 方式進行網絡文件傳輸的方式,我們可以看到,在這個過程當中,文件數據實際上是經過了四次 Copy 操作: 硬盤—>內核 buf—>用戶 buf—>Socket 相關緩沖區—>協議引擎
3.4)、批量壓縮減少網絡IO損耗 ? ? ? 在很多情況下,系統的瓶頸不是 CPU 或磁盤,而是網絡 IO,對于需要在廣域網上的數據中心之間發送消息的數據流水線尤其如此。進行數據壓縮會消耗少量的 CPU 資源,不過對于 Kafka 而言,網絡 IO 更應該考慮:因為每個消息都壓縮,但是壓縮率相對很低,所以 Kafka 使用了批量壓縮,即將多個消息一起壓縮而不是單個消息壓縮。 ? ? ? ?Kafka 允許使用遞歸的消息集合,批量的消息可以通過壓縮的形式傳輸并且在日志中也可以保持壓縮格式,直到被消費者解壓縮。 ? ? ? ?kafka在壓縮數據時使用的壓縮算法,可選參數有:none、gzip、snappy。none即不壓縮,gzip,和snappy壓縮算法之間取舍的話gzip壓縮率比較高,系統cpu占用比較大,但是帶來的好處是,網絡帶寬占用少,snappy壓縮比沒有gzip高,cpu占用率不是很高,性能也還行,如果網絡帶寬比較緊張的話。可以選擇gzip,一般推薦snappy。 |
7.2、數據生產時的分發策略是什么
Producer客戶端負責消息的分發。 ? ? ? kafka集群中的任何一個broker都可以向producer提供metadata信息,這些metadata中包含"集群中存活的servers列表、partitions、leader列表"等信息; ? ? ?當producer獲取到metadata信息之后, producer將會和Topic下所有partition leader保持socket連接; ? ? ?消息由producer直接通過socket發送到broker,中間不會經過任何"路由層",事實上,消息被路由到哪個partition上由producer客戶端決定; ? ? ?比如可以采用"random"、"key-hash""輪詢"等,如果一個topic中有多個partitions,那么在producer端實現"消息均衡分發"是必要的。 ? ? ?在producer端的配置文件中,開發者可以指定partition路由的方式。 |
7.3、如何保證數據不丟失完全生產
Producer消息發送的應答機制。 設置發送數據是否需要服務端的反饋,有三個值0,1,all 0: producer不會等待broker發送ack? 1: 當leader接收到消息之后發送ack? all: 當所有的follower都同步消息成功后發送ack request.required.acks=0 |
7.4、Partition如何分布在不同的Broker上
//第i個partition int i = 0; //broker列表 list{ broker01, broker02, broker03} for(int i=0;i<5;i++){ brIndex = I % list.size; ? ? ? ?//第i個partition分布在hostName上 hostName = list.get(brIndex) } |
7.5、Broker如何保存數據其文件存儲機制是什么
1)、Kafka文件存儲基本結構 ? ? ? ?在Kafka文件存儲中,同一個topic下有多個不同partition,每個partition為一個目錄,partiton命名規則為topic名稱+有序序號,第一個partiton序號從0開始,序號最大值為partitions數量減1。 ? ? ? ?每個partion(目錄)相當于一個巨型文件被平均分配到多個大小相等segment(段)數據文件中。但每個段segment file消息數量不一定相等,這種特性方便old segment file快速被刪除。 ? ? ? ?默認保留7天的數據。 ? ? ? 每個partiton只需要支持順序讀寫就行了,segment文件生命周期由服務端配置參數決定。(什么時候創建,什么時候刪除)
? ? ?Segment file組成:由2大部分組成,分別為index file和data file,這兩個文件一一對應,成對出現,后綴".index"和“.log”分別表示為segment索引文件、數據文件。 ? ? ? ?Segment文件命名規則:partion全局的第一個segment從0開始,后續每個segment文件名為上一個segment文件最后一條消息的offset值。數值最大為64位long大小,19位數字字符長度,沒有數字用0填充。 ? ? ? 索引文件存儲大量元數據,數據文件存儲大量消息,索引文件中元數據指向對應數據文件中message的物理偏移地址。 ? ? ? ?其中以索引文件中元數據3,497為例,依次在數據文件中表示第3個message(在全局partiton表示第368772個message)、以及該消息的物理偏移地址為497。 3)、Kafka 查找message 讀取offset=368776的message,需要通過下面2個步驟查找。 第一步:查找segment file 00000000000000000000.index表示最開始的文件,起始偏移量(offset)為0。 00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1。 00000000000000737337.index的起始偏移量為737338=737337 + 1 其他后續文件依次類推。 以起始偏移量命名并排序這些文件,只要根據offset **二分查找**文件列表,就可以快速定位到具體文件。當offset=368776時定位到00000000000000368769.index和對應log文件。 第二步:通過segment file查找message 當offset=368776時,依次定位到00000000000000368769.index的元數據物理位置和00000000000000368769.log的物理偏移地址 然后再通過00000000000000368769.log順序查找直到offset=368776為止。 |
7.6、消費者如何標記消費狀態
通過偏移量來標識。 擴展偏移量與偏移量提交: ? ? ? 偏移量是一個自增長的ID,用來標識當前分區的哪些消息被消費過了,這個ID會保存在kafka的broker當中,而且消費者本地也會存儲一份。 ? ? ? ?因為每次消費每一條消息都要更新一下偏移量的話,難免會影響整個broker的吞吐量,所以一般這個偏移量在每次發生改動時,先由消費者本地改動,默認情況下,消費者每5秒鐘會提交一次改動的偏移量,這樣做雖然說吞吐量上來了,但是可能會出現重復消費的問題:? ? ? ? ?因為可能在下一次提交偏移量之前,消費者本地消費了一些消息,然后發生了分區再均衡(分區再均衡在下面有講) 那么就會出現一個問題。 ? ? ? ?假設上次提交的偏移量是 2000 在下一次提交之前,其實消費者又消費了500條數據,也就是說當前的偏移量應該是2500 但是這個2500只在消費者本地,也就是說,假設其他消費者去消費這個分區的時候拿到的偏移量是2000,那么又會從2000開始消費消息,那么2000到2500之間的消息又會被消費一遍,這就是重復消費的問題。 ? ? ??kafka對于這種問題也提供了解決方案:手動提交偏移量 可以關閉默認的自動提交(enable.auto.commit= false) 然后使用kafka提供的API來進行偏移量提交,kafka提供了兩種方式提交偏移量 :同步和異步 //同步提交偏移量kafkaConsumer.commitSync();//異步提交偏移量kafkaConsumer.commitAsync();? ? ? ?他們之間的區別在于,同步提交偏移量會等待服務器應答,并且遇到錯誤會嘗試重試,但是會一定程度上影響性能不過能確保偏移量到底提交成功與否;而異步提交的對于性能肯定是有提示的,但是弊端也就像我們剛剛所提到遇到錯誤沒辦法重試,因為可能在收到你這個結果的時候又提交過偏移量了,如果這時候重試,又會導致消息重復的問題了。 ? ? ? ? 其實,我們可以采用同步+異步的方式來保證提交的正確性以及服務器的性能。因為異步提交的話,如果出現問題但不是致命問題的話,可能下一次提交就不會出現這個問題了,所以有些異常是不需要解決的(可能單純的就是網絡抽風了呢? ) 所以,我們平時可以采用異步提交的方式,等到消費者中斷了(遇到了致命問題,或是強制中斷消費者) 的時候再使用同步提交(因為這次如果失敗了就沒有下次了,所以要讓他重試) 。 具體代碼: |
| ? ? ? 值得一提的是,在手動提交時kafka提供了你可以傳入具體的偏移量來完成提交,也就是指定偏移量提交,但是非常不建議手動指定,因為如果指定的偏移量小于分區所存儲的偏移量大小的話,那么會導致消息重復消費,如果指定的偏移量大于分區所存儲的偏移量的話,那么會導致消息丟失 |
7.7.消費者的分區再均衡及負載均衡策略是什么
分區再均衡也是kafka里面非常重要的一個概念。 首先操作在以下情況下會觸發分區再均衡(Rebalance)操作: a、組成員發生變更(新consumer加入組、已有consumer主動離開組或已有consumer崩潰了); b、訂閱主題數發生變更,如果你使用了正則表達式的方式進行訂閱,那么新建匹配正則表達式的topic就會觸發rebalance; c、訂閱主題的分區數發生變更。 當觸發Rebalance,kafka重新分配分區所有權 ? ? ?何為分區所有權?我們之前有提到過,消費者有一個消費者組的概念, 而且一個消費者組在消費一個主題時有以下規則,一個消費者可以消費多個分區,但是一個分區只能被一個消費者消費。如果我有分區 0、1、2 現在有消費者 A,B 那么 kafka可能會讓消費者A 消費 0,1 這兩個分區,那么 這時候我們就會說,消費者A 擁有分區 0、1的所有權。 ? ? ? 當觸發 Rebalance 的時候kafka會重新分配這個所有權,還是基于剛剛的比方,消費者A 擁有 0 和1 的所有權,消費者B 會有2的所有權。當消費者B離開kafka的時候 這時候 kafka會重新分配一下所有權,此時整個消費者組只有一個A 那么 0、1、2 三個分區的所有權都會屬于A ,同理如果這時候有消費者C進入這個消費者組,那么這時候kafka會確保每一個消費者都能消費一個分區。 ? ? ? ?當觸發Rebalance時,由于kafka正在分配所有權,會導致消費者不能消費,而且還會引發一個重復消費的問題, 當消費者還沒來得及提交偏移量時,分區所有權遭到了重新分配,那么這時候就會導致一個消息被多個消費者重復消費。 ? ? ? ?那么解決方案就是在消費者訂閱時,添加一個再均衡監聽器,也就是當kafka在做Rebalance操作前后,均會調用再均衡監聽器,那么這時候 我們可以在kafka Rebalance之前提交我們消費者最后處理的消息來解決這個問題。 拓展、Close(): ? ? ? 當我們不需要某個消費者繼續消費kafka當中的數據時,我們可以選擇調用Close方法來關閉它,在關閉之前 close方法會發送一個通知告訴kafka我這個消費者要退出了,那么 kafka就會準備Rebalance 而且如果是采用的自動提交偏移量,消費者自身也會在關閉自己之前提交最后所消費的偏移量 。當然即使沒有調用close方法,而是直接強制中斷了消費者的進程 kafka也會根據我們后面會說到的系統參數捕捉到消費者退出了。 |
7.8.如何保證消費者消費的數據有序
? ? ? 只能保證同一個分區下的數據是有序的,可以讓同一類的數據進入到同一個分區里。 ? ? ? 若想保證同一個主題的數據被消費時的順序和生產時的順序一致,那么只能設置一個分區。 |
總結
以上是生活随笔為你收集整理的kafka使用_Kafka介绍与使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: css 动态生成圆形区域内扇形个数_CS
- 下一篇: 移动短信回执怎么开通_移动短信回执业务内