日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Apache kafka原理与特性(0.8V)

發(fā)布時(shí)間:2025/3/17 编程问答 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Apache kafka原理与特性(0.8V) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

前言: kafka是一個(gè)輕量級(jí)的/分布式的/具備replication能力的日志采集組件,通常被集成到應(yīng)用系統(tǒng)中,收集"用戶(hù)行為日志"等,并可以使用各種消費(fèi)終端(consumer)將消息轉(zhuǎn)存到HDFS等其他結(jié)構(gòu)化數(shù)據(jù)存儲(chǔ)系統(tǒng)中.因?yàn)槿罩鞠⑼ǔ槲谋緮?shù)據(jù),尺寸較小,且對(duì)實(shí)時(shí)性以及數(shù)據(jù)可靠性要求不嚴(yán)格,但是需要日志存儲(chǔ)端具備較高的數(shù)據(jù)吞吐能力,這種"寬松"的設(shè)計(jì)要求,非常適合使用kafka.?

一.入門(mén)

? ??1.1?簡(jiǎn)介

? ? Kafka是一個(gè)"分布式的"/"可分區(qū)的(partitioned)"/"基于備份的(replicated)"/"基于commit-log存儲(chǔ)"的服務(wù). 它提供了類(lèi)似于JMS的特性,但是在設(shè)計(jì)實(shí)現(xiàn)上完全不同,此外它并不是JMS規(guī)范的實(shí)現(xiàn).

? ? kafka消息是根據(jù)Topic進(jìn)行歸類(lèi),發(fā)送消息者成為Producer,消息接收者成為Consumer;此外kafka集群有多個(gè)kafka實(shí)例組成,每個(gè)實(shí)例(server)稱(chēng)為broker.

? ? 無(wú)論是kafka集群,還是producer和consumer都依賴(lài)于zookeeper來(lái)保證系統(tǒng)可用性以及保存一些meta信息.

?

(摘自官網(wǎng))?

? ? 其中client與server的通訊,都是基于TCP,而且消息協(xié)議非常輕量級(jí).

? ??Topics/logs

? ? 一個(gè)Topic可以認(rèn)為是一類(lèi)消息,每個(gè)topic將被分成多個(gè)partition(區(qū)),每個(gè)partition在存儲(chǔ)層面是append log文件.任何發(fā)布到此partition的消息都會(huì)直接追加到log文件的尾部,每條消息在文件中的位置稱(chēng)為offset(偏移量),offset為一個(gè)long型數(shù)字,它唯一的標(biāo)記一條消息.kafka并沒(méi)有提供其他額外的索引機(jī)制來(lái)存儲(chǔ)offset,因?yàn)樵趉afka中幾乎不允許對(duì)消息進(jìn)行"隨機(jī)讀-寫(xiě)",一旦消息寫(xiě)入log日志之后,將不能被修改.



(摘自官網(wǎng))?

? ? kafka和JMS實(shí)現(xiàn)(activeMQ)不同的是:即使消息被消費(fèi),消息仍然不會(huì)被立即刪除.日志文件將會(huì)根據(jù)broker中的配置要求,保留一定的時(shí)間之后刪除;比如log文件保留2天,那么兩天后,文件會(huì)被清除,無(wú)論其中的消息是否被消費(fèi).kafka通過(guò)這種簡(jiǎn)單的手段,來(lái)釋放磁盤(pán)空間.此外,kafka的性能并不會(huì)因?yàn)槿罩疚募奶喽拖?所以即使保留較多的log文件,也不不會(huì)有問(wèn)題.

? ? 對(duì)于consumer而言,它需要保存消費(fèi)消息的offset,對(duì)于offset的保存和使用,有consumer來(lái)控制;當(dāng)consumer正常消費(fèi)消息時(shí),offset將會(huì)"線(xiàn)性"的向前驅(qū)動(dòng),即消息將依次順序被消費(fèi).事實(shí)上consumer可以使用任意順序消費(fèi)消息,它只需要將offset重置為任意值..(offset將會(huì)保存在zookeeper中,參見(jiàn)下文)

? ? kafka集群幾乎不需要維護(hù)任何consumer和producer狀態(tài)信息,這些信息有zookeeper保存;因此producer和consumer的客戶(hù)端實(shí)現(xiàn)非常輕量級(jí),它們可以隨意離開(kāi),而不會(huì)對(duì)集群造成額外的影響.

? ? partitions的設(shè)計(jì)目的有多個(gè).最根本原因是kafka基于文件存儲(chǔ).通過(guò)分區(qū),可以將日志內(nèi)容分散到多個(gè)server上,來(lái)避免文件尺寸達(dá)到單機(jī)磁盤(pán)的上限,每個(gè)partiton都會(huì)被當(dāng)前server(kafka實(shí)例)保存;可以將一個(gè)topic切分多任意多個(gè)partitions(備注:基于sharding),來(lái)消息保存/消費(fèi)的效率.此外越多的partitions意味著可以容納更多的consumer,有效提升并發(fā)消費(fèi)的能力.(具體原理參見(jiàn)下文).

