日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka技术内幕(二)

發布時間:2024/4/13 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka技术内幕(二) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Kafka服務器

協議

時間輪

Kafka中存在大量的延時操作, 比如延時生產、延時拉取和延時刪除等。Kafka并沒有使用JDK自帶的Timer 或DelayQueue來實現延時的功能,而是基于時間輪的概念自定義實現了一個用于延時功能的定時器(SystemTimer)。JDK中Timer和DelayQueue的插入和刪除操作的平均時間復雜度為O(nlogn)并不能滿足Kafka的高性能要求, 而基于時間輪可以將插入和刪除操作的時間復雜度都降為0(1) 。時間輪的應用并非Kafka獨有, 其應用場景還有很多,在Netty、Akka、Quartz、ZooKeeper等組件中都存在時間輪的蹤影。

Kafka中的時間輪(TimingWheel)是一個存儲定時任務的環形隊列, 底層采用數組實現, 數組中的每個元素可以存放一個定時任務列表(TimerTaskList)。TimerTaskList是一個環形的雙向鏈表,鏈表中的每一項表示的都是定時任務項(TimerTaskEntry), 其中封裝了真正的定時任務(TimerTask)。

時間輪由多個時間格組成, 每個時間格代表當前時間輪的基本時間跨度(tickMs)。時間輪的時間格個數是固定的,可用wheelSize來表示, 那么整個時間輪的總體時間跨度(interval)可以通過公式tickMs x wheelSize計算得出。 時間輪還有 一個表盤指針(currentTime) , 用來表示時間輪當前所處的時間,currentTime 是tickMs的整數倍。currentTime可以將整個時間輪劃分為到期部分和未到期部分, currentTime當前指向的時間格也屬于到期部分, 表示剛好到期, 需要處理此時間格所對應的TimerTaskList中的所有任務。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-xEsOPA3x-1617519680355)(C:\Users\lihj\AppData\Roaming\Typora\typora-user-images\image-20210206155940293.png)]

時間輪結構

整個時間輪的總體跨度是不變的, 隨著指針currenTtime的不斷推進, 當前時間輪所能處理的時間段也在不斷后移, 總體時間范圍在 currentT ime和 CurreentTime + intevral之間。新進任務的時間格為 currentTime + 延遲時間

對于超過 interval 的時間,引入了層級概念。每一層的wheelSize都是固定的(20),tickMs為上一層的interval

多層時間輪

注意:

1、TimingWheel 在創建的時候以當前系統時間為第一層時間輪的起始時間(startMs),這里的當前系統時間并沒有簡單地調用System.currentTimeMillis(), 而是調用了Time.SYSTEM.hiResClockMs, 這是因為currentTimeMillis()方法的時間精度依賴于操
作系統的具體實現, 有些操作系統下并不能達到毫秒級的精度, 而Time.SYSTEM.hiResC!ockMs實質上采用了System.nanoTime()/1_ 000 000來將精度調整到毫秒級。

2、TimingWheel中的每個雙向環形鏈表TimerTaskList都會有一個哨兵節點(sentinel),引入哨兵節點可以簡化邊界條件。

3、 除了第一層時間輪, 其余高層時間輪的起始時間(startMs)都設置為創建此層時間輪時前面第一輪的currentTime 。每一層的currentTime都必須是tickMs的整數倍, 如果不滿足則會將currentTime修剪為tickMs 的整數倍, 以此與時間輪中的時間格的到期時間范圍對應起來。修剪方法為: currentTime = startMs - (startMs % tickMs) 。currentTime會隨著時間推移而推進, 但不會改變為tickMs 的整數倍的既定事實。若某一時刻的時
間為timeMs, 那么此時時間輪的currentTime = timeMs - (timeMs % tickMs), 時間每推進一次, 每個層級的時間輪的currentTime都會依據此公式執行推進

4、 Kafka中的定時器只需持有TimingWheel 的第一層時間輪的引用, 并不會直接持有其他高層的時間輪, 但每一層時間輪都會有一個引用(overflowWheel)指向更高一層的應用, 以此層級調用可以實現定時器間接持有各個層級時間輪的引用。

