日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

超详细 kafka 入门(最佳实践)

發(fā)布時間:2025/3/20 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 超详细 kafka 入门(最佳实践) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
今日推薦6年,終拿騰訊 offer! 都2021年了,你還在用Jenkins?趕快看看這些替代方案吧! 盤點(diǎn) 12 個 GitHub 上的高仿項(xiàng)目

CTO 說了,用錯 @Autowired 和 @Resource 的人可以領(lǐng)盒飯了

用鴻蒙跑了個 hello world

認(rèn)識 kafka

kafka簡介

Kafka 是一個分布式流媒體平臺,kafka官網(wǎng):http://kafka.apache.org/

1)流媒體平臺有三個關(guān)鍵功能:

  • 發(fā)布和訂閱記錄流,類似于消息隊(duì)列或企業(yè)消息傳遞系統(tǒng)。

  • 以容錯的持久方式存儲記錄流。

  • 記錄發(fā)生時處理流。

2)Kafka通常用于兩大類應(yīng)用:

  • 構(gòu)建可在系統(tǒng)或應(yīng)用程序之間可靠獲取數(shù)據(jù)的實(shí)時流數(shù)據(jù)管道

  • 構(gòu)建轉(zhuǎn)換或響應(yīng)數(shù)據(jù)流的實(shí)時流應(yīng)用程序

3)首先是幾個概念:

  • Kafka作為一個集群運(yùn)行在一個或多個可跨多個數(shù)據(jù)中心的服務(wù)器上。

  • Kafka集群以稱為** topics主題**的類別存儲記錄流。

  • 每條記錄都包含一個鍵,一個值和一個時間戳

4)Kafka有四個核心API:

  • Producer API(生產(chǎn)者API)允許應(yīng)用程序發(fā)布記錄流至一個或多個kafka的topics(主題)。

  • Consumer API(消費(fèi)者API)允許應(yīng)用程序訂閱一個或多個topics(主題),并處理所產(chǎn)生的對他們記錄的數(shù)據(jù)流。

  • **Streams API(流API)**允許應(yīng)用程序充當(dāng)流處理器,從一個或多個topics(主題)消耗的輸入流,并產(chǎn)生一個輸出流至一個或多個輸出的topics(主題),有效地變換所述輸入流,以輸出流。

  • Connector API(連接器API)允許構(gòu)建和運(yùn)行kafka topics(主題)連接到現(xiàn)有的應(yīng)用程序或數(shù)據(jù)系統(tǒng)中重用生產(chǎn)者或消費(fèi)者。例如,關(guān)系數(shù)據(jù)庫的連接器可能捕獲對表的每個更改。

在Kafka中,客戶端和服務(wù)器之間的通信是通過簡單,高性能,語言無關(guān)的TCP協(xié)議完成的。此協(xié)議已版本化并保持與舊版本的向后兼容性。Kafka提供Java客戶端,但客戶端有多種語言版本。

1.2 Topics主題 和 partitions分區(qū)

我們首先深入了解 Kafka 為記錄流提供的核心抽象 - 主題topics

一個Topic可以認(rèn)為是一類消息,每個topic將被分成多個partition(區(qū)),每個partition在存儲層面是append log文件

主題是發(fā)布記錄的類別或訂閱源名稱。Kafka的主題總是多用戶; 也就是說,一個主題可以有零個,一個或多個消費(fèi)者訂閱寫入它的數(shù)據(jù)。

對于每個主題,Kafka集群都維護(hù)一個如下所示的分區(qū)日志:

每個分區(qū)都是一個有序的,不可變的記錄序列,不斷附加到結(jié)構(gòu)化的提交日志中。分區(qū)中的記錄每個都分配了一個稱為偏移的順序ID號,它唯一地標(biāo)識分區(qū)中的每個記錄。

Kafka集群持久保存所有已發(fā)布的記錄 - 無論是否已使用 - 使用可配置的保留期。例如,如果保留策略設(shè)置為兩天,則在發(fā)布記錄后的兩天內(nèi),它可供使用,之后將被丟棄以釋放空間。Kafka的性能在數(shù)據(jù)大小方面實(shí)際上是恒定的,因此長時間存儲數(shù)據(jù)不是問題。

實(shí)際上,基于每個消費(fèi)者保留的唯一元數(shù)據(jù)是該消費(fèi)者在日志中的偏移或位置。這種偏移由消費(fèi)者控制:通常消費(fèi)者在讀取記錄時會線性地提高其偏移量,但事實(shí)上,由于該位置由消費(fèi)者控制,因此它可以按照自己喜歡的任何順序消費(fèi)記錄。例如,消費(fèi)者可以重置為較舊的偏移量來重新處理過去的數(shù)據(jù),或者跳到最近的記錄并從“現(xiàn)在”開始消費(fèi)。

這些功能組合意味著Kafka 消費(fèi)者consumers 非常cheap - 他們可以來來往往對集群或其他消費(fèi)者沒有太大影響。例如,您可以使用我們的命令行工具“tail”任何主題的內(nèi)容,而無需更改任何現(xiàn)有使用者所消耗的內(nèi)容。

