漫游Kafka设计篇之Producer和Consumer
生活随笔
收集整理的這篇文章主要介紹了
漫游Kafka设计篇之Producer和Consumer
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
原文地址:http://blog.csdn.net/honglei915/article/details/37564871
Kafka視頻教程同步首發(fā),歡迎觀看!
Kafka Producer
消息發(fā)送
Kafka Consumer
Kafa consumer消費(fèi)消息時,向broker發(fā)出"fetch"請求去消費(fèi)特定分區(qū)的消息。consumer指定消息在日志中的偏移量(offset),就可以消費(fèi)從這個位置開始的消息。customer擁有了offset的控制權(quán),可以向后回滾去重新消費(fèi)之前的消息,這是很有意義的。 ?
一些消息系統(tǒng)比如Scribe和Apache Flume采用了push模式,將消息推送到下游的consumer。這樣做有好處也有壞處:由broker決定消息推送的速率,對于不同消費(fèi)速率的consumer就不太好處理了。消息系統(tǒng)都致力于讓consumer以最大的速率最快速的消費(fèi)消息,但不幸的是,push模式下,當(dāng)broker推送的速率遠(yuǎn)大于consumer消費(fèi)的速率時,consumer恐怕就要崩潰了。最終Kafka還是選取了傳統(tǒng)的pull模式。
Pull模式的另外一個好處是consumer可以自主決定是否批量的從broker拉取數(shù)據(jù)。Push模式必須在不知道下游consumer消費(fèi)能力和消費(fèi)策略的情況下決定是立即推送每條消息還是緩存之后批量推送。如果為了避免consumer崩潰而采用較低的推送速率,將可能導(dǎo)致一次只推送較少的消息而造成浪費(fèi)。Pull模式下,consumer就可以根據(jù)自己的消費(fèi)能力去決定這些策略。
Pull有個缺點(diǎn)是,如果broker沒有可供消費(fèi)的消息,將導(dǎo)致consumer不斷在循環(huán)中輪詢,直到新消息到t達(dá)。為了避免這點(diǎn),Kafka有個參數(shù)可以讓consumer阻塞知道新消息到達(dá)(當(dāng)然也可以阻塞知道消息的數(shù)量達(dá)到某個特定的量這樣就可以批量發(fā)送)。
大部分消息系統(tǒng)在broker端的維護(hù)消息被消費(fèi)的記錄:一個消息被分發(fā)到consumer后broker就馬上進(jìn)行標(biāo)記或者等待customer的通知后進(jìn)行標(biāo)記。這樣也可以在消息在消費(fèi)后立馬就刪除以減少空間占用。
但是這樣會不會有什么問題呢?如果一條消息發(fā)送出去之后就立即被標(biāo)記為消費(fèi)過的,一旦consumer處理消息時失敗了(比如程序崩潰)消息就丟失了。為了解決這個問題,很多消息系統(tǒng)提供了另外一個個功能:當(dāng)消息被發(fā)送出去之后僅僅被標(biāo)記為已發(fā)送狀態(tài),當(dāng)接到consumer已經(jīng)消費(fèi)成功的通知后才標(biāo)記為已被消費(fèi)的狀態(tài)。這雖然解決了消息丟失的問題,但產(chǎn)生了新問題,首先如果consumer處理消息成功了但是向broker發(fā)送響應(yīng)時失敗了,這條消息將被消費(fèi)兩次。第二個問題時,broker必須維護(hù)每條消息的狀態(tài),并且每次都要先鎖住消息然后更改狀態(tài)然后釋放鎖。這樣麻煩又來了,且不說要維護(hù)大量的狀態(tài)數(shù)據(jù),比如如果消息發(fā)送出去但沒有收到消費(fèi)成功的通知,這條消息將一直處于被鎖定的狀態(tài),
Kafka采用了不同的策略。Topic被分成了若干分區(qū),每個分區(qū)在同一時間只被一個consumer消費(fèi)。這意味著每個分區(qū)被消費(fèi)的消息在日志中的位置僅僅是一個簡單的整數(shù):offset。這樣就很容易標(biāo)記每個分區(qū)消費(fèi)狀態(tài)就很容易了,僅僅需要一個整數(shù)而已。這樣消費(fèi)狀態(tài)的跟蹤就很簡單了。
這帶來了另外一個好處:consumer可以把offset調(diào)成一個較老的值,去重新消費(fèi)老的消息。這對傳統(tǒng)的消息系統(tǒng)來說看起來有些不可思議,但確實(shí)是非常有用的,誰規(guī)定了一條消息只能被消費(fèi)一次呢?consumer發(fā)現(xiàn)解析數(shù)據(jù)的程序有bug,在修改bug后再來解析一次消息,看起來是很合理的額呀!
Kafka視頻教程同步首發(fā),歡迎觀看!
Kafka Producer
消息發(fā)送
producer直接將數(shù)據(jù)發(fā)送到broker的leader(主節(jié)點(diǎn)),不需要在多個節(jié)點(diǎn)進(jìn)行分發(fā)。為了幫助producer做到這點(diǎn),所有的Kafka節(jié)點(diǎn)都可以及時的告知:哪些節(jié)點(diǎn)是活動的,目標(biāo)topic目標(biāo)分區(qū)的leader在哪。這樣producer就可以直接將消息發(fā)送到目的地了。
客戶端控制消息將被分發(fā)到哪個分區(qū)。可以通過負(fù)載均衡隨機(jī)的選擇,或者使用分區(qū)函數(shù)。Kafka允許用戶實(shí)現(xiàn)分區(qū)函數(shù),指定分區(qū)的key,將消息hash到不同的分區(qū)上(當(dāng)然有需要的話,也可以覆蓋這個分區(qū)函數(shù)自己實(shí)現(xiàn)邏輯).比如如果你指定的key是user id,那么同一個用戶發(fā)送的消息都被發(fā)送到同一個分區(qū)上。經(jīng)過分區(qū)之后,consumer就可以有目的的消費(fèi)某個分區(qū)的消息。異步發(fā)送
批量發(fā)送可以很有效的提高發(fā)送效率。Kafka producer的異步發(fā)送模式允許進(jìn)行批量發(fā)送,先將消息緩存在內(nèi)存中,然后一次請求批量發(fā)送出去。這個策略可以配置的,比如可以指定緩存的消息達(dá)到某個量的時候就發(fā)出去,或者緩存了固定的時間后就發(fā)送出去(比如100條消息就發(fā)送,或者每5秒發(fā)送一次)。這種策略將大大減少服務(wù)端的I/O次數(shù)。既然緩存是在producer端進(jìn)行的,那么當(dāng)producer崩潰時,這些消息就會丟失。Kafka0.8.1的異步發(fā)送模式還不支持回調(diào),就不能在發(fā)送出錯時進(jìn)行處理。Kafka 0.9可能會增加這樣的回調(diào)函數(shù)。見Proposed Producer API.
Kafka Consumer
Kafa consumer消費(fèi)消息時,向broker發(fā)出"fetch"請求去消費(fèi)特定分區(qū)的消息。consumer指定消息在日志中的偏移量(offset),就可以消費(fèi)從這個位置開始的消息。customer擁有了offset的控制權(quán),可以向后回滾去重新消費(fèi)之前的消息,這是很有意義的。 ?
推還是拉?
Kafka最初考慮的問題是,customer應(yīng)該從brokes拉取消息還是brokers將消息推送到consumer,也就是pull還push。在這方面,Kafka遵循了一種大部分消息系統(tǒng)共同的傳統(tǒng)的設(shè)計(jì):producer將消息推送到broker,consumer從broker拉取消息。一些消息系統(tǒng)比如Scribe和Apache Flume采用了push模式,將消息推送到下游的consumer。這樣做有好處也有壞處:由broker決定消息推送的速率,對于不同消費(fèi)速率的consumer就不太好處理了。消息系統(tǒng)都致力于讓consumer以最大的速率最快速的消費(fèi)消息,但不幸的是,push模式下,當(dāng)broker推送的速率遠(yuǎn)大于consumer消費(fèi)的速率時,consumer恐怕就要崩潰了。最終Kafka還是選取了傳統(tǒng)的pull模式。
Pull模式的另外一個好處是consumer可以自主決定是否批量的從broker拉取數(shù)據(jù)。Push模式必須在不知道下游consumer消費(fèi)能力和消費(fèi)策略的情況下決定是立即推送每條消息還是緩存之后批量推送。如果為了避免consumer崩潰而采用較低的推送速率,將可能導(dǎo)致一次只推送較少的消息而造成浪費(fèi)。Pull模式下,consumer就可以根據(jù)自己的消費(fèi)能力去決定這些策略。
Pull有個缺點(diǎn)是,如果broker沒有可供消費(fèi)的消息,將導(dǎo)致consumer不斷在循環(huán)中輪詢,直到新消息到t達(dá)。為了避免這點(diǎn),Kafka有個參數(shù)可以讓consumer阻塞知道新消息到達(dá)(當(dāng)然也可以阻塞知道消息的數(shù)量達(dá)到某個特定的量這樣就可以批量發(fā)送)。
消費(fèi)狀態(tài)跟蹤
對消費(fèi)消息狀態(tài)的記錄也是很重要的。大部分消息系統(tǒng)在broker端的維護(hù)消息被消費(fèi)的記錄:一個消息被分發(fā)到consumer后broker就馬上進(jìn)行標(biāo)記或者等待customer的通知后進(jìn)行標(biāo)記。這樣也可以在消息在消費(fèi)后立馬就刪除以減少空間占用。
但是這樣會不會有什么問題呢?如果一條消息發(fā)送出去之后就立即被標(biāo)記為消費(fèi)過的,一旦consumer處理消息時失敗了(比如程序崩潰)消息就丟失了。為了解決這個問題,很多消息系統(tǒng)提供了另外一個個功能:當(dāng)消息被發(fā)送出去之后僅僅被標(biāo)記為已發(fā)送狀態(tài),當(dāng)接到consumer已經(jīng)消費(fèi)成功的通知后才標(biāo)記為已被消費(fèi)的狀態(tài)。這雖然解決了消息丟失的問題,但產(chǎn)生了新問題,首先如果consumer處理消息成功了但是向broker發(fā)送響應(yīng)時失敗了,這條消息將被消費(fèi)兩次。第二個問題時,broker必須維護(hù)每條消息的狀態(tài),并且每次都要先鎖住消息然后更改狀態(tài)然后釋放鎖。這樣麻煩又來了,且不說要維護(hù)大量的狀態(tài)數(shù)據(jù),比如如果消息發(fā)送出去但沒有收到消費(fèi)成功的通知,這條消息將一直處于被鎖定的狀態(tài),
Kafka采用了不同的策略。Topic被分成了若干分區(qū),每個分區(qū)在同一時間只被一個consumer消費(fèi)。這意味著每個分區(qū)被消費(fèi)的消息在日志中的位置僅僅是一個簡單的整數(shù):offset。這樣就很容易標(biāo)記每個分區(qū)消費(fèi)狀態(tài)就很容易了,僅僅需要一個整數(shù)而已。這樣消費(fèi)狀態(tài)的跟蹤就很簡單了。
這帶來了另外一個好處:consumer可以把offset調(diào)成一個較老的值,去重新消費(fèi)老的消息。這對傳統(tǒng)的消息系統(tǒng)來說看起來有些不可思議,但確實(shí)是非常有用的,誰規(guī)定了一條消息只能被消費(fèi)一次呢?consumer發(fā)現(xiàn)解析數(shù)據(jù)的程序有bug,在修改bug后再來解析一次消息,看起來是很合理的額呀!
離線處理消息
高級的數(shù)據(jù)持久化允許consumer每個隔一段時間批量的將數(shù)據(jù)加載到線下系統(tǒng)中比如Hadoop或者數(shù)據(jù)倉庫。這種情況下,Hadoop可以將加載任務(wù)分拆,拆成每個broker或每個topic或每個分區(qū)一個加載任務(wù)。Hadoop具有任務(wù)管理功能,當(dāng)一個任務(wù)失敗了就可以重啟而不用擔(dān)心數(shù)據(jù)被重新加載,只要從上次加載的位置繼續(xù)加載消息就可以了。總結(jié)
以上是生活随笔為你收集整理的漫游Kafka设计篇之Producer和Consumer的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 漫游Kafka设计篇之性能优化
- 下一篇: 漫游Kafka设计篇之主从同步