「中间件系列一」kafka消息中间件
kafka消息中間件
一:為什么需要消息中間件?
1.解耦:允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
2.冗余:
消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
3.擴展性:
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。
4.靈活性 & 峰值處理能力:
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
5.可恢復性:
系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。
6.順序保證:
在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數據會按照特定的順序來處理。(Kafka 保證一個 Partition 內的消息的有序性)
7.緩沖:
有助于控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。
8.異步通信:
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
Kafka的吞吐量幾乎是行業里最優秀的,在常規的機器配置下,一臺機器可以達到每秒十幾萬的QPS,可謂是消息中間件中的頂尖水平。所以一般選型我們都不用其他的,用kafka做消息中間件就夠了。(當然如果你topic特別多,并發特別高,阿里的RocketMQ更合適,后面找時間寫一篇關于兩者的區別。kafka偏吞吐量高的應用,因為其采用的socket長連接方式,如果消費者特別多其實會有空轉現象。有人說這個時候擴容啊?但是擴了很多機器都沒用是為什么?因為partition沒有變多,其實這個時候需要擴大都是partition的數量。好具體我們后面解釋)
kafaka官網:
https://kafka.apache.org書簽:Apache Kafka
Kafka是最初由Linkedin公司開發,是一個分布式、分區的、多副本的、多訂閱者,基于zookeeper協調的分布式日志系統(也可以當做MQ系統),常見可以用于web/nginx日志、訪問日志,消息服務等等,Linkedin于2010年貢獻給了Apache基金會并成為頂級開源項目。
主要應用場景是:日志收集系統和消息系統(最為消息又有狀態通知或者是數據傳輸的功能。但是盡量用第一種,不要設計太大的消息,不然存在堵死網絡的風險。包括redis的key設計也一樣,不要設計超過100kb大小的內容)。
Kafka主要設計目標如下:
以時間復雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間的訪問性能。
高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒100K條消息的傳輸。
支持Kafka Server間的消息分區,及分布式消費,同時保證每個partition內的消息順序傳輸。
同時支持離線數據處理和實時數據處理。
二、kafka 架構
?
相關概念:
1.producer:
消息生產者,發布消息到 kafka 集群的終端或服務。
2.broker:
kafka 集群中包含的服務器。
3.topic:
每條發布到 kafka 集群的消息屬于的類別,即 kafka 是面向 topic 的。
4.partition:
partition 是物理上的概念,每個 topic 包含一個或多個 partition。kafka 分配的單位是 partition。
5.consumer:
從 kafka 集群中消費消息的終端或服務,每個consumer都會分配一個partition去消費,除非有consumer發生了改變,kafka會根據算法去重新負載分配。(注:這里可能會產生負載風暴(rebalance風暴),所以不要連接太多消費者或者說要控制負載的頻率)
?
6.Consumer group:
high-level consumer API 中,每個 consumer 都屬于一個 consumer group,每條消息只能被 consumer group 中的一個 Consumer 消費(單播),但可以被多個 consumer group 消費(多播)。
7.replica:
partition 的副本,保障 partition 的高可用。
8.leader:
replica 中的一個角色, producer 和 consumer 只跟 leader 交互。
9.follower:
replica 中的一個角色,從 leader 中復制數據。
10.controller:
kafka 集群中的其中一個服務器,用來進行 leader election 以及 各種 failover。
12.zookeeper:
kafka 通過 zookeeper 來存儲集群的 meta 信息。
三、producer 發布消息
3.1 寫入方式
producer 采用 push 模式將消息發布到 broker,每條消息都被 append 到 patition 中,屬于連續存儲,連續寫到磁盤的同一個區域,而且消息記錄不會被刪除。
因為硬盤是機械結構,每次讀寫都會尋址,寫入,其中尋址是一個“機械動作”,它是最耗時的。所以硬盤最“討厭”隨機I/O,最喜歡順序I/O。為了提高讀寫硬盤的速度,Kafka就是使用順序I/O。
每條消息都被append到該Partition中,屬于順序寫磁盤,因此效率非常高。但是,有人把kafka當作數據庫用,這是不理智的。因為每次查數據都要去scan這個連續存儲區。
?
對于傳統的message queue而言,一般會刪除已經被消費的消息,而Kafka是不會刪除數據的,它會把所有的數據都保留下來,每個消費者(Consumer)對每個Topic都有一個offset用來表示讀取到了第幾條數據。
?
即便是順序寫入硬盤,硬盤的訪問速度還是不可能追上內存。所以Kafka的數據并不是實時的寫入硬盤,它充分利用了現代操作系統分頁存儲來利用內存提高I/O效率。
在Linux Kernal 2.2之后出現了一種叫做“零拷貝(zero-copy)”系統調用機制,就是跳過“用戶緩沖區”的拷貝,建立一個磁盤空間和內存空間的直接映射,數據不再復制到“用戶態緩沖區”系統上下文切換減少2次,可以提升一倍性能。(redis的rdb方式也與這個類似)
?
通過mmap,進程像讀寫硬盤一樣讀寫內存(當然是虛擬機內存)。使用這種方式可以獲取很大的I/O提升,省去了用戶空間到內核空間復制的開銷(調用文件的read會把數據先放到內核空間的內存中,然后再復制到用戶空間的內存中。)
?
消費者(讀取數據)
試想一下,一個Web Server傳送一個靜態文件,如何優化?答案是zero copy。傳統模式下我們從硬盤讀取一個文件是這樣的。
?
先復制到內核空間(read是系統調用,放到了DMA,所以用內核空間),然后復制到用戶空間(1、2);從用戶空間重新復制到內核空間(你用的socket是系統調用,所以它也有自己的內核空間),最后發送給網卡(3、4)。
?
Zero Copy中直接從內核空間(DMA的)到內核空間(Socket的),然后發送網卡。這個技術非常普遍,Nginx也是用的這種技術。
實際上,Kafka把所有的消息都存放在一個一個的文件中,當消費者需要數據的時候Kafka直接把“文件”發送給消費者。當不需要把整個文件發出去的時候,Kafka通過調用Zero Copy的sendfile這個函數,這個函數包括:
out_fd作為輸出(一般及時socket的句柄)
in_fd作為輸入文件句柄
off_t表示in_fd的偏移(從哪里開始讀取)
size_t表示讀取多少個
3.2 消息路由
producer 發送消息到 broker 時,會根據分區算法選擇將其存儲到哪一個 partition。其路由機制為:
指定了 patition,則直接使用;
未指定 patition 但指定 key,通過對 key 的 value 進行hash 選出一個 patition
patition 和 key 都未指定,使用輪詢選出一個 patition。
附上 java 客戶端分區源碼,一目了然:
//創建消息實例
public?ProducerRecord(String?topic,?Integer?partition,?Long?timestamp,?K?key,?V?value)?{
?????if?(topic ==?null)
??????????throw?new?IllegalArgumentException("Topic cannot be null");
?????if?(timestamp !=?null?&&?timestamp <?0)
??????????throw?new?IllegalArgumentException("Invalid timestamp "?+?timestamp);
?????this.topic =?topic;
?????this.partition =?partition;
?????this.key =?key;
?????this.value =?value;
?????this.timestamp =?timestamp;
}
//計算 patition,如果指定了 patition 則直接使用,否則使用 key 計算
private?int?partition(ProducerRecord<K,?V>?record,?byte[]?serializedKey ,?byte[]?serializedValue,?Cluster?cluster)?{
?????Integer?partition =?record.partition();
?????if?(partition !=?null)?{
??????????List<PartitionInfo>?partitions =?cluster.partitionsForTopic(record.topic());
??????????int?lastPartition =?partitions.size()?-?1;
??????????if?(partition <?0?||?partition >?lastPartition)?{
???????????????throw?new?IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].",?partition,?lastPartition));
??????????}
??????????return?partition;
?????}
?????return?this.partitioner.partition(record.topic(),?record.key(),?serializedKey,?record.value(),?serializedValue,?cluster);
}
// 使用 key 選取 patition
public?int?partition(String?topic,?Object?key,?byte[]?keyBytes,?Object?value,?byte[]?valueBytes,?Cluster?cluster)?{
?????List<PartitionInfo>?partitions =?cluster.partitionsForTopic(topic);
?????int?numPartitions =?partitions.size();
?????if?(keyBytes ==?null)?{
??????????int?nextValue =?counter.getAndIncrement();
??????????List<PartitionInfo>?availablePartitions =?cluster.availablePartitionsForTopic(topic);
??????????if?(availablePartitions.size()?>?0)?{
???????????????int?part =?DefaultPartitioner.toPositive(nextValue)?%?availablePartitions.size();
???????????????return?availablePartitions.get(part).partition();
??????????}?else?{
???????????????return?DefaultPartitioner.toPositive(nextValue)?%?numPartitions;
??????????}
?????}?else?{
??????????//對 keyBytes 進行 hash 選出一個 patition
??????????return?DefaultPartitioner.toPositive(Utils.murmur2(keyBytes))?%?numPartitions;
?????}
?????}
Java
3.3 寫入流程
?producer 寫入消息序列圖如下所示:
?
producer 先從 zookeeper 的 "/brokers/.../state" 節點找到該 partition 的 leader
producer 將消息發送給該 leader
leader 將消息寫入本地 log
followers 從 leader pull 消息,寫入本地 log 后 leader 發送 ACK
leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向producer 發送 ACK
kafka發送者producer一般采用異步去發送,但是由于kafaka很快所以我們可以采用批量推送的形式去推,不要每次都去,不然應用的線程就會暴增,可能會導致應用掛掉。
3.4 消息的順序消費、消費失敗處理
多個patrtion,多個consumer其實很難直接保障消息的消費順序。
比如說我們建了一個 topic,有三個 partition。生產者在寫的時候,其實可以指定一個 key,比如說我們指定了某個訂單 id 作為 key,那么這個訂單相關的數據,一定會被分發到同一個 partition 中去,而且這個 partition 中的數據一定是有順序的。 ?
消費者從 partition 中取出來數據的時候,也一定是有順序的。到這里,順序還是 ok 的,沒有錯亂。接著,我們在消費者里可能會搞多個線程來并發處理消息。因為如果消費者是單線程消費處理,而處理比較耗時的話,比如處理一條消息耗時幾十 ms,那么 1 秒鐘只能處理幾十條消息,這吞吐量太低了。而多個線程并發跑的話,順序可能就亂掉了。
?
解決方案
一個 topic,一個 partition,一個 consumer,內部單線程消費,單線程吞吐量太低,一般不會用這個。
寫 N 個內存 queue,具有相同 key 的數據都到同一個內存 queue;然后對于 N 個線程,每個線程分別消費一個內存 queue 即可,這樣就能保證順序性。
?
Kafka數據傳輸的事務特點
at most once:最多一次,這個和JMS中"非持久化"消息類似,發送一次,無論成敗,將不會重發。消費者fetch消息,然后保存offset,然后處理消息;當client保存offset之后,但是在消息處理過程中出現了異常,導致部分消息未能繼續處理。那么此后"未處理"的消息將不能被fetch到,這就是"at most once"。
at least once:消息至少發送一次,如果消息未能接受成功,可能會重發,直到接收成功。消費者fetch消息,然后處理消息,然后保存offset。如果消息處理成功之后,但是在保存offset階段zookeeper異常導致保存操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的消息,這就是"at least once",原因offset沒有及時的提交給zookeeper,zookeeper恢復正常還是之前offset狀態。
exactly once:消息只會發送一次。kafka中并沒有嚴格的去實現(基于2階段提交),我們認為這種策略在kafka中是沒有必要的。
通常情況下"at-least-once"是我們首選。
盡管 Kafka 的跨邊界事件發布機制顯得相當優雅,但畢竟這是一個分布式系統,因此系統可能會有很多錯誤。我們將關注也許是最常見的惱人問題:消費者可能無法成功處理其消費的消息。
?
重試主題:流行的解決方案
消費者嘗試消費主要主題中的一條消息。
如果未能正確消費該消息,則消費者將消息發布到第一個重試主題,然后提交消息的偏移量,以便繼續處理下一條消息。
訂閱重試主題的是重試消費者,它包含與主消費者相同的邏輯。該消費者在消息消費嘗試之間引入了短暫的延遲。如果這個消費者也無法消費該消息,則會將該消息發布到另一個重試主題,并提交該消息的偏移量。
這一過程繼續,并增加了一些重試主題和重試消費者,每個重試的延遲越來越多(用作退避策略)。最后,在最終重試消費者無法處理某條消息后,該消息將發布到一個死信隊列(Dead Letter Queue,DLQ)中,工程團隊將在該隊列中對其進行手動分類。
?
當然這個模式仍然有些問題,必須會有消息的有序性和可恢復錯誤的情況:詳情可觀看這個文檔
https://blog.csdn.net/zhipengfang/article/details/117455550?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_title~default-1.fixedcolumn&spm=1001.2101.3001.4242.2書簽:兄弟!kafka的重試機制,你可能用錯了~_方志朋的博客-CSDN博客
3.5 ?Consumer 消費消息
consumer 采用 pull 模式從 broker 中讀取數據。
push 模式很難適應消費速率不同的消費者,因為消息發送速率是由 broker 決定的。它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成 consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而 pull 模式則可以根據 consumer 的消費能力以適當的速率消費消息。
對于 Kafka 而言,pull 模式更合適,它可簡化 broker 的設計,consumer 可自主控制消費消息的速率,同時 consumer 可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。
consumer delivery guarantee
如果將 consumer 設置為 autocommit,consumer 一旦讀到數據立即自動 commit。如果只討論這一讀取消息的過程,那 Kafka 確保了 Exactly once。
但實際使用中應用程序并非在 consumer 讀取完數據就結束了,而是要進行進一步處理,而數據處理與 commit 的順序在很大程度上決定了consumer delivery guarantee:
1.讀完消息先 commit 再處理消息。
這種模式下,如果 consumer 在 commit 后還沒來得及處理消息就 crash 了,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息,這就對應于 At most once
2.讀完消息先處理再 commit。
這種模式下,如果在處理完消息之后 commit 之前 consumer crash 了,下次重新開始工作時還會處理剛剛未 commit 的消息,實際上該消息已經被處理過了。這就對應于 At least once。
3.如果一定要做到 Exactly once,就需要協調 offset 和實際操作的輸出。
精典的做法是引入兩階段提交。如果能讓 offset 和操作輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,因為許多輸出系統可能不支持兩階段提交。比如,consumer 拿到數據后可能把數據放到 HDFS,如果把最新的 offset 和數據本身一起寫到 HDFS,那就可以保證數據的輸出和 offset 的更新要么都完成,要么都不完成,間接實現 Exactly once。(目前就 high-level API而言,offset 是存于Zookeeper 中的,無法存于HDFS,而SimpleConsuemr API的 offset 是由自己去維護的,可以將之存于 HDFS 中)
總之,Kafka 默認保證 At least once,并且允許通過設置 producer 異步提交來實現 At most once(見文章《kafka consumer防止數據丟失》)。而 Exactly once 要求與外部存儲系統協作,幸運的是 kafka 提供的 offset 可以非常直接非常容易得使用這種方式。
更多關于 kafka 傳輸語義的信息請參考《Message Delivery Semantics》。
?Consumer rebalance
當有 consumer 加入或退出、以及 partition 的改變(如 broker 加入或退出)時會觸發 rebalance。consumer rebalance算法如下:
. 將目標 topic 下的所有 partirtion 排序,存于PT
. 對某 consumer group 下所有 consumer 排序,存于 CG,第 i 個consumer 記為 Ci
. N=size(PT)/size(CG),向上取整
. 解除 Ci 對原來分配的 partition 的消費權(i從0開始)
. 將第i*N到(i+1)*N-1個 partition 分配給 Ci
在 0.8.*版本,每個 consumer 都只負責調整自己所消費的 partition,為了保證整個consumer group 的一致性,當一個 consumer 觸發了 rebalance 時,該 consumer group 內的其它所有其它 consumer 也應該同時觸發 rebalance。這會導致以下幾個問題:
1.Herd effect
任何 broker 或者 consumer 的增減都會觸發所有的 consumer 的 rebalance
2.Split Brain
每個 consumer 分別單獨通過 zookeeper 判斷哪些 broker 和 consumer 宕機了,那么不同 consumer 在同一時刻從 zookeeper 看到的 view 就可能不一樣,這是由 zookeeper 的特性決定的,這就會造成不正確的 reblance 嘗試。
調整結果不可控
所有的 consumer 都并不知道其它 consumer 的 rebalance 是否成功,這可能會導致 kafka 工作在一個不正確的狀態。
基于以上問題,kafka 設計者考慮在0.9.*版本開始使用中心 coordinator 來控制 consumer rebalance,然后又從簡便性和驗證要求兩方面考慮,計劃在 consumer 客戶端實現分配方案。(見文章《Kafka Detailed Consumer Coordinator Design》和《Kafka Client-side Assignment Proposal》),此處不再贅述。
部分參考:
kafka學習筆記:知識點整理_wr_java的博客-CSDN博客一、為什么需要消息系統1.解耦: 允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。2.冗余: 消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。3.擴展性: 因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。4.靈活性https://blog.csdn.net/wr_java/article/details/107939092?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522163862461916780271953494%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=163862461916780271953494&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~blog~first_rank_v2~rank_v29-1-107939092.pc_v2_rank_blog_default&utm_term=kafak&spm=1018.2226.3001.4450
總結
以上是生活随笔為你收集整理的「中间件系列一」kafka消息中间件的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 后端监控架构体系
- 下一篇: 「中间件系列二」redis缓存