日志中的分區(qū)有多種用途。首先,它們允許日志擴(kuò)展到超出適合單個服務(wù)器的大小。每個單獨(dú)的分區(qū)必須適合托管它的服務(wù)器,但主題可能有許多分區(qū),因此它可以處理任意數(shù)量的數(shù)據(jù)。其次,它們充當(dāng)了并行性的單位 - 更多的是它。

1.3 Distribution分配

一個Topic的多個partitions,被分布在kafka集群中的多個server上;每個server(kafka實(shí)例)負(fù)責(zé)partitions中消息的讀寫操作;此外kafka還可以配置partitions需要備份的個數(shù)(replicas),每個partition將會被備份到多臺機(jī)器上,以提高可用性.

基于replicated方案,那么就意味著需要對多個備份進(jìn)行調(diào)度;每個partition都有一個server為"leader";leader負(fù)責(zé)所有的讀寫操作,如果leader失效,那么將會有其他follower來接管(成為新的leader);follower只是單調(diào)的和leader跟進(jìn),同步消息即可..由此可見作為leader的server承載了全部的請求壓力,因此從集群的整體考慮,有多少個partitions就意味著有多少個"leader",kafka會將"leader"均衡的分散在每個實(shí)例上,來確保整體的性能穩(wěn)定。

1.4 Producers生產(chǎn)者 和 Consumers消費(fèi)者

1.4.1 Producers生產(chǎn)者

Producers 將數(shù)據(jù)發(fā)布到指定的topics 主題。同時Producer 也能決定將此消息歸屬于哪個partition;比如基于"round-robin"方式或者通過其他的一些算法等。

1.4.2 Consumers

  • 本質(zhì)上kafka只支持Topic.每個consumer屬于一個consumer group;反過來說,每個group中可以有多個consumer.發(fā)送到Topic的消息,只會被訂閱此Topic的每個group中的一個consumer消費(fèi)。

  • 如果所有使用者實(shí)例具有相同的使用者組,則記錄將有效地在使用者實(shí)例上進(jìn)行負(fù)載平衡。

  • 如果所有消費(fèi)者實(shí)例具有不同的消費(fèi)者組,則每個記錄將廣播到所有消費(fèi)者進(jìn)程。

分析:兩個服務(wù)器Kafka群集,托管四個分區(qū)(P0-P3),包含兩個使用者組。消費(fèi)者組A有兩個消費(fèi)者實(shí)例,B組有四個消費(fèi)者實(shí)例。

在Kafka中實(shí)現(xiàn)消費(fèi)consumption 的方式是通過在消費(fèi)者實(shí)例上劃分日志中的分區(qū),以便每個實(shí)例在任何時間點(diǎn)都是分配的“公平份額”的獨(dú)占消費(fèi)者。維護(hù)組中成員資格的過程由Kafka協(xié)議動態(tài)處理。如果新實(shí)例加入該組,他們將從該組的其他成員接管一些分區(qū); 如果實(shí)例死亡,其分區(qū)將分發(fā)給其余實(shí)例。

Kafka僅提供分區(qū)內(nèi)記錄的總訂單,而不是主題中不同分區(qū)之間的記錄。對于大多數(shù)應(yīng)用程序而言,按分區(qū)排序與按鍵分區(qū)數(shù)據(jù)的能力相結(jié)合就足夠了。但是,如果您需要對記錄進(jìn)行總訂單,則可以使用僅包含一個分區(qū)的主題來實(shí)現(xiàn),但這將意味著每個使用者組只有一個使用者進(jìn)程。

1.5 Consumers kafka確保

  • 發(fā)送到partitions中的消息將會按照它接收的順序追加到日志中。也就是說,如果記錄M1由與記錄M2相同的生成者發(fā)送,并且首先發(fā)送M1,則M1將具有比M2更低的偏移并且在日志中更早出現(xiàn)。

  • 消費(fèi)者實(shí)例按照它們存儲在日志中的順序查看記錄。對于消費(fèi)者而言,它們消費(fèi)消息的順序和日志中消息順序一致。

  • 如果Topic的"replicationfactor"為N,那么允許N-1個kafka實(shí)例失效,我們將容忍最多N-1個服務(wù)器故障,而不會丟失任何提交到日志的記錄。

1.6 kafka作為消息系統(tǒng)

Kafka的流概念與傳統(tǒng)的企業(yè)郵件系統(tǒng)相比如何?

(1)傳統(tǒng)消息系統(tǒng)

