2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明
?
目錄
Kafka快速回顧
?消息隊(duì)列:
?發(fā)布/訂閱模式:
Kafka 重要概念:
常用命令
整合說(shuō)明
兩種方式
兩個(gè)版本API
在實(shí)際項(xiàng)目中,無(wú)論使用Storm還是SparkStreaming與Flink,主要從Kafka實(shí)時(shí)消費(fèi)數(shù)據(jù)進(jìn)行處理分析,流式數(shù)據(jù)實(shí)時(shí)處理技術(shù)架構(gòu)大致如下:
?
技術(shù)棧: Flume/SDK/Kafka Producer API ?-> KafKa ?—> ?SparkStreaming/Flink/Storm(Hadoop YARN) -> Redis ?-> UI
1)、阿里工具Canal:監(jiān)控MySQL數(shù)據(jù)庫(kù)binlog文件,將數(shù)據(jù)同步發(fā)送到Kafka Topic中https://github.com/alibaba/canalhttps://github.com/alibaba/canal/wiki/QuickStart2)、Maxwell:實(shí)時(shí)讀取MySQL二進(jìn)制日志binlog,并生成 JSON 格式的消息,作為生產(chǎn)者發(fā)送給 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平臺(tái)的應(yīng)用程序。http://maxwells-daemon.io/https://github.com/zendesk/maxwell
擴(kuò)展:Kafka 相關(guān)常見(jiàn)面試題:
1)、Kafka 集群大小(規(guī)模),Topic分區(qū)函數(shù)名及集群配置?2)、Topic中數(shù)據(jù)如何管理?數(shù)據(jù)刪除策略是什么?3)、如何消費(fèi)Kafka數(shù)據(jù)?4)、發(fā)送數(shù)據(jù)Kafka Topic中時(shí),如何保證數(shù)據(jù)發(fā)送成功?
?
Apache Kafka: 最原始功能【消息隊(duì)列】,緩沖數(shù)據(jù),具有發(fā)布訂閱功能(類似微信公眾號(hào))。
Kafka快速回顧
Kafka 是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列(Message Queue),主要應(yīng)用與大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域。
?消息隊(duì)列:
Kafka 本質(zhì)上是一個(gè) MQ(Message Queue),使用消息隊(duì)列的好處?(面試會(huì)問(wèn))
- 解耦:允許我們獨(dú)立的擴(kuò)展或修改隊(duì)列兩邊的處理過(guò)程;
- 可恢復(fù)性:即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍可以在系統(tǒng)恢復(fù)后被處理;
- 緩沖:有助于解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的情況;
- 靈活性&峰值處理能力:不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請(qǐng)求而完全崩潰,消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問(wèn)壓力;
- 異步通信:消息隊(duì)列允許用戶把消息放入隊(duì)列但不立即處理它;
?發(fā)布/訂閱模式:
?
?
一對(duì)多,生產(chǎn)者將消息發(fā)布到 Topic 中,有多個(gè)消費(fèi)者訂閱該主題,發(fā)布到 Topic 的消息會(huì)被所有訂閱者消費(fèi),被消費(fèi)的數(shù)據(jù)不會(huì)立即從 Topic 清除。
Kafka 框架架構(gòu)圖如下所示:
?
Kafka 存儲(chǔ)的消息來(lái)自任意多被稱為 Producer 生產(chǎn)者的進(jìn)程,數(shù)據(jù)從而可以被發(fā)布到不同的 Topic 主題下的不同 Partition 分區(qū)。在一個(gè)分區(qū)內(nèi),這些消息被索引并連同時(shí)間戳存儲(chǔ)在一起。其它被稱為 Consumer 消費(fèi)者的進(jìn)程可以從分區(qū)訂閱消息。Kafka 運(yùn)行在一個(gè)由一臺(tái)或多臺(tái)服務(wù)器組成的集群上,并且分區(qū)可以跨集群結(jié)點(diǎn)分布。
Kafka 重要概念:
?1)、Producer: 消息生產(chǎn)者,向 Kafka Broker 發(fā)消息的客戶端;
?2)、Consumer:消息消費(fèi)者,從 Kafka Broker 取消息的客戶端;
?3)、Consumer Group:消費(fèi)者組(CG),消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),提高消費(fèi)能力。一個(gè)分區(qū)只能由組內(nèi)一個(gè)消費(fèi)者消費(fèi),消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者;
?4)、Broker:一臺(tái) Kafka 機(jī)器就是一個(gè) Broker。一個(gè)集群由多個(gè) Broker 組成。一個(gè) Broker 可以容納多個(gè) Topic;
?5)、Topic:可以理解為一個(gè)隊(duì)列,Topic 將消息分類,生產(chǎn)者和消費(fèi)者面向的是同一個(gè) Topic;
?6)、Partition:為了實(shí)現(xiàn)擴(kuò)展性,提高并發(fā)能力,一個(gè)非常大的 Topic 可以分布到多個(gè) Broker (即服務(wù)器)上,一個(gè) Topic 可以分為多個(gè) Partition,每個(gè) Partition 是一個(gè) 有序的隊(duì)列;
?7)、Replica:副本,為實(shí)現(xiàn)備份的功能,保證集群中的某個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),該節(jié)點(diǎn)上的 Partition 數(shù)據(jù)不丟失,且 Kafka 仍然能夠繼續(xù)工作,Kafka 提供了副本機(jī)制,一個(gè) Topic 的每個(gè)分區(qū)都有若干個(gè)副本,一個(gè) Leader 和若干個(gè) Follower;
?8)、Leader:每個(gè)分區(qū)多個(gè)副本的“主”副本,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì)象,都是 Leader;
?9)、Follower:每個(gè)分區(qū)多個(gè)副本的“從”副本,實(shí)時(shí)從 Leader 中同步數(shù)據(jù),保持和 Leader 數(shù)據(jù)的同步。Leader 發(fā)生故障時(shí),某個(gè) Follower 還會(huì)成為新的 Leader;
?10)、Offset:消費(fèi)者消費(fèi)的位置信息,監(jiān)控?cái)?shù)據(jù)消費(fèi)到什么位置,當(dāng)消費(fèi)者掛掉再重新恢復(fù)的時(shí)候,可以從消費(fèi)位置繼續(xù)消費(fèi);
?11)、Zookeeper:Kafka 集群能夠正常工作,需要依賴于 Zookeeper,Zookeeper 幫助 Kafka 存儲(chǔ)和管理集群信息;
?
常用命令
#啟動(dòng)kafka/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties #停止kafka/export/server/kafka/bin/kafka-server-stop.sh #查看topic信息/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181#創(chuàng)建topic/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1?--partitions 3 --topic test#查看某個(gè)topic信息/export/server/kafka/bin/kafka-topics.sh --describe --zookeeper node1:2181 --topic test#刪除topic/export/server/kafka/bin/kafka-topics.sh --zookeeper node1:2181 --delete --topic test#啟動(dòng)生產(chǎn)者--控制臺(tái)的生產(chǎn)者--一般用于測(cè)試/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark_kafka# 啟動(dòng)消費(fèi)者--控制臺(tái)的消費(fèi)者/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark_kafka --from-beginning
?
整合說(shuō)明
兩種方式
Receiver-based Approach:
1.KafkaUtils.createDstream基于接收器方式,消費(fèi)Kafka數(shù)據(jù),已淘汰,企業(yè)中不再使用;
2.Receiver作為常駐的Task運(yùn)行在Executor等待數(shù)據(jù),但是一個(gè)Receiver效率低,需要開(kāi)啟多個(gè),再手動(dòng)合并數(shù)據(jù)(union),再進(jìn)行處理,很麻煩;
3.Receiver那臺(tái)機(jī)器掛了,可能會(huì)丟失數(shù)據(jù),所以需要開(kāi)啟WAL(預(yù)寫(xiě)日志)保證數(shù)據(jù)安全,那么效率又會(huì)降低;
4.Receiver方式是通過(guò)zookeeper來(lái)連接kafka隊(duì)列,調(diào)用Kafka高階API,offset存儲(chǔ)在zookeeper,由Receiver維護(hù);
5.Spark在消費(fèi)的時(shí)候?yàn)榱吮WC數(shù)據(jù)不丟也會(huì)在Checkpoint中存一份offset,可能會(huì)出現(xiàn)數(shù)據(jù)不一致;
?
Direct Approach (No Receivers):
1.KafkaUtils.createDirectStream直連方式,Streaming中每批次的每個(gè)job直接調(diào)用Simple Consumer API獲取對(duì)應(yīng)Topic數(shù)據(jù),此種方式使用最多,面試時(shí)被問(wèn)的最多;
2.Direct方式是直接連接kafka分區(qū)來(lái)獲取數(shù)據(jù),從每個(gè)分區(qū)直接讀取數(shù)據(jù)大大提高并行能力
3.Direct方式調(diào)用Kafka低階API(底層API),offset自己存儲(chǔ)和維護(hù),默認(rèn)由Spark維護(hù)在checkpoint中,消除了與zk不一致的情況 ;
4.當(dāng)然也可以自己手動(dòng)維護(hù),把offset存在MySQL/Redis中;
?
?
?
兩個(gè)版本API
Spark Streaming與Kafka集成,有兩套API,原因在于Kafka Consumer API有兩套,文檔:
http://spark.apache.org/docs/2.4.5/streaming-kafka-integration.html
?
1. Kafka 0.8.x版本 -早已淘汰
- 底層使用老的KafkaAPI:Old Kafka Consumer?API
- 支持Receiver(已淘汰)和Direct模式:
2.Kafka 0.10.x版本-開(kāi)發(fā)中使用
- 底層使用新的KafkaAPI: New Kafka Consumer API
- 只支持Direct模式
?
?
?
?
?
總結(jié)
以上是生活随笔為你收集整理的2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 2021年大数据Spark(四十一):S
- 下一篇: 2021年大数据Spark(四十三):S