? ??Distribution

? ? 一個(gè)Topic的多個(gè)partitions,被分布在kafka集群中的多個(gè)server上;每個(gè)server(kafka實(shí)例)負(fù)責(zé)partitions中消息的讀寫(xiě)操作;此外kafka還可以配置每個(gè)partition需要備份的個(gè)數(shù)(replicas),每個(gè)partition將會(huì)被備份到多臺(tái)機(jī)器上,以提高可用性.[replicas特性在0.8V才支持]

? ? 基于replicated方案,那么就意味著需要對(duì)多個(gè)備份進(jìn)行調(diào)度;一個(gè)partition可以在多個(gè)server上備份,那么其中一個(gè)server作為此partiton的leader;leader負(fù)責(zé)此partition所有的讀寫(xiě)操作,如果leader失效,那么將會(huì)有其他follower來(lái)接管(成為新的leader);follower只是單調(diào)的和leader跟進(jìn),同步消息即可..由此可見(jiàn)作為leader的server承載了全部的請(qǐng)求壓力,因此從集群的整體考慮,有多少個(gè)partitions就意味著有多少個(gè)"leader",kafka會(huì)將"leader"均衡的分散在每個(gè)實(shí)例上,來(lái)確保整體的性能穩(wěn)定.[備注:kafka中將leader角色權(quán)限下放到partition這個(gè)層級(jí)]

?

kafka-cluster?

? ??Producers

? ? Producer將消息發(fā)布到指定的Topic中,同時(shí)Producer也能決定將此消息發(fā)送到哪個(gè)partition;如果一個(gè)Topic有多個(gè)partitions時(shí),你需要選擇partition是算法,比如基于"round-robin"方式或者通過(guò)其他的一些算法等.無(wú)論如何選擇partition路由算法,我們最直接的目的就是希望消息能夠均勻的發(fā)送給每個(gè)partition,這樣可以讓consumer消費(fèi)的消息量也能"均衡".

? ??Consumers

? ? 本質(zhì)上kafka只支持Topic.每個(gè)consumer屬于一個(gè)consumer group;反過(guò)來(lái)說(shuō),每個(gè)group中可以有多個(gè)consumer.對(duì)于Topic中的一條特定的消息,只會(huì)被訂閱此Topic的每個(gè)group中的一個(gè)consumer消費(fèi),此消息不會(huì)發(fā)送給一個(gè)group的多個(gè)consumer;那么一個(gè)group中所有的consumer將會(huì)交錯(cuò)的消費(fèi)整個(gè)Topic.

? ? 如果所有的consumer都具有相同的group,這種情況和JMS queue模式很像;消息將會(huì)在consumers之間負(fù)載均衡.

? ? 如果所有的consumer都具有不同的group,那這就是"發(fā)布-訂閱";消息將會(huì)廣播給所有的消費(fèi)者.



(摘自官網(wǎng))?

? ? 在kafka中,一個(gè)partition中的消息只會(huì)被group中的一個(gè)consumer消費(fèi)(同一時(shí)刻);每個(gè)group中consumer消息消費(fèi)互相獨(dú)立;我們可以認(rèn)為一個(gè)group是一個(gè)"訂閱"者,一個(gè)Topic中的每個(gè)partions,只會(huì)被一個(gè)"訂閱者"中的一個(gè)consumer消費(fèi),不過(guò)一個(gè)consumer可以同時(shí)消費(fèi)多個(gè)partitions中的消息.kafka只能保證一個(gè)partition中的消息被某個(gè)consumer消費(fèi)時(shí)是順序的.事實(shí)上,從Topic角度來(lái)說(shuō),當(dāng)有多個(gè)partitions時(shí),消息仍不是全局有序的.

? ? 通常情況下,一個(gè)group中會(huì)包含多個(gè)consumer,這樣不僅可以提高topic中消息的并發(fā)消費(fèi)能力,而且還能提高"故障容錯(cuò)"性,如果group中的某個(gè)consumer失效,那么其消費(fèi)的partitions將會(huì)有其他consumer自動(dòng)接管.

? ? kafka的設(shè)計(jì)原理決定,對(duì)于一個(gè)topic,同一個(gè)group中不能有多于partitions個(gè)數(shù)的consumer同時(shí)消費(fèi),否則將意味著某些consumer將無(wú)法得到消息.

? ??Guarantees

? ? 1) 發(fā)送到partitions中的消息將會(huì)按照它接收的順序追加到日志中,無(wú)論一個(gè)partition由多少個(gè)log文件構(gòu)成,那么它發(fā)送給consumer的順序是一定的.

? ? 2) 對(duì)于消費(fèi)者而言,它們消費(fèi)消息的順序和日志中消息順序一致.

? ? 3) 如果Topic的"replication factor"為N,那么允許N-1個(gè)kafka實(shí)例失效.只要有一個(gè)replication存活,那么此partition的讀寫(xiě)操作都不會(huì)中斷.

?

? ? 1.2 Use cases

? ??Messaging