消息傳統(tǒng)上有兩種模型:queuing排隊(duì) and publish-subscribe發(fā)布 - 訂閱。在隊(duì)列中,消費(fèi)者池可以從服務(wù)器讀取并且每個記錄轉(zhuǎn)到其中一個; 在發(fā)布 - 訂閱中,記錄被廣播給所有消費(fèi)者。這兩種模型中的每一種都有優(yōu)點(diǎn)和缺點(diǎn)。排隊(duì)的優(yōu)勢在于它允許您在多個消費(fèi)者實(shí)例上劃分?jǐn)?shù)據(jù)處理,從而可以擴(kuò)展您的處理。不幸的是,一旦一個進(jìn)程讀取它已經(jīng)消失的數(shù)據(jù),隊(duì)列就不是多用戶。發(fā)布 - 訂閱允許您將數(shù)據(jù)廣播到多個進(jìn)程,但由于每條消息都發(fā)送給每個訂閱者,因此無法進(jìn)行擴(kuò)展處理。

卡夫卡的消費(fèi)者群體概念概括了這兩個概念。與隊(duì)列一樣,使用者組允許您將處理劃分為一組進(jìn)程(使用者組的成員)。與發(fā)布 - 訂閱一樣,Kafka允許您向多個消費(fèi)者組廣播消息。

(2)kafka 的優(yōu)勢

Kafka模型的優(yōu)勢在于每個主題都具有這些屬性 - 它可以擴(kuò)展處理并且也是多用戶 - 不需要選擇其中一個。

與傳統(tǒng)的消息系統(tǒng)相比,Kafka具有更強(qiáng)的訂購保證。

傳統(tǒng)隊(duì)列在服務(wù)器上按順序保留記錄,如果多個消費(fèi)者從隊(duì)列中消耗,則服務(wù)器按照存儲順序分發(fā)記錄。但是,雖然服務(wù)器按順序分發(fā)記錄,但是記錄是異步傳遞給消費(fèi)者的,因此它們可能會在不同的消費(fèi)者處出現(xiàn)故障。這實(shí)際上意味著在存在并行消耗的情況下丟失記錄的順序。消息傳遞系統(tǒng)通常通過具有“獨(dú)占消費(fèi)者”概念來解決這個問題,該概念只允許一個進(jìn)程從隊(duì)列中消耗,但當(dāng)然這意味著處理中沒有并行性。

kafka做得更好。通過在主題中具有并行性概念 - 分區(qū) - ,Kafka能夠在消費(fèi)者流程池中提供訂購保證和負(fù)載平衡。這是通過將主題中的分區(qū)分配給使用者組中的使用者來實(shí)現(xiàn)的,以便每個分區(qū)僅由該組中的一個使用者使用。通過這樣做,我們確保使用者是該分區(qū)的唯一讀者并按順序使用數(shù)據(jù)。由于有許多分區(qū),這仍然可以平衡許多消費(fèi)者實(shí)例的負(fù)載。但請注意,消費(fèi)者組中的消費(fèi)者實(shí)例不能超過分區(qū)。

1.7 kafka作為存儲系統(tǒng)

  • 任何允許發(fā)布與消費(fèi)消息分離的消息的消息隊(duì)列實(shí)際上充當(dāng)了正在進(jìn)行的消息的存儲系統(tǒng)。Kafka的不同之處在于它是一個非常好的存儲系統(tǒng)。

  • 寫入Kafka的數(shù)據(jù)將寫入磁盤并進(jìn)行復(fù)制以實(shí)現(xiàn)容錯。Kafka允許生產(chǎn)者等待確認(rèn),以便在完全復(fù)制之前寫入不被認(rèn)為是完整的,并且即使寫入的服務(wù)器失敗也保證寫入仍然存在。

  • 磁盤結(jié)構(gòu)Kafka很好地使用了規(guī)模 - 無論服務(wù)器上有50 KB還是50 TB的持久數(shù)據(jù),Kafka都會執(zhí)行相同的操作。

  • 由于認(rèn)真對待存儲并允許客戶端控制其讀取位置,您可以將Kafka視為一種專用于高性能,低延遲提交日志存儲,復(fù)制和傳播的專用分布式文件系統(tǒng)。

1.8 kafka用于流處理

  • 僅僅讀取,寫入和存儲數(shù)據(jù)流是不夠的,目的是實(shí)現(xiàn)流的實(shí)時處理。

  • 在Kafka中,流處理器是指從輸入主題獲取連續(xù)數(shù)據(jù)流,對此輸入執(zhí)行某些處理以及生成連續(xù)數(shù)據(jù)流以輸出主題的任何內(nèi)容。

  • 例如,零售應(yīng)用程序可能會接收銷售和發(fā)貨的輸入流,并輸出重新排序流和根據(jù)此數(shù)據(jù)計算的價格調(diào)整。

  • 可以使用生產(chǎn)者和消費(fèi)者API直接進(jìn)行簡單處理。但是,對于更復(fù)雜的轉(zhuǎn)換,Kafka提供了完全集成的Streams API。這允許構(gòu)建執(zhí)行非平凡處理的應(yīng)用程序,這些應(yīng)用程序可以計算流的聚合或?qū)⒘鬟B接在一起。

  • 此工具有助于解決此類應(yīng)用程序面臨的難題:處理無序數(shù)據(jù),在代碼更改時重新處理輸入,執(zhí)行有狀態(tài)計算等。

  • 流API構(gòu)建在Kafka提供的核心原語上:它使用生產(chǎn)者和消費(fèi)者API進(jìn)行輸入,使用Kafka進(jìn)行有狀態(tài)存儲,并在流處理器實(shí)例之間使用相同的組機(jī)制來實(shí)現(xiàn)容錯。

