超详细!一文告诉你 SparkStreaming 如何整合 Kafka !附代码可实践
來源 | Alice菌
責編 | Carol
封圖 |? CSDN 下載于視覺中國
相信很多小伙伴已經(jīng)接觸過?SparkStreaming 了,理論就不講太多了,今天的內(nèi)容主要是為大家?guī)淼氖?SparkStreaming 整合 Kafka 的教程。
文中含代碼,感興趣的朋友可以復制動手試試!
Kafka回顧
正式開始之前,先讓我們來對Kafka回顧一波。
核心概念圖解
Broker:安裝Kafka服務的機器就是一個broker
Producer:消息的生產(chǎn)者,負責將數(shù)據(jù)寫入到broker中(push)
Consumer:消息的消費者,負責從kafka中拉取數(shù)據(jù)(pull),老版本的消費者需要依賴zk,新版本的不需要
Topic: 主題,相當于是數(shù)據(jù)的一個分類,不同topic存放不同業(yè)務的數(shù)據(jù) –主題:區(qū)分業(yè)務
Replication:副本,數(shù)據(jù)保存多少份(保證數(shù)據(jù)不丟失) –副本:數(shù)據(jù)安全
Partition:分區(qū),是一個物理的分區(qū),一個分區(qū)就是一個文件,一個Topic可以有1~n個分區(qū),每個分區(qū)都有自己的副本 –分區(qū):并發(fā)讀寫
Consumer Group:消費者組,一個topic可以有多個消費者/組同時消費,多個消費者如果在一個消費者組中,那么他們不能重復消費數(shù)據(jù) –消費者組:提高消費者消費速度、方便統(tǒng)一管理
注意[1]:一個Topic可以被多個消費者或者組訂閱,一個消費者/組也可以訂閱多個主題
注意[2]:讀數(shù)據(jù)只能從Leader讀, 寫數(shù)據(jù)也只能往Leader寫,Follower會從Leader那里同步數(shù)據(jù)過來做副本!!!
常用命令
啟動kafka
/export/servers/kafka/bin/kafka-server-start.sh?-daemon?/export/servers/kafka/config/server.properties?停止kafka
/export/servers/kafka/bin/kafka-server-stop.sh?查看topic信息
/export/servers/kafka/bin/kafka-topics.sh?--list?--zookeeper?node01:2181創(chuàng)建topic
/export/servers/kafka/bin/kafka-topics.sh?--create?--zookeeper?node01:2181?--replication-factor?3?--partitions?3?--topic?test查看某個topic信息
/export/servers/kafka/bin/kafka-topics.sh?--describe?--zookeeper?node01:2181?--topic?test刪除topic
/export/servers/kafka/bin/kafka-topics.sh?--zookeeper?node01:2181?--delete?--topic?test啟動生產(chǎn)者–控制臺的生產(chǎn)者一般用于測試
/export/servers/kafka/bin/kafka-console-producer.sh?--broker-list?node01:9092?--topic?spark_kafka啟動消費者–控制臺的消費者一般用于測試
/export/servers/kafka/bin/kafka-console-consumer.sh?--zookeeper?node01:2181?--topic?spark_kafka--from-beginning消費者連接到borker的地址
/export/servers/kafka/bin/kafka-console-consumer.sh?--bootstrap-server?node01:9092,node02:9092,node03:9092?--topic?spark_kafka?--from-beginning?整合kafka兩種模式說明
這同時也是一個面試題的熱點。
開發(fā)中我們經(jīng)常會利用SparkStreaming實時地讀取kafka中的數(shù)據(jù)然后進行處理,在spark1.3版本后,kafkaUtils里面提供了兩種創(chuàng)建DStream的方法:
1、Receiver接收方式:
KafkaUtils.createDstream(開發(fā)中不用,了解即可,但是面試可能會問)。
Receiver作為常駐的Task運行在Executor等待數(shù)據(jù),但是一個Receiver效率低,需要開啟多個,再手動合并數(shù)據(jù)(union),再進行處理,很麻煩
Receiver哪臺機器掛了,可能會丟失數(shù)據(jù),所以需要開啟WAL(預寫日志)保證數(shù)據(jù)安全,那么效率又會降低!
Receiver方式是通過zookeeper來連接kafka隊列,調(diào)用Kafka高階API,offset存儲在zookeeper,由Receiver維護。
spark在消費的時候為了保證數(shù)據(jù)不丟也會在Checkpoint中存一份offset,可能會出現(xiàn)數(shù)據(jù)不一致
所以不管從何種角度來說,Receiver模式都不適合在開發(fā)中使用了,已經(jīng)淘汰了
2、Direct直連方式
KafkaUtils.createDirectStream(開發(fā)中使用,要求掌握)
Direct方式是直接連接kafka分區(qū)來獲取數(shù)據(jù),從每個分區(qū)直接讀取數(shù)據(jù)大大提高了并行能力
Direct方式調(diào)用Kafka低階API(底層API),offset自己存儲和維護,默認由Spark維護在checkpoint中,消除了與zk不一致的情況
當然也可以自己手動維護,把offset存在mysql、redis中
所以基于Direct模式可以在開發(fā)中使用,且借助Direct模式的特點+手動操作可以保證數(shù)據(jù)的Exactly once 精準一次
總結(jié):
Receiver接收方式
多個Receiver接受數(shù)據(jù)效率高,但有丟失數(shù)據(jù)的風險
開啟日志(WAL)可防止數(shù)據(jù)丟失,但寫兩遍數(shù)據(jù)效率低。
Zookeeper維護offset有重復消費數(shù)據(jù)可能。
使用高層次的API
Direct直連方式
不使用Receiver,直接到kafka分區(qū)中讀取數(shù)據(jù)
不使用日志(WAL)機制
Spark自己維護offset
使用低層次的API
擴展:關(guān)于消息語義
注意:
開發(fā)中SparkStreaming和kafka集成有兩個版本:0.8及0.10+
0.8版本有Receiver和Direct模式(但是0.8版本生產(chǎn)環(huán)境問題較多,在Spark2.3之后不支持0.8版本了)。
0.10以后只保留了direct模式(Reveiver模式不適合生產(chǎn)環(huán)境),并且0.10版本API有變化(更加強大)
結(jié)論:
我們學習和開發(fā)都直接使用0.10版本中的direct模式,但是關(guān)于Receiver和Direct的區(qū)別面試的時候要能夠答得上來
spark-streaming-kafka-0-8(了解)
1.Receiver
KafkaUtils.createDstream使用了receivers來接收數(shù)據(jù),利用的是Kafka高層次的消費者api,偏移量由Receiver維護在zk中,對于所有的receivers接收到的數(shù)據(jù)將會保存在Spark executors中,然后通過Spark Streaming啟動job來處理這些數(shù)據(jù),默認會丟失,可啟用WAL日志,它同步將接受到數(shù)據(jù)保存到分布式文件系統(tǒng)上比如HDFS。保證數(shù)據(jù)在出錯的情況下可以恢復出來。盡管這種方式配合著WAL機制可以保證數(shù)據(jù)零丟失的高可靠性,但是啟用了WAL效率會較低,且無法保證數(shù)據(jù)被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。
(官方現(xiàn)在已經(jīng)不推薦這種整合方式。)
準備工作
1)啟動zookeeper集群
zkServer.sh?start2)啟動kafka集群
kafka-server-start.sh??/export/servers/kafka/config/server.properties3.創(chuàng)建topic
kafka-topics.sh?--create?--zookeeper?node01:2181?--replication-factor?1?--partitions?3?--topic?spark_kafka4.通過shell命令向topic發(fā)送消息
kafka-console-producer.sh?--broker-list?node01:9092?--topic??spark_kafka5.添加kafka的pom依賴
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.2.0</version> </dependency>API
通過receiver接收器獲取kafka中topic數(shù)據(jù),可以并行運行更多的接收器讀取kafak topic中的數(shù)據(jù),這里為3個
?val?receiverDStream:?immutable.IndexedSeq[ReceiverInputDStream[(String,?String)]]?=?(1?to?3).map(x?=>?{val?stream:?ReceiverInputDStream[(String,?String)]?=?KafkaUtils.createStream(ssc,?zkQuorum,?groupId,?topics)stream})如果啟用了WAL(spark.streaming.receiver.writeAheadLog.enable=true)可以設置存儲級別(默認StorageLevel.MEMORY_AND_DISK_SER_2)
代碼演示
import?org.apache.spark.streaming.dstream.{DStream,?ReceiverInputDStream} import?org.apache.spark.streaming.kafka.KafkaUtils import?org.apache.spark.streaming.{Seconds,?StreamingContext} import?org.apache.spark.{SparkConf,?SparkContext}import?scala.collection.immutableobject?SparkKafka?{def?main(args:?Array[String]):?Unit?=?{//1.創(chuàng)建StreamingContextval?config:?SparkConf?=? new?SparkConf().setAppName("SparkStream").setMaster("local[*]").set("spark.streaming.receiver.writeAheadLog.enable",?"true") //開啟WAL預寫日志,保證數(shù)據(jù)源端可靠性val?sc?=?new?SparkContext(config)sc.setLogLevel("WARN")val?ssc?=?new?StreamingContext(sc,Seconds(5))ssc.checkpoint("./kafka") //==============================================//2.準備配置參數(shù)val?zkQuorum?=?"node01:2181,node02:2181,node03:2181"val?groupId?=?"spark"val?topics?=?Map("spark_kafka"?->?2)//2表示每一個topic對應分區(qū)都采用2個線程去消費, //ssc的rdd分區(qū)和kafka的topic分區(qū)不一樣,增加消費線程數(shù),并不增加spark的并行處理數(shù)據(jù)數(shù)量//3.通過receiver接收器獲取kafka中topic數(shù)據(jù),可以并行運行更多的接收器讀取kafak?topic中的數(shù)據(jù),這里為3個val?receiverDStream:?immutable.IndexedSeq[ReceiverInputDStream[(String,?String)]]?=?(1?to?3).map(x?=>?{val?stream:?ReceiverInputDStream[(String,?String)]?=?KafkaUtils.createStream(ssc,?zkQuorum,?groupId,?topics)stream})//4.使用union方法,將所有receiver接受器產(chǎn)生的Dstream進行合并val?allDStream:?DStream[(String,?String)]?=?ssc.union(receiverDStream)//5.獲取topic的數(shù)據(jù)(String,?String)?第1個String表示topic的名稱,第2個String表示topic的數(shù)據(jù)val?data:?DStream[String]?=?allDStream.map(_._2) //==============================================//6.WordCountval?words:?DStream[String]?=?data.flatMap(_.split("?"))val?wordAndOne:?DStream[(String,?Int)]?=?words.map((_,?1))val?result:?DStream[(String,?Int)]?=?wordAndOne.reduceByKey(_?+?_)result.print()ssc.start()ssc.awaitTermination()} }2.Direct
Direct方式會定期地從kafka的topic下對應的partition中查詢最新的偏移量,再根據(jù)偏移量范圍在每個batch里面處理數(shù)據(jù),Spark通過調(diào)用kafka簡單的消費者API讀取一定范圍的數(shù)據(jù)。
Direct的缺點是無法使用基于zookeeper的kafka監(jiān)控工具
Direct相比基于Receiver方式有幾個優(yōu)點:
簡化并行
不需要創(chuàng)建多個kafka輸入流,然后union它們,sparkStreaming將會創(chuàng)建和kafka分區(qū)數(shù)一樣的rdd的分區(qū)數(shù),而且會從kafka中并行讀取數(shù)據(jù),spark中RDD的分區(qū)數(shù)和kafka中的分區(qū)數(shù)據(jù)是一一對應的關(guān)系。
高效?
Receiver實現(xiàn)數(shù)據(jù)的零丟失是將數(shù)據(jù)預先保存在WAL中,會復制一遍數(shù)據(jù),會導致數(shù)據(jù)被拷貝兩次,第一次是被kafka復制,另一次是寫到WAL中。而Direct不使用WAL消除了這個問題。
恰好一次語義(Exactly-once-semantics)
Receiver讀取kafka數(shù)據(jù)是通過kafka高層次api把偏移量寫入zookeeper中,雖然這種方法可以通過數(shù)據(jù)保存在WAL中保證數(shù)據(jù)不丟失,但是可能會因為sparkStreaming和ZK中保存的偏移量不一致而導致數(shù)據(jù)被消費了多次。
? ? ? ? Direct的Exactly-once-semantics(EOS)通過實現(xiàn)kafka低層次api,偏移量僅僅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的問題。
API
代碼演示
spark-streaming-kafka-0-10
說明
spark-streaming-kafka-0-10版本中,API有一定的變化,操作更加靈活,開發(fā)中使用
pom.xml
API:
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
創(chuàng)建topic
啟動生產(chǎn)者
代碼演示
好了,本篇主要講解的 SparkStreaming 整合 Kafka 的過程,并帶大家復習了一波Kafka的基礎知識,如果對你有用的話,麻煩動手手點個“在看”吧~
本文由作者首發(fā) CSDN 博客,原文鏈接:
https://blog.csdn.net/weixin_44318830/article/details/105612516
推薦閱讀
大促下的智能運維挑戰(zhàn):阿里如何抗住“雙11貓晚”?
20萬個法人、百萬條銀行賬戶信息,正在暗網(wǎng)兜售
當莎士比亞遇見Google Flax:教你用字符級語言模型和歸遞神經(jīng)網(wǎng)絡寫“莎士比亞”式句子
Hyperledger Fabric 和企業(yè)級以太坊,誰才是企業(yè)首選?
面試時遇到「看門狗」脖子上掛著「時間輪」,我就問你怕不怕?
同期兩篇 Nature:運行溫度高于 1K 的量子計算平臺問世!
GitHub 標星 10,000+,Apache 頂級項目 ShardingSphere 的開源之路
真香,朕在看了!
總結(jié)
以上是生活随笔為你收集整理的超详细!一文告诉你 SparkStreaming 如何整合 Kafka !附代码可实践的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 看完这篇还不了解Nginx,那我就哭了!
- 下一篇: 推出云游戏解决方案后,腾讯在这场沙龙上还