? ? 和一些常規(guī)的消息系統(tǒng)相比,kafka仍然是個(gè)不錯(cuò)的選擇;它具備partitons/replication和容錯(cuò),可以使kafka具有良好的擴(kuò)展性和性能優(yōu)勢(shì).不過(guò)到目前為止,我們應(yīng)該很清楚認(rèn)識(shí)到,kafka并沒(méi)有提供JMS中的"事務(wù)性""消息傳輸擔(dān)保(消息確認(rèn)機(jī)制)""消息分組"等企業(yè)級(jí)特性;kafka只能使用作為"常規(guī)"的消息系統(tǒng),在一定程度上,尚未確保消息的發(fā)送與接收絕對(duì)可靠(比如,消息重發(fā),消息發(fā)送丟失等)

? ??Websit activity tracking

? ? kafka可以作為"網(wǎng)站活性跟蹤"的最佳工具;可以將網(wǎng)頁(yè)/用戶(hù)操作等信息發(fā)送到kafka中.并實(shí)時(shí)監(jiān)控,或者離線(xiàn)統(tǒng)計(jì)分析等.

? ??Log Aggregation

? ? kafka的特性決定它非常適合作為"日志收集中心";application可以將操作日志"批量""異步"的發(fā)送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/壓縮消息等,這對(duì)producer端而言,幾乎感覺(jué)不到性能的開(kāi)支.此時(shí)consumer端可以使hadoop等其他系統(tǒng)化的存儲(chǔ)和分析系統(tǒng).

?

二. 設(shè)計(jì)原理

? ? kafka的設(shè)計(jì)初衷是希望做為一個(gè)統(tǒng)一的信息收集平臺(tái),能夠?qū)崟r(shí)的收集反饋信息,并需要能夠支撐較大的數(shù)據(jù)量,且具備良好的容錯(cuò)能力.

? ??1.Persistence

? ? kafka使用文件存儲(chǔ)消息(append only log),這就直接決定kafka在性能上嚴(yán)重依賴(lài)文件系統(tǒng)的本身特性.且無(wú)論任何OS下,對(duì)文件系統(tǒng)本身的優(yōu)化是非常艱難的.文件緩存/直接內(nèi)存映射等是常用的手段.因?yàn)閗afka是對(duì)日志文件進(jìn)行append操作,因此磁盤(pán)檢索的開(kāi)支是較小的;同時(shí)為了減少磁盤(pán)寫(xiě)入的次數(shù),broker會(huì)將消息暫時(shí)buffer起來(lái),當(dāng)消息的個(gè)數(shù)(或尺寸)達(dá)到一定閥值時(shí),再flush到磁盤(pán),這樣減少了磁盤(pán)IO調(diào)用的次數(shù).對(duì)于kafka而言,較高性能的磁盤(pán),將會(huì)帶來(lái)更加直接的性能提升.

? ??2.Efficiency

? ? 需要考慮的影響性能點(diǎn)很多,除磁盤(pán)IO之外,我們還需要考慮網(wǎng)絡(luò)IO,這直接關(guān)系到kafka的吞吐量問(wèn)題.kafka并沒(méi)有提供太多高超的技巧;對(duì)于producer端,可以將消息buffer起來(lái),當(dāng)消息的條數(shù)達(dá)到一定閥值時(shí),批量發(fā)送給broker;對(duì)于consumer端也是一樣,批量fetch多條消息.不過(guò)消息量的大小可以通過(guò)配置文件來(lái)指定.對(duì)于kafka broker端,似乎有個(gè)sendfile系統(tǒng)調(diào)用可以潛在的提升網(wǎng)絡(luò)IO的性能:將文件的數(shù)據(jù)映射到系統(tǒng)內(nèi)存中,socket直接讀取相應(yīng)的內(nèi)存區(qū)域即可,而無(wú)需進(jìn)程再次copy和交換(這里涉及到"磁盤(pán)IO數(shù)據(jù)"/"內(nèi)核內(nèi)存"/"進(jìn)程內(nèi)存"/"網(wǎng)絡(luò)緩沖區(qū)",多者之間的數(shù)據(jù)copy).

?

? ? 其實(shí)對(duì)于producer/consumer/broker三者而言,CPU的開(kāi)支應(yīng)該都不大,因此啟用消息壓縮機(jī)制是一個(gè)良好的策略;壓縮需要消耗少量的CPU資源,不過(guò)對(duì)于kafka而言,網(wǎng)絡(luò)IO更應(yīng)該需要考慮.可以將任何在網(wǎng)絡(luò)上傳輸?shù)南⒍冀?jīng)過(guò)壓縮.kafka支持gzip/snappy等多種壓縮方式.

? ??3. Producer

? ??Load balancing

? ? kafka集群中的任何一個(gè)broker,都可以向producer提供metadata信息,這些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(請(qǐng)參看zookeeper中的節(jié)點(diǎn)信息). 當(dāng)producer獲取到metadata信心之后,?producer將會(huì)和Topic下所有partition leader保持socket連接;消息由producer直接通過(guò)socket發(fā)送到broker,中間不會(huì)經(jīng)過(guò)任何"路由層".事實(shí)上,消息被路由到哪個(gè)partition上,有producer客戶(hù)端決定.比如可以采用"random""key-hash""輪詢(xún)"等,如果一個(gè)topic中有多個(gè)partitions,那么在producer端實(shí)現(xiàn)"消息均衡分發(fā)"是必要的.在producer端的配置文件中,開(kāi)發(fā)者可以指定partition路由的方式.