2、kafka使用場景

2.1 消息Messaging

Kafka可以替代更傳統(tǒng)的消息代理。消息代理的使用有多種原因(將處理與數(shù)據(jù)生成器分離,緩沖未處理的消息等)。與大多數(shù)消息傳遞系統(tǒng)相比,Kafka具有更好的吞吐量,內(nèi)置分區(qū),復(fù)制和容錯功能,這使其成為大規(guī)模消息處理應(yīng)用程序的理想解決方案。

根據(jù)經(jīng)驗(yàn),消息傳遞的使用通常相對較低,但可能需要較低的端到端延遲,并且通常取決于Kafka提供的強(qiáng)大的耐用性保證。

在這個領(lǐng)域,Kafka可與傳統(tǒng)的消息傳遞系統(tǒng)(如ActiveMQ或 RabbitMQ)相媲美。

2.2 網(wǎng)站活動跟蹤

Kafka的原始用例是能夠?qū)⒂脩艋顒痈櫣艿乐亟橐唤M實(shí)時發(fā)布 - 訂閱源。這意味著站點(diǎn)活動(頁面查看,搜索或用戶可能采取的其他操作)將發(fā)布到中心主題,每個活動類型包含一個主題。這些源可用于訂購一系列用例,包括實(shí)時處理,實(shí)時監(jiān)控以及加載到Hadoop或離線數(shù)據(jù)倉庫系統(tǒng)以進(jìn)行脫機(jī)處理和報告。

活動跟蹤通常非常高,因?yàn)闉槊總€用戶頁面視圖生成了許多活動消息。

2.3 度量Metrics

Kafka通常用于運(yùn)營監(jiān)控數(shù)據(jù)。這涉及從分布式應(yīng)用程序聚合統(tǒng)計信息以生成操作數(shù)據(jù)的集中式提要。

2.4 日志聚合

許多人使用Kafka作為日志聚合解決方案的替代品。日志聚合通常從服務(wù)器收集物理日志文件,并將它們放在中央位置(可能是文件服務(wù)器或HDFS)進(jìn)行處理。Kafka抽象出文件的細(xì)節(jié),并將日志或事件數(shù)據(jù)作為消息流更清晰地抽象出來。這允許更低延遲的處理并更容易支持多個數(shù)據(jù)源和分布式數(shù)據(jù)消耗。與Scribe或Flume等以日志為中心的系統(tǒng)相比,Kafka提供了同樣出色的性能,由于復(fù)制而具有更強(qiáng)的耐用性保證,以及更低的端到端延遲。

2.5 流處理

許多Kafka用戶在處理由多個階段組成的管道時處理數(shù)據(jù),其中原始輸入數(shù)據(jù)從Kafka主題中消費(fèi),然后聚合,豐富或以其他方式轉(zhuǎn)換為新主題以供進(jìn)一步消費(fèi)或后續(xù)處理。

例如,用于推薦新聞文章的處理管道可以從RSS訂閱源抓取文章內(nèi)容并將其發(fā)布到“文章”主題; 進(jìn)一步處理可能會對此內(nèi)容進(jìn)行規(guī)范化或重復(fù)數(shù)據(jù)刪除,并將已清理的文章內(nèi)容發(fā)布到新主題; 最終處理階段可能會嘗試向用戶推薦此內(nèi)容。此類處理管道基于各個主題創(chuàng)建實(shí)時數(shù)據(jù)流的圖形。從0.10.0.0開始,這是一個輕量級但功能強(qiáng)大的流處理庫,名為Kafka Streams 在Apache Kafka中可用于執(zhí)行如上所述的此類數(shù)據(jù)處理。除了Kafka Streams之外,其他開源流處理工具包括Apache Storm和 Apache Samza。

2.6 Event Sourcing

Event Sourcing是一種應(yīng)用程序設(shè)計風(fēng)格,其中狀態(tài)更改記錄為按時間排序的記錄序列。Kafka對非常大的存儲日志數(shù)據(jù)的支持使其成為以這種風(fēng)格構(gòu)建的應(yīng)用程序的出色后端。

2.7 提交日志

Kafka可以作為分布式系統(tǒng)的一種外部提交日志。該日志有助于在節(jié)點(diǎn)之間復(fù)制數(shù)據(jù),并充當(dāng)故障節(jié)點(diǎn)恢復(fù)其數(shù)據(jù)的重新同步機(jī)制。Kafka中的日志壓縮功能有助于支持此用法。在這種用法中,Kafka類似于Apache BookKeeper項(xiàng)目。

3、kafka安裝