Kafka中的定時器借了JDK中的DelayQueue來協助推進時間輪。具體做法是對于每個使用到的TimerTaskList都加入DelayQueue, “每個用到的TimerTaskList"特指非哨兵節點的定時任務項TimerTaskEntry對應的TimerTaskList 。DelayQueue 會根據TimerTaskList對應的超時時間expiration來排序,最短expiration 的TimerTaskList會被排在DelayQueue 的隊頭。Kafka中會有一個線程來獲取DelayQueue 中到期的任務列表, 這個線程所對應的名稱叫作”ExpiredOperationReaper", 可以直譯為“ 過期操作收割機” 。當“ 收割機” 線程獲取DelayQueue 中超時的任務列表TimerTaskList之后, 既可以根據TimerTaskList 的expiration來推進時間輪的時間, 也可以就獲取的TimerTaskList 執行相應的操作, 對里面的TimerTaskEntry該執行過期操作的就執行過期操作,該降級時間輪的就降級時間輪。

源碼解析參考:https://blog.csdn.net/demon7552003/article/details/92053615

延時操作

控制器

在Kafka 集群中會有一個或多個broker, 其中有一個broker 會被選舉為控制器(Kafka Control ler), 它負責管理整個集群中所有 分區 和副本的狀態。 當某個分區的leaeder副本出現故障時, 由控制器負責為該分區選舉新的leader副本。當檢測到某個分區的ISR集合發生變化時,由控制器負責通知所有broker更新其元數據信息。當使用kafka-to pci s.sh腳本為某個tpoic增加分區數量時, 同樣還是由控制器負責分區的重新分配。

控制器的選舉及異常恢復

Kafka中的控制器選舉工作依賴于ZooKeeper, 成功競選為控制器的broker會在ZooKeeper中創建/controller這個臨時(EPHEMERAL)節點。

{"version": 1, "broker id": 0, "timestamp":"1529210278988"}

其中version在目前版本中固定為1, brokerid表示成為控制器的broker的id編號,timestamp表示競選成為控制器時的時間戳。

ZooKeeper 中還有一個與控制器有關的/controller_epoch節點, 這個節點是持久(PERSISTENT)節點, 節點中存放的是一個整型的controller _epoch值。用于記錄控制器發生變更的次數, 即記錄當前的控制器是第幾代控制器, 也可以稱之為“控制器的紀元” 。初始值為1, 即集群中第一個控制器的紀元為1, 當控制器發生變更時,每選出一個新的控制器就將該字段值加1。每個和控制器交互的請求都會攜帶controller_epoch這個字段,用于判斷請求是否有效。 Kafka通過controller_epoch來保證控制器的唯一性, 進而保證相關操作的一致性。

優雅關閉

使用 kill -s TERM $PIDS或 kill -15 $PIDS的方式來關閉進程, 注意千萬不要使用kill -9的方式。

分區leader的選舉

客戶端

分區分配策略

Kafka提供了消費者客戶端參數partition.assignment.strategy來設置消費者與訂閱主題之間的分區分配策略。默認情況下, 此參數的值為org.apache.kafka.clients. consumer.RangeAssignor, 即采用RangeAssignor分配策略。除此之外, Kafka還提供了另外兩種分配策略: RoundRobinAssignor和StickyAssignor。消費者客戶端參數partition.assignment.strategy可以配置多個分配策略, 彼此之間以逗號分隔。

RangeAssignor分配策略

RangeAssignor 分配策略的原理是按照消費者總數和分區總數進行整除運算來獲得一個跨度, 然后將分區按照跨度進行平均分配, 以保證分區盡可能均勻地分配給所有的消費者。對于每一個主題,RangeAssignor策略會將消費組內所有訂閱這個主題的消費者按照名稱的字典序排序,然后為每個消費者劃分固定的分區范圍,如果不夠平均分配,那么字典序靠前的消費者會被多分配一個分區。

缺點:如果訂閱了多個主題,則靠前的消費者,每個主題都多一個分區,多個主題加起來,可能就會多很多個分組。

RoundRobinAssignor分配策略

RoundRobinAssignor 分配策略的原理是將消費組內所有消費者及消費者訂閱的所有主題的分區按照字典序排序,然后通過輪詢方式逐個將分區依次分配給每個消費者。RoundRobinAssignor分配策略對應的partition.assignment.strategy參數值為org.apache.kafka.clients.consumer.RoundRobinAssignor。