?

? ??Asynchronous send

? ? 將多條消息暫且在客戶(hù)端buffer起來(lái),并將他們批量發(fā)送到broker;小數(shù)據(jù)IO太多,會(huì)拖慢整體的網(wǎng)絡(luò)延遲,批量延遲發(fā)送事實(shí)上提升了網(wǎng)絡(luò)效率;不過(guò)這也有一定的隱患,比如當(dāng)producer失效時(shí),那些尚未發(fā)送的消息將會(huì)丟失.

? ??4.Consumer

? ? consumer端向broker發(fā)送"fetch"請(qǐng)求,并告知其獲取消息的offset;此后consumer將會(huì)獲得一定條數(shù)的消息;consumer端也可以重置offset來(lái)重新消費(fèi)消息.[備注:offset,消息偏移量,integer值,broker可以根據(jù)offset來(lái)決定消息的起始位置]

? ? 在JMS實(shí)現(xiàn)中,Topic模型基于push方式,即broker將消息推送給consumer端.不過(guò)在kafka中,采用了pull方式,即consumer在和broker建立連接之后,主動(dòng)去pull(或者說(shuō)fetch)消息;這中模式有些優(yōu)點(diǎn),首先consumer端可以根據(jù)自己的消費(fèi)能力適時(shí)的去fetch消息并處理,且可以控制消息消費(fèi)的進(jìn)度(offset);此外,消費(fèi)者可以良好的控制消息消費(fèi)的數(shù)量,batch fetch.

? ? 其他JMS實(shí)現(xiàn),消息消費(fèi)的位置是有prodiver保留,以便避免重復(fù)發(fā)送消息或者將沒(méi)有消費(fèi)成功的消息重發(fā)等,同時(shí)還要控制消息的狀態(tài).這就要求JMS broker需要太多額外的工作.在kafka中,partition中的消息只有一個(gè)consumer在消費(fèi),且不存在消息狀態(tài)的控制,也沒(méi)有復(fù)雜的消息確認(rèn)機(jī)制,可見(jiàn)kafka broker端是相當(dāng)輕量級(jí)的.當(dāng)消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并間歇性的向zookeeper注冊(cè)offset.由此可見(jiàn),consumer客戶(hù)端也很輕量級(jí).

? ? 這就意味著,kafka中consumer負(fù)責(zé)維護(hù)消息的消費(fèi)記錄,而broker則不關(guān)心這些,這種設(shè)計(jì)不僅提高了consumer端的靈活性,也適度的減輕了broker端設(shè)計(jì)的復(fù)雜度;這是和眾多JMS prodiver的區(qū)別.此外,kafka中消息ACK的設(shè)計(jì)也和JMS有很大不同,kafka中的消息時(shí)批量(通常以消息的條數(shù)或者chunk的尺寸為單位)發(fā)送給consumer,當(dāng)消息消費(fèi)成功后,向zookeeper提交消息的offset,而不會(huì)向broker交付ACK.或許你已經(jīng)意識(shí)到,這種"寬松"的設(shè)計(jì),將會(huì)有"丟失"消息/"消息重發(fā)"的危險(xiǎn).

?

? ??5.Message Delivery Semantics

? ? 對(duì)于JMS實(shí)現(xiàn),消息傳輸擔(dān)保非常直接:有且只有一次(exactly once).在kafka中稍有不同,對(duì)于consumer而言:

? ? 1) at most once: 最多一次,這個(gè)和JMS中"非持久化"消息類(lèi)似.發(fā)送一次,無(wú)論成敗,將不會(huì)重發(fā).

? ? 2) at least once: 消息至少發(fā)送一次,如果消息未能接受成功,可能會(huì)重發(fā),直到接收成功.

? ? 3) exactly once: 消息只會(huì)發(fā)送一次.

? ? at most once: 消費(fèi)者fetch消息,然后保存offset,然后處理消息;當(dāng)client保存offset之后,但是在消息處理過(guò)程中consumer進(jìn)程失效(crash),導(dǎo)致部分消息未能繼續(xù)處理.那么此后可能其他consumer會(huì)接管,但是因?yàn)閛ffset已經(jīng)提前保存,那么新的consumer將不能fetch到offset之前的消息(盡管它們尚沒(méi)有被處理),這就是"at most once".

? ? at least once: 消費(fèi)者fetch消息,然后處理消息,然后保存offset.如果消息處理成功之后,但是在保存offset階段zookeeper異常或者consumer失效,導(dǎo)致保存offset操作未能執(zhí)行成功,這就導(dǎo)致接下來(lái)再次fetch時(shí)可能獲得上次已經(jīng)處理過(guò)的消息,這就是"at least once".