3.1 下載安裝

到官網(wǎng)http://kafka.apache.org/downloads.html下載想要的版本。

注:由于Kafka控制臺腳本對于基于Unix和Windows的平臺是不同的,因此在Windows平臺上使用bin\windows\ 而不是bin/ 將腳本擴(kuò)展名更改為.bat。

[root@along?~]#?wget?http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz?? [root@along?~]#?tar?-C?/data/?-xvf?kafka_2.11-2.1.0.tgz?? [root@along?~]#?cd?/data/kafka_2.11-2.1.0/??

3.2 配置啟動zookeeper

kafka正常運(yùn)行,必須配置zookeeper,否則無論是kafka集群還是客戶端的生存者和消費(fèi)者都無法正常的工作的;所以需要配置啟動zookeeper服務(wù)。

(1)zookeeper需要java環(huán)境

[root@along?~]#?yum?-y?install?java-1.8.0??

(2)這里kafka下載包已經(jīng)包括zookeeper服務(wù),所以只需修改配置文件,啟動即可。

如果需要下載指定zookeeper版本;可以單獨(dú)去zookeeper官網(wǎng)http://mirrors.shu.edu.cn/apache/zookeeper/下載指定版本。

[root@along?~]#?cd?/data/kafka_2.11-2.1.0/?? [root@along?kafka_2.11-2.1.0]#?grep?"^[^#]"?config/zookeeper.properties?? dataDir=/tmp/zookeeper???#數(shù)據(jù)存儲目錄?? clientPort=2181???#zookeeper端口?? maxClientCnxns=0??

注:可自行添加修改zookeeper配置

3.3 配置kafka

(1)修改配置文件

[root@along?kafka_2.11-2.1.0]#?grep?"^[^#]"?config/server.properties?? broker.id=0?? listeners=PLAINTEXT://localhost:9092?? num.network.threads=3?? num.io.threads=8?? socket.send.buffer.bytes=102400?? socket.receive.buffer.bytes=102400?? socket.request.max.bytes=104857600?? log.dirs=/tmp/kafka-logs?? num.partitions=1?? num.recovery.threads.per.data.dir=1?? offsets.topic.replication.factor=1?? transaction.state.log.replication.factor=1?? transaction.state.log.min.isr=1?? log.retention.hours=168?? log.segment.bytes=1073741824?? log.retention.check.interval.ms=300000?? zookeeper.connect=localhost:2181?? zookeeper.connection.timeout.ms=6000?? group.initial.rebalance.delay.ms=0??

注:可根據(jù)自己需求修改配置文件

?broker.id:#唯一標(biāo)識ID??listeners=PLAINTEXT://localhost:9092:#kafka服務(wù)監(jiān)聽地址和端口??log.dirs:#日志存儲目錄??zookeeper.connect:#指定zookeeper服務(wù)??

(2)配置環(huán)境變量

[root@along?~]#?vim?/etc/profile.d/kafka.sh?? export?KAFKA_HOME="/data/kafka_2.11-2.1.0"?? export?PATH="${KAFKA_HOME}/bin:$PATH"?? [root@along?~]#?source?/etc/profile.d/kafka.sh??

(3)配置服務(wù)啟動腳本

[root@along?~]#?vim?/etc/init.d/kafka?? #!/bin/sh?? #?? #?chkconfig:?345?99?01?? #?description:?Kafka?? #?? #?File?:?Kafka?? #?? #?Description:?Starts?and?stops?the?Kafka?server?? #??source?/etc/rc.d/init.d/functions??KAFKA_HOME=/data/kafka_2.11-2.1.0?? KAFKA_USER=root?? export?LOG_DIR=/tmp/kafka-logs??[?-e?/etc/sysconfig/kafka?]?&&?.?/etc/sysconfig/kafka??#?See?how?we?were?called.?? case?"$1"?in??start)??echo?-n?"Starting?Kafka:"??/sbin/runuser?-s?/bin/sh?$KAFKA_USER?-c?"nohup?$KAFKA_HOME/bin/kafka-server-start.sh?$KAFKA_HOME/config/server.properties?>?$LOG_DIR/server.out?2>?$LOG_DIR/server.err?&"??echo?"?done."??exit?0??;;??stop)??echo?-n?"Stopping?Kafka:?"??/sbin/runuser?-s?/bin/sh?$KAFKA_USER??-c?"ps?-ef?|?grep?kafka.Kafka?|?grep?-v?grep?|?awk?'{print?\$2}'?|?xargs?kill?\-9"??echo?"?done."??exit?0??;;??hardstop)??echo?-n?"Stopping?(hard)?Kafka:?"??/sbin/runuser?-s?/bin/sh?$KAFKA_USER??-c?"ps?-ef?|?grep?kafka.Kafka?|?grep?-v?grep?|?awk?'{print?\$2}'?|?xargs?kill?-9"??echo?"?done."??exit?0??;;??status)??c_pid=`ps?-ef?|?grep?kafka.Kafka?|?grep?-v?grep?|?awk?'{print?$2}'`??if?[?"$c_pid"?=?""?]?;?then??echo?"Stopped"??exit?3??else??echo?"Running?$c_pid"??exit?0??fi??;;??restart)??stop??start??;;??*)??echo?"Usage:?kafka?{start|stop|hardstop|status|restart}"??exit?1??;;??esac??

