大数据技术原理与应用:期末考点总结
個人期末復(fù)習(xí)材料,根據(jù)林子雨的大數(shù)據(jù)技術(shù)教材與其它資料整理。
目錄
- 第一章 大數(shù)據(jù)概述
- 第二章 Hadoop
- 第三章 HDFS
- 第四章 HBase
- 第五章 NoSQL
- 第六章 云數(shù)據(jù)庫
- 第七章 MapReduce
- 第八章 Hadoop 2.x
- 第九章 Spark
- 第十章 流計算
第一章 大數(shù)據(jù)概述
1.大數(shù)據(jù)的4v特征
- 數(shù)據(jù)量大 volume
- 價值密度低 value
- 數(shù)據(jù)類型繁多 variety
- 處理速度快 velocity
2.大數(shù)據(jù)3種思維方式的轉(zhuǎn)變
在思維方式方面,大數(shù)據(jù)完全顛覆了傳統(tǒng)的思維方式:
-
全樣而非抽樣
-
效率而非精確
-
相關(guān)而非因果
3.大數(shù)據(jù)兩大核心技術(shù)
分布式存儲和分布式處理
?
4.大數(shù)據(jù)計算模式及其代表產(chǎn)品
| 批處理計算 | 針對大規(guī)模數(shù)據(jù)的批量處理 | MapReduce、Spark等 |
| 流計算 | 針對流數(shù)據(jù)的實時計算 | Storm、S4、Flume、Streams、Puma、DStream、Super Mario、銀河流數(shù)據(jù)處理平臺等 |
| 圖計算 | 針對大規(guī)模圖結(jié)構(gòu)數(shù)據(jù)的處理 | Pregel、GraphX、Giraph、PowerGraph、Hama、GoldenOrb等 |
| 查詢分析計算 | 大規(guī)模數(shù)據(jù)的存儲管理和查詢分析 | Dremel、Hive、Cassandra、Impala等 |
?
5.大數(shù)據(jù)、云計算與物聯(lián)網(wǎng)之間的區(qū)別和聯(lián)系
第二章 Hadoop
Hadoop面試題 http://www.dajiangtai.com/community/18456.do
1.Hadoop的發(fā)展歷史
2002年,Hadoop起源于Doug Cutting開發(fā)Apache Nutch網(wǎng)絡(luò)搜索引擎項目。
2004年,Nutch項目也模仿GFS開發(fā)了自己的分布式文件系統(tǒng)NDFS(Nutch Distributed File System),也就是HDFS的前身。
2004年,谷歌公司又發(fā)表了另一篇具有深遠(yuǎn)影響的論文《MapReduce:Simplified Data Processing on Large Clusters(Mapreduce:簡化大規(guī)模集群上的數(shù)據(jù)處理)》,闡述了MapReduce分布式編程思想。
2005年,Doug Cutting等人開始嘗試實現(xiàn)MapReduce計算框架,并將它與NDFS(Nutch Distributed File System)結(jié)合,用以支持Nutch引擎的主要算法,Nutch開源實現(xiàn)了谷歌的MapReduce。
2006年2月,由于NDFS和MapReduce在Nutch引擎中有著良好的應(yīng)用,Nutch中的NDFS和MapReduce開始獨立出來,成為Lucene項目的一個子項目,稱為Hadoop,同時,Doug Cutting加盟雅虎。
2008年1月,Hadoop正式成為Apache頂級項目,包含眾多子項目,Hadoop也逐漸開始被雅虎之外的其他公司使用。同年4月,Hadoop打破世界紀(jì)錄,成為最快排序1TB數(shù)據(jù)的系統(tǒng),它采用一個由910個節(jié)點構(gòu)成的集群進(jìn)行運算,排序時間只用了209秒。
在2009年5月,Hadoop更是把1TB數(shù)據(jù)排序時間縮短到62秒。Hadoop從此名聲大震,迅速發(fā)展成為大數(shù)據(jù)時代最具影響力的開源分布式開發(fā)平臺,并成為事實上的大數(shù)據(jù)處理標(biāo)準(zhǔn)。
2.Hadoop的特性
Hadoop以一種可靠、高效、可伸縮的方式進(jìn)行處理的,它具有以下幾個方面的特性:
- 高可靠性:多副本
- 高效性:并行工作
- 高可擴(kuò)展性:方便擴(kuò)展服務(wù)器
- 高容錯性:失敗的任務(wù)會重新分配
- 成本低:廉價的集群設(shè)備
- 運行在Linux平臺上
- 支持多種編程語言
3.Hadoop的版本
Apache Hadoop版本分為兩代,我們將第一代Hadoop稱為Hadoop 1.0,第二代Hadoop稱為Hadoop 2.0。
Hadoop 1.x 和Hadoop 2.x的區(qū)別:在1.x版本中,MapReduce負(fù)責(zé)邏輯運算和資源調(diào)度,耦合性比較大;2.x版本中新增了YARN,負(fù)責(zé)資源調(diào)度,這樣MapReduce就只負(fù)責(zé)運算了。
?
4.Hadoop生態(tài)系統(tǒng)/項目結(jié)構(gòu)
| HDFS | 分布式文件存儲系統(tǒng) |
| MapReduce | 分布式并行計算框架 |
| YARN | 資源調(diào)度管理框架 |
| HBase | 分布式非關(guān)系型數(shù)據(jù)庫 |
| Hive | Hadoop上的數(shù)據(jù)倉庫。提供HQL,將HQL語句轉(zhuǎn)化為MapReduce程序 |
| Zookeeper | 提供分布式協(xié)調(diào)一致性服務(wù) |
| Kafka | 高吞吐量的分布式發(fā)布/訂閱消息系統(tǒng) |
| Pig | 基于Hadoop的大數(shù)據(jù)分析平臺,提供類似sql的查詢語言Pig Latin。 |
| Flume | 日志采集框架 |
| Oozie | Hadoop上的作業(yè)流調(diào)度系統(tǒng) |
| Spark | 分布式并行計算框架 |
| Sqoop | 數(shù)據(jù)傳輸框架,用于MySQL與HDFS之間的數(shù)據(jù)傳遞 |
| Storm | 流計算框架 |
5.配置文件中的參數(shù)
所有配置文件:
重點關(guān)注 hdfs-site.xml,core-site.xml
-
hdfs-site.xml
-
core-site.xml
hadoop.tmp.dir 是 hadoop文件系統(tǒng)依賴的基本配置,很多配置路徑都依賴它,它的默認(rèn)位置是在/tmp/{$user}下面,注意這是個臨時目錄。因此,它的持久化配置很重要的, 如果選擇默認(rèn),一旦因為斷電等外在因素影響,/tmp/{$user}下的所有東西都會丟失。
?
第三章 HDFS
1.分布式文件系統(tǒng)結(jié)構(gòu)
主從結(jié)構(gòu):分布式文件系統(tǒng)在物理上是由諸多計算機(jī)節(jié)點組成的,這里計算機(jī)節(jié)點分為兩類,一類叫主節(jié)點,一類叫從節(jié)點。
2.HDFS的目標(biāo)
- 大數(shù)據(jù)集
- 流式數(shù)據(jù)讀寫
- 簡單的文件模型
- 強(qiáng)大的跨平臺兼容性
- 廉價的硬件設(shè)備
3.HDFS的局限性
-
不適合低延遲數(shù)據(jù)訪問(不適合實時處理,io開銷大)
-
無法高效存儲大量小文件(文件塊機(jī)制)
-
不支持多用戶并發(fā)寫入及任意修改文件(一個文件,同時只允許一個寫入者對文件進(jìn)行追加)
4.塊 Block
塊是HDFS中文件存儲的基本單位,在Hadoop2.x中文件塊大小默認(rèn)為128MB,在1.x中默認(rèn)為64MB。
HDFS采用抽象的塊概念可以帶來以下幾個明顯的好處:
- 支持大規(guī)模文件存儲:文件以塊為單位進(jìn)行存儲,一個大規(guī)模文件可以被分拆成若干個文件塊,不同的文件塊可以被分發(fā)到不同的節(jié)點上,因此,一個文件的大小不會受到單個節(jié)點的存儲容量的限制,可以遠(yuǎn)遠(yuǎn)大于網(wǎng)絡(luò)中任意節(jié)點的存儲容量
- 簡化系統(tǒng)設(shè)計(簡化了文件和元數(shù)據(jù)的管理):首先,大大簡化了存儲管理,因為文件塊大小是固定的,這樣就可以很容易計算出一個節(jié)點可以存儲多少文件塊;其次,方便了元數(shù)據(jù)的管理,元數(shù)據(jù)不需要和文件塊一起存儲,可以由其他系統(tǒng)負(fù)責(zé)管理元數(shù)據(jù)
- 適合數(shù)據(jù)備份:每個文件塊都可以冗余存儲到多個節(jié)點上,大大提高了系統(tǒng)的容錯性和可用性
5.HDFS體系結(jié)構(gòu)
hdfs中采用了主-從結(jié)構(gòu)模型,一個hdfs集群中包含1個namenode和若干個datanode。
- 名稱節(jié)點 namenode
- 數(shù)據(jù)節(jié)點 datanode
- 客戶端 client
6.NameNode 名稱節(jié)點
namenode節(jié)點是整個hdfs集群的唯一的主節(jié)點,負(fù)責(zé):
- 接收和回復(fù)客戶的訪問請求
- 存儲文件系統(tǒng)的所有元數(shù)據(jù)(管理文件系統(tǒng)的命名空間)
名稱節(jié)點(NameNode)負(fù)責(zé)管理分布式文件系統(tǒng)的命名空間(Namespace),保存了兩個核心的數(shù)據(jù)結(jié)構(gòu),即 FsImage 和 EditLog。
-
FsImage
命名空間鏡像文件。FsImage 用于維護(hù)文件系統(tǒng)樹以及文件樹中所有的文件和目錄的元數(shù)據(jù),即包含文件系統(tǒng)中所有目錄和文件inode的序列化形式。
-
EditLog
操作日志文件。EditLog 中記錄了所有針對文件的創(chuàng)建、刪除、重命名等操作。
?
啟動過程(處于安全模式)
在名稱節(jié)點啟動的時候,第一次啟動NameNode格式化后,創(chuàng)建Fsimage和Edits文件。如果不是第一次啟動,直接加載編輯日志和鏡像文件到內(nèi)存。
它會將FsImage文件中的內(nèi)容加載到內(nèi)存中,之后再執(zhí)行EditLog文件中的各項操作,使得內(nèi)存中的元數(shù)據(jù)和實際的同步。
一旦在內(nèi)存中成功建立文件系統(tǒng)元數(shù)據(jù)的映射,則創(chuàng)建一個新的FsImage文件和一個空的EditLog文件。
?
7.DataNode 數(shù)據(jù)節(jié)點
datanode節(jié)點是hdfs集群的工作節(jié)點,負(fù)責(zé):
- 數(shù)據(jù)的存儲:存儲文件系統(tǒng)的數(shù)據(jù)文件,每個文件被分成多個數(shù)據(jù)塊存儲在不同節(jié)點上。
- 數(shù)據(jù)的讀寫:接收客戶端的讀寫請求
- 定期向namenode發(fā)送心跳信息,若沒有發(fā)送則被標(biāo)記為宕機(jī)
- 在namenode的調(diào)度下進(jìn)行對數(shù)據(jù)塊的操作
8.元數(shù)據(jù)
存儲的信息:hdfs中的元數(shù)據(jù)包含HDFS中文件的所有塊和塊的存儲位置、修改和訪問時間、訪問權(quán)限、大小等信息。
存儲的位置:元數(shù)據(jù)存儲在NameNode節(jié)點的FsImage數(shù)據(jù)結(jié)構(gòu)中,由它負(fù)責(zé)管理。
9.HDFS工作機(jī)制(上面都有提到過)
-
NameNode與SecondaryNameNode
(1)NN的啟動過程
(2)采用SecondaryNameNode的原因
(3)SNN的工作機(jī)制
-
DataNode
存儲文件、注冊并接收與回復(fù)client讀寫請求、發(fā)送塊列表、發(fā)送心跳信息
10.通信協(xié)議(了解)
HDFS中有5種通信協(xié)議,各個節(jié)點之間根據(jù)不同協(xié)議通過RPC (Remote Procedure Call) 進(jìn)行交互。
11.HDFS冗余數(shù)據(jù)存儲
HDFS對于同一個數(shù)據(jù)塊會存儲多個副本,默認(rèn)為3個。且不同副本被分布到不同節(jié)點上。
保證:系統(tǒng)的容錯性和可用性
優(yōu)點:加快數(shù)據(jù)傳輸速度、多個副本對比容易檢查數(shù)據(jù)錯誤、保證數(shù)據(jù)可靠性
13.HDFS數(shù)據(jù)存儲策略
假如一個數(shù)據(jù)塊有3個副本,
那么第1個副本會隨機(jī)存儲在一個機(jī)架上的某個節(jié)點;
第2個副本會存儲在與第1個副本相同機(jī)架的不同節(jié)點上;
第3個副本會存儲在與第1個副本不同機(jī)架的隨機(jī)節(jié)點上。
14.HDFS數(shù)據(jù)錯誤的三種類型
- NameNode數(shù)據(jù)錯誤
- DataNode數(shù)據(jù)錯誤
- 數(shù)據(jù)出錯
15.HDFS常用shell命令
# 啟動HDFS [ht@hadoop101 ~]$ start-dfs.sh# 停止HDFS [ht@hadoop101 ~]$ stop-dfs.sh# 輸出某個命令的幫助信息 [ht@hadoop101 ~]$ hadoop fs -help ls# 顯示目錄詳細(xì)信息,-p表示遞歸 [ht@hadoop101 ~]$ hadoop fs -ls [-R]# 在HDFS上創(chuàng)建目錄,-p表示遞歸創(chuàng)建 [ht@hadoop101 ~]$ hadoop fs -mkdir -p /user/ht# 顯示文件內(nèi)容 [ht@hadoop101 myfile]$ hadoop fs -cat /user/ht/test.txt# 將HDFS上的文件拷貝到 HDFS的另一個目錄 # 從/user/ht/test.txt 拷貝到 /user/ht/file/ [ht@hadoop101 myfile]$ hadoop fs -cp /user/ht/test.txt /user/ht/file/# -copyFromLocal:從本地文件系統(tǒng)中拷貝文件到HDFS路徑去 # -copyToLocal:從HDFS拷貝到本地 # -put:等同于copyFromLocal # -get:等同于copyToLocal # -mv:在HDFS目錄中移動文件# -chgrp將文件所屬的用戶組改為ht,-R表示遞歸 # -chmod改變文件權(quán)限、-chown改變文件所屬用戶 也是一樣的 [ht@hadoop101 ~]$ hadoop fs -chgrp -R ht /user/ht/test.txt# 刪除文件或文件夾,-r表示遞歸 [ht@hadoop100 hadoop]$ hdfs dfs -rm [-r] /user/ht/wcoutput # -rmdir:刪除空目錄 # -du 統(tǒng)計目錄的大小信息第四章 HBase
1.起源
HBase是谷歌的BigTable的開源實現(xiàn)。
2.HBase和BigTable的底層技術(shù)對應(yīng)關(guān)系
?
3.HBase與傳統(tǒng)關(guān)系型數(shù)據(jù)庫的對比
區(qū)別主要在于:
-
數(shù)據(jù)類型:hbase中所有數(shù)據(jù)都是字符串類型;關(guān)系型數(shù)據(jù)庫中具有多種數(shù)據(jù)類型。
-
數(shù)據(jù)操作:hbase只能對數(shù)據(jù)進(jìn)行增、刪、查、清空等操作,不能進(jìn)行表之間的連接;關(guān)系型數(shù)據(jù)庫可以增刪改查,還可以通過表的外鍵進(jìn)行連接。
-
存儲模型:hbase基于列存儲,關(guān)系型數(shù)據(jù)庫基于行存儲。
-
數(shù)據(jù)維護(hù):hbase對數(shù)據(jù)進(jìn)行操作后會保留歷史版本。
-
數(shù)據(jù)索引:hbase只有一個索引——行鍵,關(guān)系型數(shù)據(jù)庫可以創(chuàng)建很多索引。
-
可伸縮性:hbase可以通過集群節(jié)點的擴(kuò)展實現(xiàn)存儲數(shù)據(jù)量的水平擴(kuò)展,關(guān)系型數(shù)據(jù)庫難以實現(xiàn)橫向擴(kuò)展,縱向擴(kuò)展的空間有限。
在hbase中:類型是未經(jīng)解釋的字符串,只能對它進(jìn)行增刪查等操作,索引就是它本身的行鍵,它就是按列存儲,對它操作后還會保留歷史版本,hbase還通過集群的機(jī)器增加和減少來實現(xiàn)存儲容量的增大和縮小。
4.HBase的物理視圖與概念視圖
?
5.Master 和 Region的功能
-
Master
master負(fù)責(zé)管理和維護(hù)HBase表的分區(qū)信息(Region列表),維護(hù)Region服務(wù)器列表,分配Region以確保負(fù)載均衡。
-
Region
region負(fù)責(zé)存儲hbase表的數(shù)據(jù),處理來自客戶端的讀寫請求。
6.Region的定位(HBase的三層結(jié)構(gòu))
7.Region服務(wù)器工作原理
-
用戶讀寫數(shù)據(jù)過程
-
緩存刷新
-
StoreFile的合并
8.HLog工作原理
HLog是記錄Region中各項更新操作的日志,它持久化存儲在磁盤中。
用戶更新數(shù)據(jù)必須首先寫入HLog后,才能寫入MemStore緩存。
當(dāng)Region啟動時,首先檢查HLog是否存在未合并的更新操作;若是則先執(zhí)行更新操作,合并到MemStore和StoreFile中,然后生成一個新的空的HLog文件。
9.HBase性能優(yōu)化方法(了解)
-
行鍵
行鍵是按照字典序存儲,因此,設(shè)計行鍵時,要充分利用這個排序特點,將經(jīng)常一起讀取的數(shù)據(jù)存儲到一塊,將最近可能會被訪問的數(shù)據(jù)放在一塊。
舉個例子:如果最近寫入HBase表中的數(shù)據(jù)是最可能被訪問的,可以考慮將時間戳作為行鍵的一部分,由于是字典序排序,所以可以使用Long.MAX_VALUE - timestamp作為行鍵,這樣能保證新寫入的數(shù)據(jù)在讀取時可以被快速命中。
-
InMemory
創(chuàng)建表的時候,可以通過HColumnDescriptor.setInMemory(true)將表放到Region服務(wù)器的緩存中,保證在讀取的時候被cache命中。
-
Max Version
創(chuàng)建表的時候,可以通過HColumnDescriptor.setMaxVersions(int maxVersions)設(shè)置表中數(shù)據(jù)的最大版本,如果只需要保存最新版本的數(shù)據(jù),那么可以設(shè)置setMaxVersions(1)。
-
Time To Live
創(chuàng)建表的時候,可以通過HColumnDescriptor.setTimeToLive(int timeToLive)設(shè)置表中數(shù)據(jù)的存儲生命期,過期數(shù)據(jù)將自動被刪除,例如如果只需要存儲最近兩天的數(shù)據(jù),那么可以設(shè)置setTimeToLive(2 * 24 * 60 * 60)。
10.HBase常用shell命令
# 啟動hbase shell hadoop@ubuntu:~$ hbase shell# 創(chuàng)建表t:列族為f,列族版本號為5 hbase> create 't1',{NAME => 'f1',VERSIONS => 5}# 創(chuàng)建表t:列族為f1、f2、f3,兩種方式等價 hbase> create 't1', {NAME => 'f1'}, {NAME => 'f2'}, {NAME => 'f3'} hbase> create 't1', 'f1', 'f2', 'f3'# 創(chuàng)建表t:將表根據(jù)分割算法HexStringSplit 分布在15個Region里 hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}# 創(chuàng)建表t:指定Region的切分點 hbase> create 't1', 'f1', SPLITS => ['10', '20', '30', '40']-------------------------------------------------------------------------------------------------------- # help 查看create命令的幫助信息 hbase(main):002:0> help "create" Creates a table. Pass a table name, and a set of column family # create命令的描述 specifications (at least one), and, optionally, table configuration. Column specification can be a simple string (name), or a dictionary (dictionaries are described below in main help output), necessarily including NAME attribute. Examples:Create a table with namespace=ns1 and table qualifier=t1 #指定namespace與hbase> create 'ns1:t1', {NAME => 'f1', VERSIONS => 5}Create a table with namespace=default and table qualifier=t1hbase> create 't1', {NAME => 'f1'}, {NAME => 'f2'}, {NAME => 'f3'}hbase> # The above in shorthand would be the following:hbase> create 't1', 'f1', 'f2', 'f3'hbase> create 't1', {NAME => 'f1', VERSIONS => 1, TTL => 2592000, BLOCKCACHE => true}hbase> create 't1', {NAME => 'f1', CONFIGURATION => {'hbase.hstore.blockingStoreFiles' => '10'}}hbase> create 't1', {NAME => 'f1', IS_MOB => true, MOB_THRESHOLD => 1000000, MOB_COMPACT_PARTITION_POLICY => 'weekly'}Table configuration options can be put at the end. Examples:hbase> create 'ns1:t1', 'f1', SPLITS => ['10', '20', '30', '40']hbase> create 't1', 'f1', SPLITS => ['10', '20', '30', '40']hbase> create 't1', 'f1', SPLITS_FILE => 'splits.txt', OWNER => 'johndoe'hbase> create 't1', {NAME => 'f1', VERSIONS => 5}, METADATA => { 'mykey' => 'myvalue' }hbase> # Optionally pre-split the table into NUMREGIONS, usinghbase> # SPLITALGO ("HexStringSplit", "UniformSplit" or classname)hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit', REGION_REPLICATION => 2, CONFIGURATION => {'hbase.hregion.scan.loadColumnFamiliesOnDemand' => 'true'}}hbase> create 't1', 'f1', {SPLIT_ENABLED => false, MERGE_ENABLED => false}hbase> create 't1', {NAME => 'f1', DFS_REPLICATION => 1}You can also keep around a reference to the created table:hbase> t1 = create 't1', 'f1'Which gives you a reference to the table named 't1', on which you can then call methods. -------------------------------------------------------------------------------------------------------# list 列出所有表 hbase> list# put 向表中指定的單元格添加數(shù)據(jù) hbase> put 't1','row1','f1:c1',120000 # 通過表,行鍵,列族:列限定符進(jìn)行定位,值為120000# get 通過指定坐標(biāo)來獲取單元格的值 hbase(main):005:0> get 't1','row1','f1:c1' COLUMN CELL f1:c1 timestamp=1609810077099, value=120000 1 row(s) Took 0.0722 seconds # delete 刪除表中指定單元格的數(shù)據(jù) hbase(main):021:0> delete 't1','row1','f1:c1',timestamp=1609810077099# scan 瀏覽表的信息 hbase(main):004:0> scan 't1' # 這時會顯示表t1中的所有行# scan 瀏覽某個單元格的數(shù)據(jù) hbase(main):010:0> scan 't1',{COLUMNS => 'f1:c1'}# alter 修改列族模式 hbase(main):011:0> alter 't1',NAME => 'f2' # 向表t1中增加列族f2 hbase(main):014:0> alter 't1',NAME => 'f2',METHOD => 'delete' # 將表t1中的列族f2刪除# count 統(tǒng)計表中的行數(shù) hbase(main):015:0> count 't1' # 統(tǒng)計t1的行數(shù)# describe 顯示表的相關(guān)信息 hbase(main):017:0> describe 't1' Table t1 is ENABLED t1 COLUMN FAMILIES DESCRIPTION {NAME => 'f1', VERSIONS => '5', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETE D_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', MIN_VERSI ONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'fal se', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE = > 'true', BLOCKSIZE => '65536'} 1 row(s) QUOTAS 0 row(s) Took 0.1104 seconds# enable/disable 使表有效或無效 hbase(main):015:0> disable 't1'# drop 刪除表,這里需要注意刪除表之前要先使用disable使這個表無效,這也是為了防止誤刪 hbase(main):023:0> disable 't1' Took 0.8378 seconds hbase(main):024:0> drop 't1' Took 0.4997 seconds# exists 判斷某個表是否存在 hbase(main):025:0> exists 't1' Table t1 does not exist Took 0.0231 seconds => false# truncate 使表無效并清空該表的數(shù)據(jù) hbase(main):029:0> truncate 'teacher' Truncating 'teacher' table (it may take a while): Disabling table... Truncating table... Took 2.1127 secondshbase(main):031:0> exists 'teacher' # truncate后查看該表是否存在 Table teacher does exist Took 0.0156 seconds => true # 還存在# 查看HBase集群狀態(tài) hbase(main):026:0> status 1 active master, 0 backup masters, 1 servers, 0 dead, 5.0000 average load Took 0.0582 seconds # 退出hbase shell hbase> exit第五章 NoSQL
1.nosql 的含義
2.nosql 興起的原因
- 關(guān)系數(shù)據(jù)庫已經(jīng)無法滿足Web2.0的需求
(1)無法滿足海量數(shù)據(jù)的管理需求
(2)無法滿足數(shù)據(jù)高并發(fā)的需求
(3)無法滿足高可擴(kuò)展性和高可用性的需求
- 關(guān)系數(shù)據(jù)庫的關(guān)鍵特性包括完善的事務(wù)機(jī)制和高效的查詢機(jī)制,到了Web2.0時代卻成了雞肋
(1)Web2.0網(wǎng)站系統(tǒng)通常不要求嚴(yán)格的數(shù)據(jù)庫事務(wù)
(2)Web2.0并不要求嚴(yán)格的讀寫實時性
(3)Web2.0通常不包含大量復(fù)雜的SQL查詢(去結(jié)構(gòu)化,存儲空間換取更好的查詢性能)
3.nosql與關(guān)系型數(shù)據(jù)庫的比較
| 數(shù)據(jù)庫原理 | 完全支持 | 部分支持 | RDBMS有關(guān)系代數(shù)理論作為基礎(chǔ);NoSQL沒有統(tǒng)一的理論基礎(chǔ) |
| 一致性 | 強(qiáng)一致性 | 弱一致性 | RDBMS嚴(yán)格遵守事務(wù)ACID模型,可以保證事務(wù)強(qiáng)一致性;很多NoSQL數(shù)據(jù)庫放松了對事務(wù)ACID四性的要求,而是遵守BASE模型,只能保證最終一致性 |
| 數(shù)據(jù)庫模式 | 固定 | 靈活 | RDBMS需要定義數(shù)據(jù)庫模式,嚴(yán)格遵守數(shù)據(jù)定義和相關(guān)約束條件;NoSQL不存在數(shù)據(jù)庫模式,可以自由靈活定義并存儲各種不同類型的數(shù)據(jù) |
| 數(shù)據(jù)完整性 | 容易實現(xiàn) | 很難實現(xiàn) | 任何一個RDBMS都可以很容易實現(xiàn)數(shù)據(jù)完整性,比如通過主鍵或者非空約束來實現(xiàn)實體完整性,通過主鍵、外鍵來實現(xiàn)參照完整性,通過約束或者觸發(fā)器來實現(xiàn)用戶自定義完整性;但是,在NoSQL數(shù)據(jù)庫卻無法實現(xiàn) |
| 數(shù)據(jù)規(guī)模 | 大 | 超大 | RDBMS很難實現(xiàn)橫向擴(kuò)展,縱向擴(kuò)展的空間也比較有限,性能會隨著數(shù)據(jù)規(guī)模的增大而降低;NoSQL可以很容易通過添加更多設(shè)備來支持更大規(guī)模的數(shù)據(jù) |
| 擴(kuò)展性 | 一般 | 好 | RDBMS很難實現(xiàn)橫向擴(kuò)展,縱向擴(kuò)展的空間也比較有限;NoSQL在設(shè)計之初就充分考慮了橫向擴(kuò)展的需求,可以很容易通過添加廉價設(shè)備實現(xiàn)擴(kuò)展 |
| 可用性 | 好 | 很好 | RDBMS在任何時候都以保證數(shù)據(jù)一致性為優(yōu)先目標(biāo),其次才是優(yōu)化系統(tǒng)性能,隨著數(shù)據(jù)規(guī)模的增大,RDBMS為了保證嚴(yán)格的一致性,只能提供相對較弱的可用性;大多數(shù)NoSQL都能提供較高的可用性 |
| 查詢效率 | 快 | 可以實現(xiàn)高效的簡單查詢,但是不具備高度結(jié)構(gòu)化查詢等特性,復(fù)雜查詢的性能不盡人意 | RDBMS借助于索引機(jī)制可以實現(xiàn)快速查詢(包括記錄查詢和范圍查詢);很多NoSQL數(shù)據(jù)庫沒有面向復(fù)雜查詢的索引,雖然NoSQL可以使用MapReduce來加速查詢,但是,在復(fù)雜查詢方面的性能仍然不如RDBMS |
| 標(biāo)準(zhǔn)化 | 是 | 否 | RDBMS已經(jīng)標(biāo)準(zhǔn)化(SQL);NoSQL還沒有行業(yè)標(biāo)準(zhǔn),不同的NoSQL數(shù)據(jù)庫都有自己的查詢語言,很難規(guī)范應(yīng)用程序接口 StoneBraker認(rèn)為:NoSQL缺乏統(tǒng)一查詢語言,將會拖慢NoSQL發(fā)展 |
| 技術(shù)支持 | 高 | 低 | RDBMS經(jīng)過幾十年的發(fā)展,已經(jīng)非常成熟,Oracle等大型廠商都可以提供很好的技術(shù)支持;NoSQL在技術(shù)支持方面仍然處于起步階段,還不成熟,缺乏有力的技術(shù)支持。 |
| 可維護(hù)性 | 復(fù)雜 | 復(fù)雜 | RDBMS需要專門的數(shù)據(jù)庫管理員(DBA)維護(hù);NoSQL數(shù)據(jù)庫雖然沒有DBMS復(fù)雜,也難以維護(hù)。 |
總結(jié)
(1)關(guān)系數(shù)據(jù)庫
優(yōu)勢:以完善的關(guān)系代數(shù)理論作為基礎(chǔ),有嚴(yán)格的標(biāo)準(zhǔn),支持事務(wù)ACID四性,借助索引機(jī)制可以實現(xiàn)高效的查詢,技術(shù)成熟,有專業(yè)公司的技術(shù)支持
劣勢:可擴(kuò)展性較差,無法較好支持海量數(shù)據(jù)存儲,數(shù)據(jù)模型過于死板、無法較好支持Web2.0應(yīng)用,事務(wù)機(jī)制影響了系統(tǒng)的整體性能等
(2)NoSQL數(shù)據(jù)庫
優(yōu)勢:可以支持超大規(guī)模數(shù)據(jù)存儲,靈活的數(shù)據(jù)模型可以很好地支持Web2.0應(yīng)用,具有強(qiáng)大的橫向擴(kuò)展能力等
劣勢:缺乏數(shù)學(xué)理論基礎(chǔ),復(fù)雜查詢性能不高,大都不能實現(xiàn)事務(wù)強(qiáng)一致性,很難實現(xiàn)數(shù)據(jù)完整性,技術(shù)尚不成熟,缺乏專業(yè)團(tuán)隊的技術(shù)支持,維護(hù)較困難等
4.nosql的4大類型、各自的典型應(yīng)用
典型的NoSQL數(shù)據(jù)庫通常包括鍵值數(shù)據(jù)庫、列族數(shù)據(jù)庫、文檔數(shù)據(jù)庫和圖形數(shù)據(jù)庫。
各類型的產(chǎn)品:
- 鍵值數(shù)據(jù)庫
| 數(shù)據(jù)模型 | 鍵/值對 鍵是一個字符串對象 值可以是任意類型的數(shù)據(jù),比如整型、字符型、數(shù)組、列表、集合等 |
| 典型應(yīng)用 | 涉及頻繁讀寫、擁有簡單數(shù)據(jù)模型的應(yīng)用 內(nèi)容緩存,比如會話、配置文件、參數(shù)、購物車等 存儲配置和用戶數(shù)據(jù)信息的移動應(yīng)用 |
| 優(yōu)點 | 擴(kuò)展性好,靈活性好,大量寫操作時性能高 |
| 缺點 | 無法存儲結(jié)構(gòu)化信息,條件查詢效率較低 |
| 不適用情形 | 不是通過鍵而是通過值來查:鍵值數(shù)據(jù)庫根本沒有通過值查詢的途徑 需要存儲數(shù)據(jù)之間的關(guān)系:在鍵值數(shù)據(jù)庫中,不能通過兩個或兩個以上的鍵來關(guān)聯(lián)數(shù)據(jù) 需要事務(wù)的支持:在一些鍵值數(shù)據(jù)庫中,產(chǎn)生故障時,不可以回滾 |
| 使用者 | 百度云數(shù)據(jù)庫(Redis)、GitHub(Riak)、BestBuy(Riak)、Twitter(Redis和Memcached)、StackOverFlow(Redis)、Instagram (Redis)、Youtube(Memcached)、Wikipedia(Memcached) |
-
列族數(shù)據(jù)庫
相關(guān)產(chǎn)品BigTable、HBase、Cassandra、HadoopDB、GreenPlum、PNUTS 數(shù)據(jù)模型 列族 典型應(yīng)用 分布式數(shù)據(jù)存儲與管理 數(shù)據(jù)在地理上分布于多個數(shù)據(jù)中心的應(yīng)用程序 可以容忍副本中存在短期不一致情況的應(yīng)用程序 擁有動態(tài)字段的應(yīng)用程序 擁有潛在大量數(shù)據(jù)的應(yīng)用程序,大到幾百TB的數(shù)據(jù) 優(yōu)點 查找速度快,可擴(kuò)展性強(qiáng),容易進(jìn)行分布式擴(kuò)展,復(fù)雜性低 缺點 功能較少,大都不支持強(qiáng)事務(wù)一致性 不適用情形 需要ACID事務(wù)支持的情形,Cassandra等產(chǎn)品就不適用 使用者 Ebay(Cassandra)、Instagram(Cassandra)、NASA(Cassandra)、Twitter(Cassandra and HBase)、Facebook(HBase)、Yahoo!(HBase) -
文檔數(shù)據(jù)庫
相關(guān)產(chǎn)品MongoDB、CouchDB、Terrastore、ThruDB、RavenDB、SisoDB、RaptorDB、CloudKit、Perservere、Jackrabbit 數(shù)據(jù)模型 鍵/值 值(value)是版本化的文檔 典型應(yīng)用 存儲、索引并管理面向文檔的數(shù)據(jù)或者類似的半結(jié)構(gòu)化數(shù)據(jù) 比如,用于后臺具有大量讀寫操作的網(wǎng)站、使用JSON數(shù)據(jù)結(jié)構(gòu)的應(yīng)用、使用嵌套結(jié)構(gòu)等非規(guī)范化數(shù)據(jù)的應(yīng)用程序 優(yōu)點 性能好(高并發(fā)),靈活性高,復(fù)雜性低,數(shù)據(jù)結(jié)構(gòu)靈活 提供嵌入式文檔功能,將經(jīng)常查詢的數(shù)據(jù)存儲在同一個文檔中 既可以根據(jù)鍵來構(gòu)建索引,也可以根據(jù)內(nèi)容構(gòu)建索引 缺點 缺乏統(tǒng)一的查詢語法 不適用情形 在不同的文檔上添加事務(wù)。文檔數(shù)據(jù)庫并不支持文檔間的事務(wù),如果對這方面有需求則不應(yīng)該選用這個解決方案 使用者 百度云數(shù)據(jù)庫(MongoDB)、SAP (MongoDB)、Codecademy (MongoDB)、Foursquare (MongoDB)、NBC News (RavenDB) -
圖形數(shù)據(jù)庫
相關(guān)產(chǎn)品Neo4J、OrientDB、InfoGrid、Infinite Graph、GraphDB 數(shù)據(jù)模型 圖結(jié)構(gòu) 典型應(yīng)用 專門用于處理具有高度相互關(guān)聯(lián)關(guān)系的數(shù)據(jù),比較適合于社交網(wǎng)絡(luò)、模式識別、依賴分析、推薦系統(tǒng)以及路徑尋找等問題 優(yōu)點 靈活性高,支持復(fù)雜的圖形算法,可用于構(gòu)建復(fù)雜的關(guān)系圖譜 缺點 復(fù)雜性高,只能支持一定的數(shù)據(jù)規(guī)模 使用者 Adobe(Neo4J)、Cisco(Neo4J)、T-Mobile(Neo4J)
5.nosql 的三大基石
-
CAP
所謂的CAP指的是:
C(Consistency):一致性,是指任何一個讀操作總是能夠讀到之前完成的寫操作的結(jié)果,也就是在分布式環(huán)境中,多點的數(shù)據(jù)是一致的,或者說,所有節(jié)點在同一時間具有相同的數(shù)據(jù)
A:(Availability):可用性,是指快速獲取數(shù)據(jù),可以在確定的時間內(nèi)返回操作結(jié)果,保證每個請求不管成功或者失敗都有響應(yīng);
P(Tolerance of Network Partition):分區(qū)容忍性,是指當(dāng)出現(xiàn)網(wǎng)絡(luò)分區(qū)的情況時(即系統(tǒng)中的一部分節(jié)點無法和其他節(jié)點進(jìn)行通信),分離的系統(tǒng)也能夠正常運行,也就是說,系統(tǒng)中任意信息的丟失或失敗不會影響系統(tǒng)的繼續(xù)運作。
CAP理論告訴我們,一個分布式系統(tǒng)不可能同時滿足一致性、可用性和分區(qū)容忍性這三個需求,最多只能同時滿足其中兩個,正所謂“魚和熊掌不可兼得”。
-
BASE
說起B(yǎng)ASE(Basically Availble, Soft-state, Eventual consistency),不得不談到ACID。
一個數(shù)據(jù)庫事務(wù)具有ACID四性:
A(Atomicity):原子性,是指事務(wù)必須是原子工作單元,對于其數(shù)據(jù)修改,要么全都執(zhí)行,要么全都不執(zhí)行
C(Consistency):一致性,是指事務(wù)在完成時,必須使所有的數(shù)據(jù)都保持一致狀態(tài)
I(Isolation):隔離性,是指由并發(fā)事務(wù)所做的修改必須與任何其它并發(fā)事務(wù)所做的修改隔離
D(Durability):持久性,是指事務(wù)完成之后,它對于系統(tǒng)的影響是永久性的,該修改即使出現(xiàn)致命的系統(tǒng)故障也將一直保持BASE的基本含義是基本可用(Basically Availble)、軟狀態(tài)(Soft-state)和最終一致性(Eventual consistency):
1.基本可用:基本可用,是指一個分布式系統(tǒng)的一部分發(fā)生問題變得不可用時,其他部分仍然可以正常使用,也就是允許分區(qū)失敗的情形出現(xiàn)。
2.軟狀態(tài):“軟狀態(tài)(soft-state)”是與“硬狀態(tài)(hard-state)”相對應(yīng)的一種提法。數(shù)據(jù)庫保存的數(shù)據(jù)是“硬狀態(tài)”時,可以保證數(shù)據(jù)一致性,即保證數(shù)據(jù)一直是正確的。“軟狀態(tài)”是指狀態(tài)可以有一段時間不同步,具有一定的滯后性。
3.最終一致性:一致性的類型包括強(qiáng)一致性和弱一致性,二者的主要**區(qū)別在于高并發(fā)的數(shù)據(jù)訪問操作下,后續(xù)操作是否能夠獲取最新的數(shù)據(jù)。**對于強(qiáng)一致性而言,當(dāng)執(zhí)行完一次更新操作后,后續(xù)的其他讀操作就可以保證讀到更新后的最新數(shù)據(jù);反之,如果不能保證后續(xù)訪問讀到的都是更新后的最新數(shù)據(jù),那么就是弱一致性。而最終一致性只不過是弱一致性的一種特例,允許后續(xù)的訪問操作可以暫時讀不到更新后的數(shù)據(jù),但是經(jīng)過一段時間之后,必須最終讀到更新后的數(shù)據(jù)。
最常見的實現(xiàn)最終一致性的系統(tǒng)是DNS(域名系統(tǒng))。一個域名更新操作根據(jù)配置的形式被分發(fā)出去,并結(jié)合有過期機(jī)制的緩存;最終所有的客戶端可以看到最新的值。 -
最終一致性
最終一致性根據(jù)更新數(shù)據(jù)后各進(jìn)程訪問到數(shù)據(jù)的時間和方式的不同,又可以區(qū)分為:
因果一致性:如果進(jìn)程A通知進(jìn)程B它已更新了一個數(shù)據(jù)項,那么進(jìn)程B的后續(xù)訪問將獲得A寫入的最新值。而與進(jìn)程A無因果關(guān)系的進(jìn)程C的訪問,仍然遵守一般的最終一致性規(guī)則
“讀己之所寫”一致性:可以視為因果一致性的一個特例。當(dāng)進(jìn)程A自己執(zhí)行一個更新操作之后,它自己總是可以訪問到更新過的值,絕不會看到舊值
單調(diào)讀一致性:如果進(jìn)程已經(jīng)看到過數(shù)據(jù)對象的某個值,那么任何后續(xù)訪問都不會返回在那個值之前的值會話一致性:它把訪問存儲系統(tǒng)的進(jìn)程放到會話(session)的上下文中,只要會話還存在,系統(tǒng)就保證“讀己之所寫”一致性。如果由于某些失敗情形令會話終止,就要建立新的會話,而且系統(tǒng)保證不會延續(xù)到新的會話
單調(diào)寫一致性:系統(tǒng)保證來自同一個進(jìn)程的寫操作順序執(zhí)行。系統(tǒng)必須保證這種程度的一致性,否則就非常難以編程了
擴(kuò)展知識
當(dāng)處理CAP的問題時,可以有幾個明顯的選擇:
1.CA:也就是強(qiáng)調(diào)一致性(C)和可用性(A),放棄分區(qū)容忍性(P),最簡單的做法是把所有與事務(wù)相關(guān)的內(nèi)容都放到同一臺機(jī)器上。很顯然,這種做法會嚴(yán)重影響系統(tǒng)的可擴(kuò)展性。傳統(tǒng)的關(guān)系數(shù)據(jù)庫(MySQL、SQL Server和PostgreSQL),都采用了這種設(shè)計原則,因此,擴(kuò)展性都比較差
2.CP:也就是強(qiáng)調(diào)一致性(C)和分區(qū)容忍性(P),放棄可用性(A),當(dāng)出現(xiàn)網(wǎng)絡(luò)分區(qū)的情況時,受影響的服務(wù)需要等待數(shù)據(jù)一致,因此在等待期間就無法對外提供服務(wù)
3.AP:也就是強(qiáng)調(diào)可用性(A)和分區(qū)容忍性(P),放棄一致性(C),允許系統(tǒng)返回不一致的數(shù)據(jù)
6.MongoDB基本概念
在mongodb中基本的概念是文檔、集合、數(shù)據(jù)庫
| database | database | 數(shù)據(jù)庫 |
| table | collection | 數(shù)據(jù)庫表/集合 |
| row | document | 數(shù)據(jù)記錄行/文檔 |
| column | field | 數(shù)據(jù)字段/域 |
| index | index | 索引 |
| table joins | 表連接,MongoDB不支持 | |
| primary key | primary key | 主鍵,MongoDB自動將_id字段設(shè)置為主鍵 |
第六章 云數(shù)據(jù)庫
1.云數(shù)據(jù)庫的概念
云數(shù)據(jù)庫是部署和虛擬化在云計算環(huán)境中的數(shù)據(jù)庫。
云數(shù)據(jù)庫是在云計算的大背景下發(fā)展起來的一種新興的共享基礎(chǔ)架構(gòu)的方法,它極大地增強(qiáng)了數(shù)據(jù)庫的存儲能力,消除了人員、硬件、軟件的重復(fù)配置,讓軟、硬件升級變得更加容易。
云數(shù)據(jù)庫具有高可擴(kuò)展性、高可用性、采用多租形式和支持資源有效分發(fā)等特點。
2.云數(shù)據(jù)庫的特性
(1)動態(tài)可擴(kuò)展:用戶按需擴(kuò)展
(2)高可用性:云數(shù)據(jù)庫具有故障自動單點切換、數(shù)據(jù)庫自動備份等功能
(3)較低的使用代價:RDS支付的費用遠(yuǎn)低于自建數(shù)據(jù)庫所需的成本
(4)易用性:提供WEB界面進(jìn)行配置、操作數(shù)據(jù)庫實例
(5)高性能
(6)免維護(hù):有專門的維護(hù)人員
(7)安全
3.云數(shù)據(jù)庫廠商以及各自的產(chǎn)品
| Amazon | Dynamo、SimpleDB、RDS |
| Google Cloud SQL | |
| Microsoft | Microsoft SQL Azure |
| Oracle | Oracle Cloud |
| Yahoo! | PNUTS |
| Vertica | Analytic Database v3.0 for the Cloud |
| EnerpriseDB | Postgres Plus in the Cloud |
| 阿里 | 阿里云RDS |
| 百度 | 百度云數(shù)據(jù)庫 |
| 騰訊 | 騰訊云數(shù)據(jù)庫 |
第七章 MapReduce
1.MapReduce與傳統(tǒng)并行計算框架比較
| 集群架構(gòu)/容錯性 | 共享式(共享內(nèi)存/共享存儲),容錯性差 | 非共享式,容錯性好 |
| 硬件/價格/擴(kuò)展性 | 刀片服務(wù)器、高速網(wǎng)、SAN,價格貴,擴(kuò)展性差 | 普通PC機(jī),便宜,擴(kuò)展性好 |
| 編程/學(xué)習(xí)難度 | what-how,難 | what,簡單 |
| 適用場景 | 實時、細(xì)粒度計算、計算密集型 | 非實時、批處理、數(shù)據(jù)密集型 |
2.MapReduce的2個特點
分而治之、計算向數(shù)據(jù)靠攏
3.MapReduce流程
4.MapReduce的體系結(jié)構(gòu)
下面是Hadoop1.x中的體系結(jié)構(gòu),但我覺得不會考:
MapReduce體系結(jié)構(gòu)主要由四個部分組成,分別是:Client、JobTracker、TaskTracker以及Task。
1)Client
用戶編寫的MapReduce程序通過Client提交到JobTracker端
用戶可通過Client提供的一些接口查看作業(yè)運行狀態(tài)
2)JobTracker
JobTracker負(fù)責(zé)資源監(jiān)控和作業(yè)調(diào)度
JobTracker 監(jiān)控所有TaskTracker與Job的健康狀況,一旦發(fā)現(xiàn)失敗,就將相應(yīng)的任務(wù)轉(zhuǎn)移到其他節(jié)點
JobTracker 會跟蹤任務(wù)的執(zhí)行進(jìn)度、資源使用量等信息,并將這些信息告訴任務(wù)調(diào)度器(TaskScheduler),而調(diào)度器會在資源出現(xiàn)空閑時,選擇合適的任務(wù)去使用這些資源
3)TaskTracker
TaskTracker 會周期性地通過“心跳”將本節(jié)點上資源的使用情況和任務(wù)的運行進(jìn)度匯報給JobTracker,同時接收J(rèn)obTracker 發(fā)送過來的命令并執(zhí)行相應(yīng)的操作(如啟動新任務(wù)、殺死任務(wù)等)
TaskTracker 使用“slot”等量劃分本節(jié)點上的資源量(CPU、內(nèi)存等)。一個Task 獲取到一個slot 后才有機(jī)會運行,而Hadoop調(diào)度器的作用就是將各個TaskTracker上的空閑slot分配給Task使用。slot 分為Map slot 和Reduce slot 兩種,分別供MapTask 和Reduce Task 使用
4)Task
Task 分為Map Task 和Reduce Task 兩種,均由TaskTracker 啟動
5.map與reduce并行度的決定因素
maptask并行度由輸入數(shù)據(jù)分片數(shù)量決定;reducetask并行度由輸入數(shù)據(jù)分區(qū)數(shù)量決定。
6.WordCount代碼
package com.ht.wordcount;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordCount {public static class WordCountMapper extends Mapper<LongWritable,Text,Text, IntWritable>{IntWritable intWritable = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 1.讀取數(shù)據(jù)String line = value.toString();// 2.切片String[] splits = line.split("\t");// 3.輸出Text text = new Text();for (String split : splits) {text.set(split);context.write(text, intWritable);}}}public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {// 1.讀取數(shù)據(jù) <k,list<v1,v2,...,vn>>int sumVal = 0;for (IntWritable val:values){sumVal += val.get();}// 2.輸出數(shù)據(jù)context.write(key,new IntWritable(sumVal));}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 1.hadoop運行信息Configuration configuration = new Configuration();// 2.獲取hadoop實例String jobName = "WordCount";Job job = Job.getInstance(configuration, jobName);// 3.設(shè)置程序的本地jar包job.setJarByClass(WordCount.class);// 4.關(guān)聯(lián)mapper和reducerjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);// 5.設(shè)置mapper的輸出kvjob.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 6.設(shè)置reducer的輸出kv(最終輸出)job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 7.設(shè)置作業(yè)輸入輸出路徑Path inputPath = new Path("D:\\Document\\temp\\wordcount\\input.txt");Path outputPath = new Path("D:\\Document\\temp\\wordcount\\output");// 獲取hdfs文件系統(tǒng)實例FileSystem fileSystem = FileSystem.get(configuration);if(fileSystem.exists(outputPath)){fileSystem.delete(outputPath,true);}// 8.設(shè)置輸入輸出格式FileInputFormat.addInputPath(job,inputPath);FileOutputFormat.setOutputPath(job, outputPath);// 9.查看作業(yè)運行情況System.out.println("job " + jobName + "is running...");// 若成功打印1,不成功打印0System.out.println(job.waitForCompletion(true) ? 1:0);} }第八章 Hadoop 2.x
1.Hadoop1.0的不足、改進(jìn)(了解)
Hadoop1.0的核心組件(僅指MapReduce和HDFS,不包括Hadoop生態(tài)系統(tǒng)內(nèi)的Pig、Hive、HBase等其他組件),主要存在以下不足:
- 抽象層次低,需人工編碼
- 表達(dá)能力有限
- 開發(fā)者自己管理作業(yè)(Job)之間的依賴關(guān)系
- 難以看到程序整體邏輯
- 執(zhí)行迭代操作效率低
- 資源浪費(Map和Reduce分兩階段執(zhí)行)
- 實時性差(適合批處理,不支持實時交互式)
| HDFS | 單一名稱節(jié)點,存在單點故障問題 | 設(shè)計了HDFS HA,提供名稱節(jié)點熱備機(jī)制 |
| HDFS | 單一命名空間,無法實現(xiàn)資源隔離 | 設(shè)計了HDFS Federation,管理多個命名空間 |
| MapReduce | 資源管理效率低 | 設(shè)計了新的資源管理框架YARN |
2.HA的工作原理
HDFS HA(High Availability)是為了解決單點故障問題。HA集群設(shè)置兩個名稱節(jié)點,“活躍(Active)”和“待命(Standby)”,兩種名稱節(jié)點的狀態(tài)同步,可以借助于一個共享存儲系統(tǒng)來實現(xiàn)。
一旦活躍名稱節(jié)點出現(xiàn)故障,就可以立即切換到待命名稱節(jié)點,Zookeeper確保一個名稱節(jié)點在對外服務(wù)。名稱節(jié)點維護(hù)映射信息,數(shù)據(jù)節(jié)點同時向兩個名稱節(jié)點匯報信息。
?
3.YARN設(shè)計思路
到了Hadoop2.0以后,MapReduce1.0中的資源管理調(diào)度功能,被單獨分離出來形成了YARN,它是一個純粹的資源管理調(diào)度框架,而不是一個計算框架。
?
4.YARN的發(fā)展目標(biāo)
一個企業(yè)當(dāng)中同時存在各種不同的業(yè)務(wù)應(yīng)用場景,需要采用不同的計算框架
MapReduce實現(xiàn)離線批處理
使用Impala實現(xiàn)實時交互式查詢分析
使用Storm實現(xiàn)流式數(shù)據(jù)實時分析
使用Spark實現(xiàn)迭代計算
這些產(chǎn)品通常來自不同的開發(fā)團(tuán)隊,具有各自的資源調(diào)度管理機(jī)制,為了避免不同類型應(yīng)用之間互相干擾,企業(yè)就需要把內(nèi)部的服務(wù)器拆分成多個集群,分別安裝運行不同的計算框架,即“一個框架一個集群”
導(dǎo)致問題:集群資源利用率低、數(shù)據(jù)無法共享、維護(hù)代價高
YARN的目標(biāo)就是實現(xiàn)“一個集群多個框架”,即在一個集群上部署一個統(tǒng)一的資源調(diào)度管理框架YARN,在YARN之上可以部署其他各種計算框架。
由YARN為這些計算框架提供統(tǒng)一的資源調(diào)度管理服務(wù),并且能夠根據(jù)各種計算框架的負(fù)載需求,調(diào)整各自占用的資源,實現(xiàn)集群資源共享和資源彈性收縮。
可以實現(xiàn)一個集群上的不同應(yīng)用負(fù)載混搭,有效提高了集群的利用率;不同計算框架可以共享底層存儲,避免了數(shù)據(jù)集跨集群移動。
?
第九章 Spark
1.Spark的特點
- 運行速度快(相較于Hadoop)
- 通用性(具有完整的技術(shù)棧)
- 易用性(多種方式使用)
- 運行模式多樣
2.Spark支持的語言
scala、java、python、r
3.scala的特點
- 函數(shù)式編程,具備強(qiáng)大的并發(fā)性,更好地支持分布式系統(tǒng)
- 兼容java
- 語法簡潔優(yōu)雅
- 支持高效的交互式編程
- 面向?qū)ο?/li>
- scala是spark的開發(fā)語言
4.Spark與Hadoop的比較
| 表達(dá)能力有限 | Spark的計算模式也屬于MapReduce,但不局限于Map和Reduce操作,還提供了多種數(shù)據(jù)集操作類型,編程模型比Hadoop MapReduce更靈活 |
| 磁盤I/O開銷大 | Spark提供了內(nèi)存計算,可將中間結(jié)果放到內(nèi)存中,對于迭代運算效率更高 |
| 延遲高 | Spark基于DAG的任務(wù)調(diào)度執(zhí)行機(jī)制,要優(yōu)于Hadoop MapReduce的迭代執(zhí)行機(jī)制 |
5.Spark設(shè)計理念
一個技術(shù)棧滿足不同應(yīng)用場景。
6.Spark的組件、組件的應(yīng)用場景、時間跨度
| 復(fù)雜的批量數(shù)據(jù)處理 | 小時級 | MapReduce、Hive | Spark Core |
| 基于歷史數(shù)據(jù)的交互式查詢 | 分鐘級、秒級 | Impala、Dremel、Drill | Spark SQL |
| 基于實時數(shù)據(jù)流的數(shù)據(jù)處理 | 毫秒、秒級 | Storm、S4 | Spark Streaming、Structured Streaming |
| 基于歷史數(shù)據(jù)的數(shù)據(jù)挖掘 | - | Mahout | MLlib |
| 圖結(jié)構(gòu)數(shù)據(jù)的處理 | - | Pregel、Hama | GraphX |
7.RDD基本概念
RDD是彈性分布式數(shù)據(jù)集,一種基于內(nèi)存的數(shù)據(jù)共享模型。
8.Spark程序的運行流程
用戶提交代碼生成一個Job — sparkcontext向集群資源管理器注冊并申請資源 — 集群資源管理器分配Executor資源給這個Job — Executor向sparkcontext申請任務(wù) — sparkcontext分發(fā)任務(wù) — Executor執(zhí)行完成,返回給sparkcontext
?
9.RDD的兩種算子
transformation 轉(zhuǎn)換算子、action行動算子
10.血緣關(guān)系
多個RDD之間一系列的依賴關(guān)系稱為血緣關(guān)系。
11.RDD的特性
1.A list of partitions 可分區(qū)
RDD是一個由多個partition(某個節(jié)點里的某一片連續(xù)的數(shù)據(jù))組成的的list;將數(shù)據(jù)加載為RDD時,一般會遵循數(shù)據(jù)的本地性(一般一個hdfs里的block會加載為一個partition)。
2.A function for computing each split 分區(qū)計算
一個函數(shù)計算每一個分片,RDD的每個partition上面都會有function,也就是函數(shù)應(yīng)用,其作用是實現(xiàn)RDD之間partition的轉(zhuǎn)換。
3.A list of dependencies on other RDDs 依賴關(guān)系
RDD會記錄它的依賴 ,依賴還具體分為寬依賴和窄依賴,但并不是所有的RDD都有依賴。為了容錯(重算,cache,checkpoint),也就是說在內(nèi)存中的RDD操作時出錯或丟失會進(jìn)行重算。
4.Optionally,a Partitioner for Key-value RDDs 自定義分區(qū)
可選項,如果RDD里面存的數(shù)據(jù)是key-value形式,則可以傳遞一個自定義的Partitioner進(jìn)行重新分區(qū),例如這里自定義的Partitioner是基于key進(jìn)行分區(qū),那則會將不同RDD里面的相同key的數(shù)據(jù)放到同一個partition里面
5.Optionally, a list of preferred locations to compute each split on 數(shù)據(jù)的本地性
最優(yōu)的位置去計算,也就是數(shù)據(jù)的本地性。
12.RDD的依賴關(guān)系
兩個相鄰RDD之間的關(guān)系。有兩種,分為“窄依賴”和“寬依賴”。經(jīng)過Shuffle過程的稱為寬依賴。
13.stage的劃分
如果有shuffle過程即寬依賴,那么就會創(chuàng)建一個新的stage。
14.Spark的三種部署方式
spark獨立部署、On YARN、On Meros
15.Spark編程
-
SparkContext:程序運行的上下文環(huán)境
-
SparkSession:用于創(chuàng)建會話,其實是封裝了 SQLContext 和 HiveContext
-
sparksql提供了DataFrame\DataSet,Spark SQL執(zhí)行計劃生成和優(yōu)化都由Catalyst(函數(shù)式關(guān)系查詢優(yōu)化框架)負(fù)責(zé)
-
df與rdd的區(qū)別:
RDD是分布式的 Java對象的集合,但是,對象內(nèi)部結(jié)構(gòu)對于RDD而言卻是不可知的;
DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,提供了詳細(xì)的結(jié)構(gòu)信息。 -
df的創(chuàng)建、隱式轉(zhuǎn)換
DataFrame可以從文件中讀取并創(chuàng)建、還可以由RDD轉(zhuǎn)換得到。SparkSession.implicits $是Scala中的隱式方法,用于將常見的Scala對象轉(zhuǎn)換為DataFrames。RDD對象可以通過隱式轉(zhuǎn)換轉(zhuǎn)為DataFrame。
-
rdd轉(zhuǎn)換為df的2種方式
利用反射機(jī)制推斷RDD模式、利用編程方式定義RDD模式
-
WordCount
1.RDD
package Com.HT.Finalimport org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[2]").setAppName("spark")val sparkContext = new SparkContext(sparkConf)// 步驟:讀取文件,分割,map,reduceByKeyval rdd = sparkContext.textFile("D:\\Document\\temp\\wordcount\\input.txt") // 讀取文件// 方法1:不簡化//val rdd1 = rdd.flatMap(line => line.split("\t")).map(word => (word, 1)).reduceByKey((a, b) => a + b)// 方法2:簡化 (scala的至簡原則)val rdd2 = rdd.flatMap(_.split("\t")).map((_, 1)).reduceByKey(_+_) rdd2.collect().foreach(println)sparkContext.stop()} }2.Spark SQL
package Com.HT.Final.wordcountimport org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.{SparkConf, SparkContext}object WordCount_sparksql {def main(args: Array[String]): Unit = {//1.創(chuàng)建Sparksession,獲取SparkContextval sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()val sparkContext: SparkContext = spark.sparkContextsparkContext.setLogLevel("WARN")import spark.implicits._ //DS和DF的底層都是RDD,下面的計算過程中底層涉及到他們的相互轉(zhuǎn)換,所以需要導(dǎo)入隱式轉(zhuǎn)換//2.讀取文件,讀取為Dataset[String]val fileDS: Dataset[String] = spark.read.textFile("D:\\Do,cument\\temp\\wordcount\\input.txt")//3.對文件數(shù)據(jù)進(jìn)行處理 -> Dataset[String] val wordDS: Dataset[String] = fileDS.flatMap(line => line.split("\t")) // 分割符\t//4.注冊表wordDS.createOrReplaceTempView("word_count")//5.書寫sql語句val sql:String = "select value as word,count(*) as counts from word_count group by word order by counts desc"//6.執(zhí)行sql語句,查看內(nèi)容val dataFrame: DataFrame = spark.sql(sql)dataFrame.show()//7.關(guān)閉資源sparkContext.stop()spark.stop()} }3.Spark Streaming
package Com.HT.Final.wordcount import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf}object WordCount_sparkstreaming {def main(args: Array[String]): Unit = {//創(chuàng)建一個sparkconf對象,其中l(wèi)ocal[2]表示任務(wù)運行在本地且需要兩個CUPval sparkconf = new SparkConf().setMaster("local[2]").setAppName("FileWordCount") //這里必須至少有2個線程,一個用于接收數(shù)據(jù),一個用于統(tǒng)計//創(chuàng)建StreamingContext對象,rdd批次處理間隔設(shè)為5秒val ssc = new StreamingContext(sparkconf,Seconds(5))// 方法1:從hdfs中讀取文件,生成DStreamval lines = ssc.textFileStream("D:\\Document\\temp\\wordcount\\input.txt") // 必須用流的形式寫入到這個目錄形成文件才能被監(jiān)測到// 方法2:通過Socket端口監(jiān)聽并接收數(shù)據(jù),設(shè)置主機(jī)名、端口、持久化存儲級別(如果數(shù)據(jù)在內(nèi)存中放不下,則溢寫到磁盤) // val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK) val res = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) //用空格分割單詞并計數(shù)res.print() //顯示結(jié)果//啟動spark streamingssc.start()//等待直到任務(wù)停止ssc.awaitTermination()ssc.stop()} }4.Structured Streaming
package Com.HT.Final.wordcountimport org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object WordCount_structuredstreaming {def main(args: Array[String]): Unit = { //1.創(chuàng)建SparkSessionval spark = SparkSession.builder().master("local[*]").appName("structuredstreaming").getOrCreate()spark.sparkContext.setLogLevel("WARN") // 設(shè)置日志級別import spark.implicits._ // 導(dǎo)入隱式轉(zhuǎn)換//2.數(shù)據(jù)集的生成,數(shù)據(jù)讀取val source: DataFrame = spark.readStream.format("socket") // 設(shè)置socket讀取流數(shù)據(jù).option("host","localhost") // 監(jiān)聽主機(jī)的ip地址或主機(jī)名.option("port",9999) // 指定監(jiān)聽主機(jī)的端口.load()// 3.數(shù)據(jù)的處理:行轉(zhuǎn)換成一個個單詞// 方法1:Dataset[String] -> Dataset[(String, Int)] -> KeyValueGroupedDataset[String, (String, Int)] -> Dataset[(String, Long)]// groupByKey :按Key進(jìn)行分組,返回[K,Iterable[V]] // val words: Dataset[(String, Long)] = source.as[String].flatMap(_.split(" ")).map((_,1)).groupByKey(_._1).count()// 方法2:Dataset[String] -> RelationalGroupedDataset -> DataFrame// groupBy:新建一個RelationalGroupedDataset,而這個方法提供count(),max(),agg()等方法。// groupByKey 后返回的類是 KeyValueGroupedDataset ,它里面所提供的操作接口不如 groupBy 返回的 RelationalGroupedDataset 所提供的接口豐富。val words: DataFrame = source.as[String].flatMap(_.split(" ")).groupBy("value").count()//4.結(jié)果集的生成輸出words.writeStream.outputMode(OutputMode.Complete()).format("console") // 設(shè)置在控制臺顯示結(jié)果.start() // 開啟.awaitTermination() // 等待直到任務(wù)停止} } -
案例1:求TOP值
package Com.HT.Finalimport org.apache.spark.{SparkConf, SparkContext}object TopN {def main(args: Array[String]): Unit = {// 設(shè)置環(huán)境val sparkConf = new SparkConf().setMaster("local").setAppName("TopN")val sparkContext = new SparkContext(sparkConf)// 讀取文件val rdd = sparkContext.textFile("D:\\Document\\temp\\rddfile\\TopN\\input.txt")// 過濾數(shù)據(jù):長度小于多少、分割后長度小于多少val filterRDD = rdd.filter(line => (line.trim().length>0) && (line.split(",").length == 4))// 分割、排序、輸出var i = 1; // 最終輸出排名的序號val sortRdd = filterRDD.map(_.split(",")(2)) // 分隔每一行數(shù)據(jù),RDD的類型變成Array[String],然后取索引為2的元素,就是要進(jìn)行排序的數(shù)據(jù).map(x => (x.toInt,"")) // 將該列數(shù)據(jù)的每一行都變?yōu)殒I值對RDD,鍵為數(shù)據(jù),值為"".sortByKey(false) // 根據(jù)鍵進(jìn)行降序排序.map(x => x._1) // 取排序后的那一列數(shù)據(jù),只要鍵不要值.take(5) // 取出top5的數(shù)據(jù).foreach(x => { // 遍歷打印println(i + "\t" + x)i+=1})} } -
案例2:求最大最小值
package Com.HT.Finalimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object MaxAndMinVal {def main(args: Array[String]): Unit = {// 設(shè)置環(huán)境val sparkConf = new SparkConf().setMaster("local").setAppName("MaxAndMinVal")val sparkContext = new SparkContext(sparkConf)// 讀取文件,讀取進(jìn)來每一行都是一個字符串val lines: RDD[String] = sparkContext.textFile("D:\\Document\\temp\\rddfile\\maxandmin.txt")// 過濾、轉(zhuǎn)換、根據(jù)key進(jìn)行分組、求最大最小值val rdd: Unit = lines.filter(line => line.trim.length > 0) // trim:刪除指定字符串的首尾空白符.map(line => ("key", line.toInt)).groupByKey() // 轉(zhuǎn)換為(“key”,value-list).map(line => {var minValue: Int = Integer.MAX_VALUEvar maxValue: Int = Integer.MIN_VALUEfor (num <- line._2) { // 遍歷value-list。line._2就是鍵值對(key,value-list)中的value-list,這里value-list就是<129,54,167,…,5,329,14,...>if (num < minValue) {minValue = num}if (num > maxValue) {maxValue = num}}(maxValue, minValue)}).collect().foreach(x => {println("最大值 = " + x._1)println("最小值 = " + x._2)})sparkContext.stop()} } -
案例3:文件排序
有多個輸入文件,每個文件中的每一行內(nèi)容均為一個整數(shù)。要求讀取所有文件中的整數(shù),進(jìn)行排序后,輸出到一個新的文件中,輸出的內(nèi)容個數(shù)為每行兩個整數(shù),第一個整數(shù)為第二個整數(shù)的排序位次,第二個整數(shù)為原待排序的整數(shù)。
package Com.HT.Finalimport org.apache.spark.rdd.RDD import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}object FileSort {def main(args: Array[String]): Unit = {// 設(shè)置環(huán)境val sparkConf = new SparkConf().setMaster("local").setAppName("FileSort")val sparkContext = new SparkContext(sparkConf)// 讀取文件val rdd: RDD[String] = sparkContext.textFile("D:\\Document\\temp\\rddfile\\filesort",3)// 過濾、分割、排序、輸出var index = 0; // 第一列:序號val result: RDD[(Int, Int)] = rdd.filter(_.trim.length > 0) // 過濾長度不大于0的記錄.map(x => (x.trim.toInt, "")) // 將字符串rdd轉(zhuǎn)換類型為:(整型,"").partitionBy(new HashPartitioner(1)) // 將3個分區(qū)歸為一個:由入輸入文件有多個,產(chǎn)生不同的分區(qū),為了生成序號,使用HashPartitioner將中間的RDD歸約到一起.sortByKey() // 按照key進(jìn)行升序排序.map(kv => { // 輸出兩列index += 1println(index + "\t" + kv._1)(index, kv._1)})result.saveAsTextFile("D:\\Document\\temp\\rddfile\\filesortout") // 保存為一個文件// 關(guān)閉scsparkContext.stop()} } -
案例4:二次排序
對于一個給定的文件(數(shù)據(jù)如file1.txt所示),請對數(shù)據(jù)進(jìn)行排序,首先根據(jù)第1列數(shù)據(jù)降序排序,如果第1列數(shù)據(jù)相等,則根據(jù)第2列數(shù)據(jù)降序排序。
spark程序:
package Com.HT.Final.TwoTimesSortimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object SecondarySort {def main(args: Array[String]): Unit = {// 設(shè)置配置信息、上下文環(huán)境val sparkConf = new SparkConf().setMaster("local").setAppName("SecondarySort")val sparkContext = new SparkContext(sparkConf)// 過濾、分割、轉(zhuǎn)換、二次排序(第一列降序,第一列相等的按照第二列降序排序)// 讀取文件val lines = sparkContext.textFile("D:\\Document\\temp\\rddfile\\secondarysort\\input.txt")val pairWithSortKey = lines.filter(line => line.trim.length>0) // 過濾.map(line => (new SecondarySortKey(line.split("\t")(0).toInt, line.split("\t")(1).toInt),line))// k-v,k是SecondarySortKey對象,規(guī)定了排序規(guī)則,v是原本輸入的一對數(shù)據(jù)// 根據(jù)鍵進(jìn)行排序,這里會遵循 SecondarySortKey對象 的排序規(guī)則val sorted = pairWithSortKey.sortByKey(false)// 取出原本的一對數(shù)字組成的字符串val sortedResult = sorted.map(sortedLine => sortedLine._2)// 并打印sortedResult.collect().foreach (println)// 關(guān)閉scsparkContext.stop()} }SecondarySortKey:
package Com.HT.Final.TwoTimesSortimport org.apache.spark.{SparkConf, SparkContext}class SecondarySortKey(val first:Int,val second:Int) extends Ordered [SecondarySortKey] with Serializable {def compare(other:SecondarySortKey):Int = { // 實現(xiàn)compare方法,可以二次排序if (this.first - other.first !=0) { // first與other不相等this.first - other.first // 第一列降序排序} else { // first與other相等this.second - other.second // 第二列降序排序}} } -
案例5:連接操作
任務(wù)描述:在推薦領(lǐng)域有一個著名的開放測試集,下載鏈接是:http://grouplens.org/datasets/movielens/,該測試集包含三個文件,分別是ratings.dat、sers.dat、movies.dat,具體介紹可閱讀:README.txt。請編程實現(xiàn):通過連接ratings.dat和movies.dat兩個文件得到平均得分超過4.0的電影列表,采用的數(shù)據(jù)集是:ml-1m。
package Com.HT.Finalimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object SparkJoin {def main(args: Array[String]): Unit = {// 設(shè)置上下文環(huán)境val sparkConf = new SparkConf().setAppName("SparkJoin").setMaster("local")val sparkContext = new SparkContext(sparkConf)//TODO 1.處理ratings數(shù)據(jù):讀取、分割、抽取、計算、keyby// 讀取ratings文件為RDD,一共4列val ratingsRDD: RDD[String] = sparkContext.textFile("D:\\Document\\temp\\rddfile\\join\\ratings.rat")// 提取(第2列movieid電影id, 第3列rating電影評分) val idAndRatings = ratingsRDD.map(line => {val fileds = line.split("::") // 分割,得到字符串?dāng)?shù)組(fileds(1).toInt, fileds(2).toDouble) // 提取電影id和電影評分,索引分別為1和2})// KeyBy: 為各個元素,按指定的函數(shù)生成key,形成key-value的RDD。// 電影id + 計算電影的平均評分val movieIdAndAvgScoreKey = idAndRatings.groupByKey() // 根據(jù)電影id將電影評分進(jìn)行分組.map(data => {val avg = data._2.sum / data._2.size // 求平均評分(data._1, avg) // 返回電影id和平均評分}).keyBy(tup => tup._1) // 設(shè)置key為 電影id, value為 電影id和平均分//TODO 2.處理電影信息的數(shù)據(jù)::讀取、分割、抽取、keyby// 讀取movies文件為RDD,一共3列val moviesRDD = sparkContext.textFile("D:\\Document\\temp\\rddfile\\join\\movies.dat")// 提取(第1列movieid電影id, 第2列moviename電影名稱) val movieskey = moviesRDD.map(line => { // movieskey:(1,(1,Toy Story (1995) ))val fileds = line.split("::") // 分割為 (1,Toy Story (1995))(fileds(0).toInt, fileds(1)) // 整型數(shù),字符串}).keyBy(tup => tup._1) // 設(shè)置key為 電影id, value為電影id和電影名稱//TODO 3.連接、過濾、抽取輸出val joinResult = movieIdAndAvgScoreKey // 連接操作.join(movieskey).filter(f => f._2._1._2 > 4.0) // 過濾.map(f => (f._1, f._2._1._2, f._2._2._2) // 取出電影id,電影平均分,電影名稱)joinResult.saveAsTextFile("D:\\Document\\temp\\rddfile\\joinoutput")} }// KeyBy: 為各個元素,按指定的函數(shù)生成key,形成key-value的RDD。 -
史上最全的spark面試題 https://www.cnblogs.com/think90/p/11461367.html
第十章 流計算
1.流計算與批處理的區(qū)別
批處理:處理離線數(shù)據(jù)。單個處理數(shù)據(jù)量大,處理速度比流慢。
流計算:處理實時產(chǎn)生的數(shù)據(jù)。單次處理的數(shù)據(jù)量小,但處理速度更快。
2.文件流
Spark支持從兼容HDFS API的文件系統(tǒng)中讀取數(shù)據(jù),創(chuàng)建數(shù)據(jù)流。就是上面 Spark Streaming程序里提到的文件流。
http://dblab.xmu.edu.cn/blog/1082-2/
https://blog.csdn.net/zhangdy12307/article/details/90379543
3.socket
Spark Streaming可以通過Socket端口監(jiān)聽并接收數(shù)據(jù),然后進(jìn)行相應(yīng)處理。
使用命令開啟socket監(jiān)聽端口:nc -lk [port]
socket工作原理(應(yīng)該不會考):
如果有問題可以在評論區(qū)提出,或者私信我。如果哪里有錯誤的,歡迎提出~
總結(jié)
以上是生活随笔為你收集整理的大数据技术原理与应用:期末考点总结的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: npm run build后如何打开in
- 下一篇: scroll