? ? exactly once: kafka中并沒(méi)有嚴(yán)格的去實(shí)現(xiàn)(基于2階段提交,事務(wù)),我們認(rèn)為這種策略在kafka中是沒(méi)有必要的.

? ? 因?yàn)?#34;消息消費(fèi)"和"保存offset"這兩個(gè)操作的先后時(shí)機(jī)不同,導(dǎo)致了上述3種情況,通常情況下"at-least-once"是我們搜選.(相比at most once而言,重復(fù)接收數(shù)據(jù)總比丟失數(shù)據(jù)要好).



?

? ??6. Replication

? ? kafka中,replication策略是基于partition,而不是topic;kafka將每個(gè)partition數(shù)據(jù)復(fù)制到多個(gè)server上,任何一個(gè)partition有一個(gè)leader和多個(gè)follower(可以沒(méi)有);備份的個(gè)數(shù)可以通過(guò)broker配置文件來(lái)設(shè)定.leader處理所有的read-write請(qǐng)求,follower需要和leader保持同步.Follower就像一個(gè)"consumer",消費(fèi)消息并保存在本地日志中;leader負(fù)責(zé)跟蹤所有的follower狀態(tài),如果follower"落后"太多或者失效,leader將會(huì)把它從replicas同步列表中刪除.當(dāng)所有的follower都將一條消息保存成功,此消息才被認(rèn)為是"committed",那么此時(shí)consumer才能消費(fèi)它,這種同步策略,就要求follower和leader之間必須具有良好的網(wǎng)絡(luò)環(huán)境.即使只有一個(gè)replicas實(shí)例存活,仍然可以保證消息的正常發(fā)送和接收,只要zookeeper集群存活即可.(備注:不同于其他分布式存儲(chǔ),比如hbase需要"多數(shù)派"存活才行)

? ? kafka判定一個(gè)follower存活與否的條件有2個(gè):1) follower需要和zookeeper保持良好的鏈接 2) 它必須能夠及時(shí)的跟進(jìn)leader,不能落后太多.如果同時(shí)滿(mǎn)足上述2個(gè)條件,那么leader就認(rèn)為此follower是"活躍的".如果一個(gè)follower失效(server失效)或者落后太多,leader將會(huì)把它從同步列表中移除[備注:如果此replicas落后太多,它將會(huì)繼續(xù)從leader中fetch數(shù)據(jù),直到足夠up-to-date,然后再次加入到同步列表中;kafka不會(huì)更換replicas宿主!因?yàn)?#34;同步列表"中replicas需要足夠快,這樣才能保證producer發(fā)布消息時(shí)接受到ACK的延遲較小].

? ? 當(dāng)leader失效時(shí),需在followers中選取出新的leader,可能此時(shí)follower落后于leader,因此需要選擇一個(gè)"up-to-date"的follower.kafka中l(wèi)eader選舉并沒(méi)有采用"投票多數(shù)派"的算法,因?yàn)檫@種算法對(duì)于"網(wǎng)絡(luò)穩(wěn)定性"/"投票參與者數(shù)量"等條件有較高的要求,而且kafka集群的設(shè)計(jì),還需要容忍N(yùn)-1個(gè)replicas失效.對(duì)于kafka而言,每個(gè)partition中所有的replicas信息都可以在zookeeper中獲得,那么選舉leader將是一件非常簡(jiǎn)單的事情.選擇follower時(shí)需要兼顧一個(gè)問(wèn)題,就是新leader server上所已經(jīng)承載的partition leader的個(gè)數(shù),如果一個(gè)server上有過(guò)多的partition leader,意味著此server將承受著更多的IO壓力.在選舉新leader,需要考慮到"負(fù)載均衡",partition leader較少的broker將會(huì)更有可能成為新的leader.

? ? 在整幾個(gè)集群中,只要有一個(gè)replicas存活,那么此partition都可以繼續(xù)接受讀寫(xiě)操作.

? ??7.Log

? ? 如果一個(gè)topic的名稱(chēng)為"my_topic",它有2個(gè)partitions,那么日志將會(huì)保存在my_topic_0和my_topic_1兩個(gè)目錄中;日志文件中保存了一序列"log entries"(日志條目),每個(gè)log entry格式為"4個(gè)字節(jié)的數(shù)字N表示消息的長(zhǎng)度" + "N個(gè)字節(jié)的消息內(nèi)容";每個(gè)日志都有一個(gè)offset來(lái)唯一的標(biāo)記一條消息,offset的值為8個(gè)字節(jié)的數(shù)字,表示此消息在此partition中所處的起始位置..每個(gè)partition在物理存儲(chǔ)層面,有多個(gè)log file組成(稱(chēng)為segment).segment file的命名為"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.



(摘自官網(wǎng))?

? ? 其中每個(gè)partiton中所持有的segments列表信息會(huì)存儲(chǔ)在zookeeper中.