如果同一個消費組內的消費者訂閱的信息是不相同的(可能是每個主題的分區個數不一致), 那么在執行分區分配的時候就不是完全的輪詢分配, 有可能導致分區分配得不均勻。

StickyAssignor分配策略

“sticky"這個單詞可以翻譯為“ 黏性的”, Kafka從0.11.x版本開始引入這種分配策略, 它主要有兩個目的:
(1)分區的分配要盡可能均勻。

(2)分區的分配盡可能與上次分配的保待相同。

當兩者發生沖突時, 第一個目標優先于第二個目標。鑒于這兩個目標, StickyAssignor分配略的具體實現要比RangeAssignor和BoundRobinAssignor這兩種分配策略要復雜得多。

自定義分區分配策略

可以自定義分配策略來實現更多可選的功能。自定義的分配策略必須要實現org.apache.kafka.clients.consumer.intemals.PartitionAssignor 接口。

PartitionAssignor接口中定義了兩個內部類:Subscription和Assignment。

Subscription類用來表示消費者的訂閱信息, 類中有兩個屬性:topics和userData,分別表示消費者的訂閱主題列表和用戶自定義信息。PartitionAssignor接口通過subscription()方法來設置消費者自身相關的Subscription信息。

Assignment類, 它用來表示分配結果信息, 類中也有兩個屬性: partitions和userData,分別表示所分配到的分區集合和用戶自定義的數據。

消費者協調器和組協調器

如果消費者客戶端中配置了兩個分配策略,那么以哪個為準呢?如果有多個消費者,彼此所配置的分配策略并不完全相同,那么以哪個為準?多個消費者之間的分區分配是需要協同的,那么這個協同的過程又是怎樣的呢?這一切都是交由消費者協調器( ConsumerCoordinator )和組協調器
(GroupCoordinator)來完成的,它們之間使用一套組協調協議進行交互。

舊版消費者客戶端的問題

zookeeper問題:

(1)羊群效應(Herd Effect) : 所謂的羊群效應是指ZooKeeper中一個被監聽的節點變化,大量的Watcher 通知被發送到客戶端,導致在通知期間的其他操作延遲, 也有可能發生類似死鎖的情況。

(2)腦裂問題(Split Brain) : 消費者進行再均衡操作時每個消費者都與ZooKeeper進行通信以判斷消費者或broker變化的情況,由于ZooKeeper本身的特性, 可能導致在同一時刻各個消費者獲取的狀態不一致,這樣會導致異常問題發生。

再均衡的原理

新版的消費者客戶端對此進行了重新設計,將全部消費組分成多個子集, 每個消費組的子集在服務端對應一個GroupCoordinator對其進行管理,GroupCoordinator是Kafka服務端中用于管理消費組的組件。而消費者客戶端中的ConsumerCoordinator組件負責與GroupCoordinator進行交互。

ConsumerCoordinator與GroupCoordinator之間最重要的職責就是負責執行消費者再均衡的操作,包括前面提及的分區分配的工作也是在再均衡期間完成的。就目前而言, 一共有如下幾種情形會觸發再均衡的操作:

? 有新的消費者加入消費組。

? 有消費者宕機下線。消費者并不一定需要真正下線, 例如遇到長時間的GC、網絡延遲導致消費者長時間未向GroupCoordinator發送心跳等情況時,GroupCoordinator會認為消費者已經下線。

? 有消費者主動退出消費組(發送LeaveGroupRequest 請求)。比如客戶端調用了unsubscrible()方法取消對某些主題的訂閱。

? 消費組所對應的GroupCoorinator節點發生了變更

? 消費組內所訂閱的任一主題或者主題的分區數量發生變化

當有消費者加入消費組時,消費者、消費組及組協調器之間會經歷一下幾個階段。

第一階段(FIND_COORDINATOR)

消費者需要確定它所屬的消費組對應的GroupCoordinator所在的broker,并創建與該broker相互通信的網絡連接。

第二階段(JOIN_GROUP)

在成功找到消費組所對應的GroupCoordinator 之后就進入加入消費組的階段,在此階段的消費者會向GroupCoordinator發送JoinGroupRequest請求,并處理響應。

第三階段( SYNC GROUP)