3.4 啟動kafka服務(wù)

(1)后臺啟動zookeeper服務(wù)

[root@along?~]#?nohup?zookeeper-server-start.sh?/data/kafka_2.11-2.1.0/config/zookeeper.properties?&??

(2)啟動kafka服務(wù)

[root@along?~]#?service?kafka?start?? Starting?kafka?(via?systemctl):????????????????????????????[??OK??]?? [root@along?~]#?service?kafka?status?? Running?86018?? [root@along?~]#?ss?-nutl?? Netid?State??????Recv-Q?Send-Q?????Local?Address:Port????????????????????Peer?Address:Port???????????????????????????????? tcp???LISTEN?????0??????50????????????????????:::9092??????????????????????????????:::*??????????????????? tcp???LISTEN?????0??????50????????????????????:::2181??????????????????????????????:::*??

4、kafka使用簡單入門

4.1 創(chuàng)建主題topics

創(chuàng)建一個名為“along”的主題,它只包含一個分區(qū),只有一個副本:

[root@along?~]#?kafka-topics.sh?--create?--zookeeper?localhost:2181?--replication-factor?1?--partitions?1?--topic?along?? Created?topic?"along".??

如果我們運(yùn)行l(wèi)ist topic命令,我們現(xiàn)在可以看到該主題:

[root@along?~]#?kafka-topics.sh?--list?--zookeeper?localhost:2181?? along???

4.2 發(fā)送一些消息

Kafka附帶一個命令行客戶端,它將從文件或標(biāo)準(zhǔn)輸入中獲取輸入,并將其作為消息發(fā)送到Kafka集群。默認(rèn)情況下,每行將作為單獨(dú)的消息發(fā)送。

運(yùn)行生產(chǎn)者,然后在控制臺中鍵入一些消息以發(fā)送到服務(wù)器。

[root@along?~]#?kafka-console-producer.sh?--broker-list?localhost:9092?--topic?along?? >This?is?a?message?? >This?is?another?message??

4.3 啟動消費(fèi)者

Kafka還有一個命令行使用者,它會將消息轉(zhuǎn)儲到標(biāo)準(zhǔn)輸出。

[root@along?~]#?kafka-console-consumer.sh?--bootstrap-server?localhost:9092?--topic?along?--from-beginning?? This?is?a?message?? This?is?another?message??

所有命令行工具都有其他選項(xiàng); 運(yùn)行不帶參數(shù)的命令將顯示更詳細(xì)地記錄它們的使用信息。

5、設(shè)置多代理kafka群集

到目前為止,我們一直在與一個broker運(yùn)行,但這并不好玩。對于Kafka,單個代理只是一個大小為1的集群,因此除了啟動一些代理實(shí)例之外沒有太多變化。但是為了感受它,讓我們將我們的集群擴(kuò)展到三個節(jié)點(diǎn)(仍然在我們的本地機(jī)器上)。

5.1 準(zhǔn)備配置文件

[root@along?kafka_2.11-2.1.0]#?cd?/data/kafka_2.11-2.1.0/?? [root@along?kafka_2.11-2.1.0]#?cp?config/server.properties?config/server-1.properties?? [root@along?kafka_2.11-2.1.0]#?cp?config/server.properties?config/server-2.properties?? [root@along?kafka_2.11-2.1.0]#?vim?config/server-1.properties??broker.id=1??listeners=PLAINTEXT://:9093??log.dirs=/tmp/kafka-logs-1?? [root@along?kafka_2.11-2.1.0]#?vim?config/server-2.properties??broker.id=2??listeners=PLAINTEXT://:9094??log.dirs=/tmp/kafka-logs-2??

注:該broker.id 屬性是群集中每個節(jié)點(diǎn)的唯一且永久的名稱。我們必須覆蓋端口和日志目錄,因?yàn)槲覀冊谕慌_機(jī)器上運(yùn)行這些,并且我們希望讓所有代理嘗試在同一端口上注冊或覆蓋彼此的數(shù)據(jù)。

5.2 開啟集群另2個kafka服務(wù)

[root@along?~]#?nohup?kafka-server-start.sh?/data/kafka_2.11-2.1.0/config/server-1.properties?&?? [root@along?~]#?nohup?kafka-server-start.sh?/data/kafka_2.11-2.1.0/config/server-2.properties?&?? [root@along?~]#?ss?-nutl?? Netid?State??????Recv-Q?Send-Q?????Local?Address:Port????????????????????Peer?Address:Port???????????????????????????? tcp???LISTEN?????0??????50??????::ffff:127.0.0.1:9092??????????????????????????????:::*??????????????????? tcp???LISTEN?????0??????50??????::ffff:127.0.0.1:9093??????????????????????????????:::*?????????????????????????????????? tcp???LISTEN?????0??????50??????::ffff:127.0.0.1:9094??????????????????????????????:::*??