? ? 當(dāng)segment文件尺寸達(dá)到一定閥值時(shí)(可以通過(guò)配置文件設(shè)定,默認(rèn)1G),將會(huì)創(chuàng)建一個(gè)新的文件;當(dāng)buffer中消息的條數(shù)達(dá)到閥值時(shí)將會(huì)觸發(fā)日志信息flush到日志文件中,同時(shí)如果"距離最近一次flush的時(shí)間差"達(dá)到閥值時(shí),也會(huì)觸發(fā)flush到日志文件.如果broker失效,極有可能會(huì)丟失那些尚未flush到文件的消息.因?yàn)閟erver意外失效,仍然會(huì)導(dǎo)致log文件格式的破壞(文件尾部),那么就要求當(dāng)server啟東是需要檢測(cè)最后一個(gè)segment的文件結(jié)構(gòu)是否合法并進(jìn)行必要的修復(fù).

? ? 獲取消息時(shí),需要指定offset和最大chunk尺寸,offset用來(lái)表示消息的起始位置,chunk size用來(lái)表示最大獲取消息的總長(zhǎng)度(間接的表示消息的條數(shù)).根據(jù)offset,可以找到此消息所在segment文件,然后根據(jù)segment的最小offset取差值,得到它在file中的相對(duì)位置,直接讀取輸出即可.

? ? 日志文件的刪除策略非常簡(jiǎn)單:啟動(dòng)一個(gè)后臺(tái)線(xiàn)程定期掃描log file列表,把保存時(shí)間超過(guò)閥值的文件直接刪除(根據(jù)文件的創(chuàng)建時(shí)間).為了避免刪除文件時(shí)仍然有read操作(consumer消費(fèi)),采取copy-on-write方式.

? ??8.Distribution

? ? kafka使用zookeeper來(lái)存儲(chǔ)一些meta信息,并使用了zookeeper watch機(jī)制來(lái)發(fā)現(xiàn)meta信息的變更并作出相應(yīng)的動(dòng)作(比如consumer失效,觸發(fā)負(fù)載均衡等)

? ??1) Broker node registry:?當(dāng)一個(gè)kafka broker啟動(dòng)后,首先會(huì)向zookeeper注冊(cè)自己的節(jié)點(diǎn)信息(臨時(shí)znode),同時(shí)當(dāng)broker和zookeeper斷開(kāi)連接時(shí),此znode也會(huì)被刪除.

? ? 格式: /broker/ids/[0...N] ? -->host:port;其中[0..N]表示broker id,每個(gè)broker的配置文件中都需要指定一個(gè)數(shù)字類(lèi)型的id(全局不可重復(fù)),znode的值為此broker的host:port信息.

? ??2) Broker Topic Registry:?當(dāng)一個(gè)broker啟動(dòng)時(shí),會(huì)向zookeeper注冊(cè)自己持有的topic和partitions信息,仍然是一個(gè)臨時(shí)znode.

? ? 格式: /broker/topics/[topic]/[0...N] ?其中[0..N]表示partition索引號(hào).

? ??3) Consumer and Consumer group:?每個(gè)consumer客戶(hù)端被創(chuàng)建時(shí),會(huì)向zookeeper注冊(cè)自己的信息;此作用主要是為了"負(fù)載均衡".

? ? 一個(gè)group中的多個(gè)consumer可以交錯(cuò)的消費(fèi)一個(gè)topic的所有partitions;簡(jiǎn)而言之,保證此topic的所有partitions都能被此group所消費(fèi),且消費(fèi)時(shí)為了性能考慮,讓partition相對(duì)均衡的分散到每個(gè)consumer上.

? ??4) Consumer id Registry:?每個(gè)consumer都有一個(gè)唯一的ID(host:uuid,可以通過(guò)配置文件指定,也可以由系統(tǒng)生成),此id用來(lái)標(biāo)記消費(fèi)者信息.

? ? 格式: /consumers/[group_id]/ids/[consumer_id]

? ? 仍然是一個(gè)臨時(shí)的znode,此節(jié)點(diǎn)的值為{"topic_name":#streams...},即表示此consumer目前所消費(fèi)的topic + partitions列表.

? ??5) Consumer offset Tracking:?用來(lái)跟蹤每個(gè)consumer目前所消費(fèi)的partition中最大的offset.

? ? 格式: /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] ? -->offset_value

? ? 此znode為持久節(jié)點(diǎn),可以看出offset跟group_id有關(guān),以表明當(dāng)group中一個(gè)消費(fèi)者失效,其他consumer可以繼續(xù)消費(fèi).

? ??6) Partition Owner registry:?用來(lái)標(biāo)記partition正在被哪個(gè)consumer消費(fèi).臨時(shí)znode

? ? 格式:?/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] ? -->consumer_node_id

? ? 此節(jié)點(diǎn)表達(dá)了"一個(gè)partition"只能被group下一個(gè)consumer消費(fèi),同時(shí)當(dāng)group下某個(gè)consumer失效,那么將會(huì)觸發(fā)負(fù)載均衡(即:讓partitions在多個(gè)consumer間均衡消費(fèi),接管那些"游離"的partitions)

?

? ? 當(dāng)consumer啟動(dòng)時(shí),所觸發(fā)的操作:

? ? A) 首先進(jìn)行"Consumer id Registry";