leader 消費者根據在第二階段中選舉出來的分區分配策略來實施具體的分區分配,在此之后需要將分配的方案同步給各個消費者,此時leader 消費者并不是直接和其余的普通消費者同步分配方案,而是通過GroupCoordinator 這個“中間人”來負責轉發同步分配方案的。

第四階段( HEARTBEAT)

消費者通過向GroupCoordinator 發送心跳來維持它們與消費組的從屬關系,以及它們對分區的所有權關系。只要消費者以正常的時間間隔發送心跳, 就被認為是活躍的,說明它還在讀取分區中的消息。

心跳線程是一個獨立的線程,可以在輪詢消息的空檔發送心跳。如果消費者停止發送心跳的時間足夠長,則整個會話就被判定為過期, GroupCoordinator 也會認為這個消費者己經死亡,就會觸發一次再均衡行為。

__consumer _offsets 剖析

位移提交的內容最終會保存到Kafka 的內部主題__consumer_offsets 中。可以通過kafka-consoIe-consumer. sh 腳本來查看__consumer_offsets 中的內容,不過要設定formatter 參數為kafka.coordinator.group.GroupMetadataManager$0ffsetsMessageForrnatter 。

bin/kafka-console-consumer.sh --boOtstrap-server localhost:9092 --topic _consumer_offsets --partition 20 --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'

事務

消息傳輸保障

一般而言,消息中間件的消息傳輸保障有3 個層級,分別如下。

(1) at most once :至多一次。消息可能會丟失,但絕對不會重復傳輸。

(2) at least once : 最少一次。消息絕不會丟失,但可能會重復傳輸。

(3) exactly once :恰好一次。每條消息肯定會被傳輸一次且僅傳輸一次。

Kafka 從0.11.0.0 版本開始引入了冪等和事務這兩個特性,以此來實現EOS ( exactly once semantics ,精確一次處理語義) 。

幕等

所謂的幕等,簡單地說就是對接口的多次調用所產生的結果和調用一次是一致的。

開啟幕等性功能的方式很簡單,只需要顯式地將生產者客戶端參數enable.idempotence設置為true 即可(這個參數的默認值為false ),

不過如果要確保軍等性功能正常,還需要確保生產者客戶端的retries 、acks 、max.in flight.requests.per.connection 這幾個參數不被配置錯。實際上在使用幕等性功能的時候,用戶完全可以不用配置(也不建議配置)這幾個參數。

為了實現生產者的幕等性, Kafka 為此引入了producer id ( 以下簡稱PID )和序列號( sequence number )這兩個概念,引入序列號來實現幕等也只是針對每一對<PID , 分區>而言的,也就是說, Kafka 的冪等只能保證單個生產者會話( session )中單分區的冪等

事務

冪等性并不能跨多個分區運作,而事務l可以彌補這個缺陷。事務可以保證對多個分區寫入操作的原子性。Kafka 中的事務可以使應用程序將消費消息、生產消息、提交消費位移當作原子操作來處理,同時成功或失敗,即使該生產或消費會跨多個分區。

為了實現事務,應用程序必須提供唯一的transactionalId ,這個transactionalId 通過客戶端參數transactional.id 來顯式設置,事務要求生產者開啟幕等特性,因此通過將transactional.id 參數設置為非空從而開啟事務特性的同時需要將enable.idempotence 設置為true ( 如果未顯式設置, 則KafkaProducer 默認會將它的值設置為true )

可靠性

副本

? 副本是相對于分區而言的,即副本是特定分區的副本

? 一個分區中包含一個或多個副本,其中一個為leader副本,其余為follower副本,各個副本位千不同的broker節點中。只有laeder副本對外提供服務,follower副本只負責數據同步。

? 分區中的所有副本統稱為AR, 而ISR是指與leadre 副本保持同步狀態的副本集合,當然leader副本本身也是這個集合中的一員。

? LEO標識每個分區中最后一條消息的下一個位置,分區的每個副本都有自己的LEO,ISR中最小的LEO即為HW, 俗稱高水位,消費者只能拉取到HW之前的消息。

失效副本

ISR的伸縮

LEO與HW

本地副本(Local Replica)和遠程副本( Remote Replica)本地副本是指對應的Log分配在當前的broker節點上,遠程副本是指對應的Log分配在其他的broker節點上。

Kafka監控

總結

以上是生活随笔為你收集整理的kafka技术内幕(二)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。