5.3 在集群中進(jìn)行操作

(1)現(xiàn)在創(chuàng)建一個復(fù)制因子為3的新主題my-replicated-topic

[root@along?~]#?kafka-topics.sh?--create?--zookeeper?localhost:2181?--replication-factor?3?--partitions?1?--topic?my-replicated-topic?? Created?topic?"my-replicated-topic".??

(2)在一個集群中,運(yùn)行“describe topics”命令查看哪個broker正在做什么

[root@along?~]#?kafka-topics.sh?--describe?--zookeeper?localhost:2181?--topic?my-replicated-topic?? Topic:my-replicated-topic???PartitionCount:1????ReplicationFactor:3?Configs:??Topic:?my-replicated-topic??Partition:?0????Leader:?2???Replicas:?2,0,1?Isr:?2,0,1??#注釋:第一行給出了所有分區(qū)的摘要,每個附加行提供有關(guān)一個分區(qū)的信息。由于我們只有一個分區(qū)用于此主題,因此只有一行。??#“l(fā)eader”是負(fù)責(zé)給定分區(qū)的所有讀取和寫入的節(jié)點(diǎn)。每個節(jié)點(diǎn)將成為隨機(jī)選擇的分區(qū)部分的領(lǐng)導(dǎo)者。??#“replicas”是復(fù)制此分區(qū)日志的節(jié)點(diǎn)列表,無論它們是否為領(lǐng)導(dǎo)者,或者即使它們當(dāng)前處于活動狀態(tài)。??#?“isr”是“同步”復(fù)制品的集合。這是副本列表的子集,該列表當(dāng)前處于活躍狀態(tài)并且已經(jīng)被領(lǐng)導(dǎo)者捕獲。??#請注意,Leader: 2,在我的示例中,節(jié)點(diǎn)2 是該主題的唯一分區(qū)的Leader。??

(3)可以在我們創(chuàng)建的原始主題上運(yùn)行相同的命令,以查看它的位置

[root@along?~]#?kafka-topics.sh?--describe?--zookeeper?localhost:2181?--topic?along?? Topic:along?PartitionCount:1????ReplicationFactor:1?Configs:??Topic:?along????Partition:?0????Leader:?0???Replicas:?0?Isr:?0??

(4)向我們的新主題發(fā)布一些消息:

[root@along?~]#?kafka-console-producer.sh?--broker-list?localhost:9092?--topic?my-replicated-topic?? >my?test?message?1?? >my?test?message?2??

(5)現(xiàn)在讓我們使用這些消息:

[root@along?~]#?kafka-console-consumer.sh?--bootstrap-server?localhost:9092?--from-beginning?--topic?my-replicated-topic?? my?test?message?1?? my?test?message?2??

5.4 測試集群的容錯性

(1)現(xiàn)在讓我們測試一下容錯性。Broker 2 充當(dāng)leader 所以讓我們殺了它:

[root@along?~]#?ps?aux?|?grep?server-2.properties?|awk?'{print?$2}'?? 106737?? [root@along?~]#?kill?-9?106737?? [root@along?~]#?ss?-nutl?? tcp???LISTEN?????0??????50??????::ffff:127.0.0.1:9092??????????????????????????????:::*????????????????????????? tcp???LISTEN?????0??????50??????::ffff:127.0.0.1:9093??????????????????????????????:::*??

(2)leader 已切換到其中一個從屬節(jié)點(diǎn),節(jié)點(diǎn)2不再位于同步副本集中:

[root@along?~]#?kafka-topics.sh?--describe?--zookeeper?localhost:2181?--topic?my-replicated-topic?? Topic:my-replicated-topic???PartitionCount:1????ReplicationFactor:3?Configs:??Topic:?my-replicated-topic??Partition:?0????Leader:?0???Replicas:?2,0,1?Isr:?0,1??

(3)即使最初接受寫入的leader 已經(jīng)失敗,這些消息仍可供消費(fèi):

[root@along?~]#?kafka-console-consumer.sh?--bootstrap-server?localhost:9092?--from-beginning?--topic?my-replicated-topic?? my?test?message?1?? my?test?message?2??

6、使用Kafka Connect導(dǎo)入/導(dǎo)出數(shù)據(jù)

從控制臺寫入數(shù)據(jù)并將其寫回控制臺是一個方便的起點(diǎn),但有時候可能希望使用其他來源的數(shù)據(jù)或?qū)?shù)據(jù)從Kafka導(dǎo)出到其他系統(tǒng)。對于許多系統(tǒng),您可以使用Kafka Connect導(dǎo)入或?qū)С鰯?shù)據(jù),而不是編寫自定義集成代碼。