? ? B) 然后在"Consumer id Registry"節(jié)點(diǎn)下注冊(cè)一個(gè)watch用來(lái)監(jiān)聽(tīng)當(dāng)前group中其他consumer的"leave"和"join";只要此znode path下節(jié)點(diǎn)列表變更,都會(huì)觸發(fā)此group下consumer的負(fù)載均衡.(比如一個(gè)consumer失效,那么其他consumer接管partitions).

? ? C) 在"Broker id registry"節(jié)點(diǎn)下,注冊(cè)一個(gè)watch用來(lái)監(jiān)聽(tīng)broker的存活情況;如果broker列表變更,將會(huì)觸發(fā)所有的groups下的consumer重新balance.

?

? ??Consumer均衡算法

? ? 當(dāng)一個(gè)group中,有consumer加入或者離開(kāi)時(shí),會(huì)觸發(fā)partitions均衡.均衡的最終目的,是提升topic的并發(fā)消費(fèi)能力.

? ? ?1) 假如topic1,具有如下partitions: P0,P1,P2,P3

? ? ?2) 加入group中,有如下consumer: C0,C1

? ? ?3) 首先根據(jù)partition索引號(hào)對(duì)partitions排序: P0,P1,P2,P3

? ? ?4) 根據(jù)consumer.id排序: C0,C1

? ? ?5) 計(jì)算倍數(shù): M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)

? ? ?6) 然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]



? ??總結(jié):?

? ? 1) Producer端使用zookeeper用來(lái)"發(fā)現(xiàn)"broker列表,以及和Topic下每個(gè)partition leader建立socket連接并發(fā)送消息.

? ? 2) Broker端使用zookeeper用來(lái)注冊(cè)broker信息,已經(jīng)監(jiān)測(cè)partition leader存活性.

? ? 3) Consumer端使用zookeeper用來(lái)注冊(cè)consumer信息,其中包括consumer消費(fèi)的partition列表等,同時(shí)也用來(lái)發(fā)現(xiàn)broker列表,并和partition leader建立socket連接,并獲取消息.

?

三.主要配置?

? ? 1.Broker主要配置

Xml代碼?? ##broker標(biāo)識(shí),cluster中,此ID必須唯一??
  • ##接受consumer/producer的鏈接端口??
  • ##用來(lái)維護(hù)集群狀態(tài),以及consumer消費(fèi)記錄??
  • zookeeper.connect=localhost:2181??
  • ##broker所能接受的消息的最大尺寸??
  • messages.max.bytes=1000000??
  • num.network.threads=3??
  • num.io.threads=8??
  • ##進(jìn)行磁盤(pán)操作獲取數(shù)據(jù),數(shù)據(jù)操作結(jié)束后,請(qǐng)求被移除隊(duì)列并由network??
  • queued.max.requests=500??
  • socket.send.buffer.bytes=1048576??
  • socket.receive.buffer.bytes=1048576??
  • socket.request.max.bytes=104857600??
  • log.dirs=/tmp/kafka-logs??
  • ##kafka的特點(diǎn)就在于"分區(qū)",每個(gè)Topic被拆分成多個(gè)partitions??
  • num.partitions=2??
  • ##log文件中(append?only),此參數(shù)用于控制單個(gè)文件的大小.??
  • ##log.segment.bytes=??
  • ##log文件"sync"到磁盤(pán)之前累積的消息條數(shù)??
  • ##所以此參數(shù)的設(shè)置,需要在"數(shù)據(jù)可靠性"與"性能"之間做必要的權(quán)衡.??
  • ##如果此值過(guò)小,將會(huì)導(dǎo)致"fsync"的次數(shù)較多,這也意味著整體的client請(qǐng)求有一定的延遲.??
  • ##默認(rèn)值為10000??
  • ##僅僅通過(guò)interval來(lái)控制消息的磁盤(pán)寫(xiě)入時(shí)機(jī),是不足的.??
  • ##達(dá)到閥值,也將觸發(fā).??
  • #對(duì)某些特定的topic而言,重寫(xiě)log.flush.interval.messages屬性??
  • ??
  • ##是否自動(dòng)創(chuàng)建topic??
  • ##如果為false,則只能通過(guò)API或者command創(chuàng)建topic??
  • ##partition?leader與replicas之間通訊時(shí),socket的超時(shí)時(shí)間??
  • ##partition?leader與replicas數(shù)據(jù)同步時(shí),消息的隊(duì)列尺寸.??
  • ##partitions的"replicas"個(gè)數(shù),不得大于集群中broker的個(gè)數(shù)??
  • ##partition?Leader和follower通訊時(shí),如果在此時(shí)間內(nèi),沒(méi)有收到follower的"fetch請(qǐng)求"??
  • replica.lag.time.max.ms=10000??
  • ##通常,在follower與leader通訊時(shí),因?yàn)榫W(wǎng)絡(luò)延遲或者鏈接斷開(kāi),總會(huì)導(dǎo)致replicas中消息同步滯后??
  • ##到其他follower中.??
  • replica.lag.max.messages=4000??
  • replica.socket.timeout.ms=30000??
  • ##沒(méi)有意義的參數(shù)??
  • ##當(dāng)follower的fetch請(qǐng)求發(fā)出后,等待leader發(fā)送數(shù)據(jù)的時(shí)間.??
  • replica.fetch.wait.max.ms=500??
  • replica.fetch.min.bytes=1??
  • num.replica.fetchers=1??
  • ##檢測(cè)log文件的時(shí)間間隔??
  • ##log文件被保留的時(shí)長(zhǎng),如果超過(guò)此時(shí)長(zhǎng),將會(huì)被清除,無(wú)論log中的消息是否被消費(fèi)過(guò).??
  • log.retention.hours=168??