Kafka Connect是Kafka附帶的工具,用于向Kafka導(dǎo)入和導(dǎo)出數(shù)據(jù)。它是一個可擴(kuò)展的工具,運(yùn)行連接器,實(shí)現(xiàn)與外部系統(tǒng)交互的自定義邏輯。在本快速入門中,我們將了解如何使用簡單的連接器運(yùn)行Kafka Connect,這些連接器將數(shù)據(jù)從文件導(dǎo)入Kafka主題并將數(shù)據(jù)從Kafka主題導(dǎo)出到文件。

(1)首先創(chuàng)建一些種子數(shù)據(jù)進(jìn)行測試:

[root@along?~]#?echo?-e?"foo\nbar"?>?test.txt?? 或者在Windows上:?? >?echo?foo>?test.txt?? >?echo?bar>>?test.txt??

(2)接下來,啟動兩個以獨(dú)立模式運(yùn)行的連接器,這意味著它們在單個本地專用進(jìn)程中運(yùn)行。提供三個配置文件作為參數(shù)。

第一個始終是Kafka Connect流程的配置,包含常見配置,例如要連接的Kafka代理和數(shù)據(jù)的序列化格式。

其余配置文件均指定要創(chuàng)建的連接器。這些文件包括唯一的連接器名稱,要實(shí)例化的連接器類以及連接器所需的任何其他配置。

[root@along?~]#?connect-standalone.sh?config/connect-standalone.properties?config/connect-file-source.properties?config/connect-file-sink.properties?? [2019-01-16?16:16:31,884]?INFO?Kafka?Connect?standalone?worker?initializing?...?(org.apache.kafka.connect.cli.ConnectStandalone:67)?? [2019-01-16?16:16:31,903]?INFO?WorkerInfo?values:?? ...?...?? #注:Kafka附帶的這些示例配置文件使用您之前啟動的默認(rèn)本地群集配置并創(chuàng)建兩個連接器:第一個是源連接器,它從輸入文件讀取行并生成每個Kafka主題,第二個是宿連接器從Kafka主題讀取消息并將每個消息生成為輸出文件中的一行。??

(3)驗(yàn)證是否導(dǎo)入成功(另起終端)

在啟動過程中,您將看到許多日志消息,包括一些指示正在實(shí)例化連接器的日志消息。

① 一旦Kafka Connect進(jìn)程啟動,源連接器應(yīng)該開始從test.txt主題讀取行并將其生成到主題connect-test,并且接收器連接器應(yīng)該開始從主題讀取消息connect-test 并將它們寫入文件test.sink.txt。我們可以通過檢查輸出文件的內(nèi)容來驗(yàn)證數(shù)據(jù)是否已通過整個管道傳遞:

[root@along?~]#?cat?test.sink.txt?? foo?? bar??

② 請注意,數(shù)據(jù)存儲在Kafka主題中connect-test,因此我們還可以運(yùn)行控制臺使用者來查看主題中的數(shù)據(jù)(或使用自定義使用者代碼來處理它):

[root@along?~]#?kafka-console-consumer.sh?--bootstrap-server?localhost:9092?--topic?connect-test?--from-beginning?? {"schema":{"type":"string","optional":false},"payload":"foo"}?? {"schema":{"type":"string","optional":false},"payload":"bar"}??

(4)繼續(xù)追加數(shù)據(jù),驗(yàn)證

[root@along?~]#?echo?Another?line>>?test.txt??????? [root@along?~]#?cat?test.sink.txt?? foo?? bar?? Another?line?? [root@along?~]#?kafka-console-consumer.sh?--bootstrap-server?localhost:9092?--topic?connect-test?--from-beginning?? {"schema":{"type":"string","optional":false},"payload":"foo"}?? {"schema":{"type":"string","optional":false},"payload":"bar"}?? {"schema":{"type":"string","optional":false},"payload":"Another?line"

來源:cnblogs.com/along21/p/10278100.html
(版權(quán)歸原作者所有,侵刪)

推薦文章1、14個項(xiàng)目!2、Spring Boot + Security + MyBatis + Thymeleaf + Activiti 快速開發(fā)平臺項(xiàng)目3、推薦幾個支付項(xiàng)目!4、寫博客能月入10K?5、一款基于 Spring Boot 的現(xiàn)代化社區(qū)(論壇/問答/社交網(wǎng)絡(luò)/博客)更多項(xiàng)目源碼 1、推薦兩個項(xiàng)目!2、重磅推薦:一套開源的網(wǎng)校系統(tǒng),無論是自建網(wǎng)校還是接副業(yè)都很方便 3、一款基于 Spring Boot 的現(xiàn)代化社區(qū)(論壇/問答/社交網(wǎng)絡(luò)/博客) 4、13K點(diǎn)贊都基于 Vue+Spring 前后端分離管理系統(tǒng)ELAdmin,大愛5、想接私活時薪再翻一倍,建議根據(jù)這幾個開源的SpringBoot

總結(jié)

以上是生活随笔為你收集整理的超详细 kafka 入门(最佳实践)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。