?

? ??2.Consumer主要配置

?

Java代碼?? ##當(dāng)前消費(fèi)者的group名稱(chēng),需要指定??
  • group.id=??
  • ##比如consumer消費(fèi)的消息offset等.??
  • zookeeper.connect=hostname1:port,hostname2:port2??
  • zookeeper.connection.timeout.ms=6000??
  • ##當(dāng)前consumer的標(biāo)識(shí),可以設(shè)定,也可以有系統(tǒng)生成.??
  • conusmer.id=??
  • ##每次feth將得到多條消息,此值為總大小??
  • fetch.messages.max.bytes=1048576??
  • fetch.min.bytes=1??
  • fetch.wait.max.ms=100??
  • ##當(dāng)有新的consumer加入到group時(shí),將會(huì)reblance,此后將會(huì)有partitions的消費(fèi)端遷移到新??
  • ##"Partition?Owner?registry"節(jié)點(diǎn)信息,但是有可能此時(shí)舊的consumer尚沒(méi)有釋放此節(jié)點(diǎn),??
  • rebalance.max.retries=4??
  • ##注意offset信息并不是每消費(fèi)一次消息就向zk提交一次,而是現(xiàn)在本地保存(內(nèi)存),并定期提交??
  • true??
  • auto.commit.interval.ms=60*1000??

?

? ?3.Producer主要配置

?

Java代碼?? ##對(duì)于開(kāi)發(fā)者而言,需要通過(guò)broker.list指定當(dāng)前producer需要關(guān)注的broker列表??
  • ##如果某個(gè)broker鏈接失敗,將導(dǎo)致此上的partitons無(wú)法繼續(xù)發(fā)布消息??
  • ##對(duì)于producer而言沒(méi)有使用zookeeper自動(dòng)發(fā)現(xiàn)broker列表,非常奇怪。(0.8V和0.7有區(qū)別)??
  • ##producer接收消息ack的時(shí)機(jī).默認(rèn)為0.??
  • ##1:?當(dāng)leader接收到消息之后發(fā)送ack??
  • request.required.acks=0??
  • ##如果超時(shí),broker將會(huì)向producer發(fā)送一個(gè)error?ACK.意味著上一次消息因?yàn)槟撤N??
  • request.timeout.ms=10000??
  • ##異步意味著消息將會(huì)在本地buffer,并適時(shí)批量發(fā)送??
  • producer.type=sync??
  • byte[]??
  • class=kafka.serializer.DefaultEncoder??
  • class=${serializer.class}??
  • ##默認(rèn)為消息的hashcode?%?partitions個(gè)數(shù)??
  • class=kafka.producer.DefaultPartitioner??
  • ##消息壓縮算法,none,gzip,snappy??
  • ##消息在producer端buffer的條數(shù).僅在producer.type=async下有效??
  • ##在async模式下,當(dāng)message被緩存的時(shí)間超過(guò)此值后,將會(huì)批量發(fā)送給broker??
  • queue.buffering.max.ms=5000??
  • ##無(wú)論如何,producer都無(wú)法盡快的將消息發(fā)送給broker,從而導(dǎo)致消息在producer端大量沉積??
  • queue.buffering.max.messages=10000??
  • ##阻塞一定時(shí)間后,隊(duì)列仍然沒(méi)有enqueue(producer仍然沒(méi)有發(fā)送出任何消息)??
  • ##-1:?無(wú)阻塞超時(shí)限制,消息不會(huì)被拋棄??
  • queue.enqueue.timeout.ms=-1??
  • ##因?yàn)閎roker并沒(méi)有完整的機(jī)制來(lái)避免消息重復(fù),所以當(dāng)網(wǎng)絡(luò)異常時(shí)(比如ACK丟失)??
  • message.send.max.retries=3??
  • ##producer需要知道partition?leader的位置,以及當(dāng)前topic的情況??
  • ##(比如topic失效,partition丟失,leader失效等),此外也可以通過(guò)此參數(shù)來(lái)配置額外的刷新機(jī)制??
  • topic.metadata.refresh.interval.ms=600000??

?

?

? ? broker配置文件請(qǐng)參考: kafka.server.KafkaConfig

? ? consumer配置文件請(qǐng)參考:?kafka.consumer.ConsumerConfig

? ? producer配置文件請(qǐng)參考:?kafka.producer.ProducerConfig

總結(jié)

以上是生活随笔為你收集整理的Apache kafka原理与特性(0.8V)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。