JAVA大数据(二) Hadoop 分布式文件系统HDFS 架构,MapReduce介绍,Yarn资源调度
文章目錄
- 1.分布式文件系統HDFS
- 1.HDFS的來源
- 2.HDFS的架構圖之基礎架構
- 2.1 master/slave 架構
- 2.2 名字空間(NameSpace)
- 2.3 文件操作
- 2.4副本機制
- 2.5心跳機制
- 2.6 一次寫入,多次讀出
- 3.NameNode與Datanode的總結概述
- 3.1namenode 元數據管理
- 3.2 Datanode 數據存儲
- 4.文件副本機制以及block塊存儲
- 5.元文件FSImage與edits
- 6、HDFS的文件寫入過程
- 7、HDFS的文件讀取過程
- 8.HDFS基本Shell操作
- 9.HDFS的api操作
- 9.1前期準備
- 9.2 創建maven工程并導入jar包
- 9.2使用文件系統方式訪問數據
- 9.3獲取FileSystem的方式
- 9.4創建文件夾
- 9.4下載文件
- 9.5上傳文件
- 2.MapReduce介紹
- 1.理解MapReduce思想
- 2.Hadoop MapReduce設計構思
- 3.MapReduce編程規范及示例編寫
- 編程規范
- Map階段2個步驟
- shuffle階段4個步驟(了解,可以全部不用管)
- reduce階段2個步驟
- 4.WordCount實例
- 4.1準備數據并上傳
- 4.2測試官方案例
- 4.3定義一個mapper類
- 4.4定義一個reducer類
- 4.5定義一個主類,并提交job
- 5.hadoop中分片
- 3.Yarn資源調度
- 1.yarn集群的監控管理界面:
- 2.Yarn介紹
- 3.yarn當中的調度器介紹:
- 第一種調度器:FIFO Scheduler (隊列調度器)
- 第二種調度器:capacity scheduler(容量調度器,apache版本默認使用的調度器)
- 第三種調度器:Fair Scheduler(公平調度器,CDH版本的hadoop默認使用的調度器)
1.分布式文件系統HDFS
1.HDFS的來源
HDFS:Hadoop Distributed File system(hadoop分布式文件系統)
HDFS起源于Google的GFS論文(GFS,Mapreduce,BigTable為google的舊的三駕馬車)HBASE
(1) 發表于2003年10月
(2)HDFS是GFS的克隆版
(3)易于擴展的分布式文件系統
(4)運行在大量普通廉價機器上,提供容錯機制
(5)為大量用戶提供性能不錯的文件存取服務
2.HDFS的架構圖之基礎架構
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-b9Jv84mH-1641899589055)(day02_hadoop.assets/1626363613640.png)]
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-47L4heZx-1641899589056)(day02_hadoop.assets/1626363623431.png)]
2.1 master/slave 架構
NameNode是一個中心服務器,單一節點(簡化系統的設計和實現),負責管理文件系統的名字空間(namespace)以及客戶端對文件的訪問
2.2 名字空間(NameSpace)
HDFS 支持傳統的層次型文件組織結構。用戶或者應用程序可以創建目錄,然后將文件保存在這些目錄里。文件系統名字空間的層次結構和大多數現有的文件系統類似:用戶可以創建、刪除、移動或重命名文件。
Namenode 負責維護文件系統的名字空間,任何對文件系統名字空間或屬性的修改都將被Namenode 記錄下來。
HDFS 會給客戶端提供一個統一的抽象目錄樹,客戶端通過路徑來訪問文件,
形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.data。
2.3 文件操作
namenode是負責文件元數據的操作,datanode負責處理文件內容的讀寫請求,跟文件內容相關的數據流不經過Namenode,只詢問它跟哪個dataNode聯系,否則NameNode會成為系統的瓶頸
2.4副本機制
為了容錯,文件的所有 block 都會有副本。每個文件的 block 大小和副本系數都是可配置的。應用程序可以指定某個文件的副本數目。副本系數可以在文件創建的時候指定,也可以在之后改變。
2.5心跳機制
NameNode全權管理數據的復制,它周期性的從集群中的每個DataNode接收心跳信合和狀態報告,接收到心跳信號意味著DataNode節點工作正常,塊狀態報告包含了一個該DataNode上所有的數據列表
2.6 一次寫入,多次讀出
HDFS 是設計成適應一次寫入,多次讀出的場景,且不支持文件的修改。
正因為如此,HDFS 適合用來做大數據分析的底層存儲服務,并不適合用來做.網盤等應用,因為,修改不方便,延遲大,網絡開銷大,成本太高。
3.NameNode與Datanode的總結概述
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-ejFN3F6n-1641899589056)(day02_hadoop.assets/1626363915350.png)]
3.1namenode 元數據管理
我們把目錄結構及文件分塊位置信息叫做元數據。Namenode 負責維護整個hdfs文件系統的目錄樹結構,以及每一個文件所對應的 block 塊信息(block 的id,及所在的datanode 服務器)。
3.2 Datanode 數據存儲
文件的各個 block 的具體存儲管理由 datanode 節點承擔。每一個 block 都可以在多個datanode 上。Datanode 需要定時向 Namenode 匯報自己持有的 block信息。 存儲多個副本(副本數量也可以通過參數設置 dfs.replication,默認是 3)。
4.文件副本機制以及block塊存儲
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-MpJAHapu-1641899589057)(day02_hadoop.assets/1626363979672.png)]
所有的文件都是以block塊的方式存放在HDFS文件系統當中,在hadoop1當中,文件的block塊默認大小是64M,hadoop2當中,文件的block塊大小默認是128M,block塊的大小可以通過hdfs-site.xml當中的配置文件進行指定
5.元文件FSImage與edits
FSimage是一個鏡像文件,是一個完整的元數據文件
edits是每隔一個小時生成
客戶端對hdfs進行寫文件時會首先被記錄在edits文件中。
edits修改時元數據也會更新。
每次hdfs更新時edits先更新后客戶端才會看到最新信息。
fsimage:是namenode中關于元數據的鏡像,一般稱為@檢查點。
一般開始時對namenode的操作都放在edits中,為什么不放在fsimage中呢?
因為fsimage是namenode的完整的鏡像,內容很大,如果每次都加載到內存的話生成樹狀拓撲結構,這是非常耗內存和CPU。
fsimage內容包含了namenode管理下的所有datanode中文件及文件block及block所在的datanode的元數據信息。隨著edits內容增大,就需要在一定時間點和fsimage合并。
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-5eMUXEsj-1641899589057)(day02_hadoop.assets/1626591630130.png)]
2.x的hadoop元數據合并條件
dfs.namenode.checkpoint.period: 默認是一個小時(3600s)
dfs.namenode.checkpoint.txns:默認為1000000條edits記錄
FSimage文件當中的文件信息查看:
cd /opt/servers/hadoop-2.7.7/tmp/dfs/name/current hdfs oiv -i fsimage_0000000000000000864 -p XML -o hello.xmledits當中的文件信息查看:
cd /opt/servers/hadoop-2.7.7/tmp/dfs/nn/edits hdfs oev -i edits_0000000000000000865-0000000000000000866 -o myedit.xml -p XML6、HDFS的文件寫入過程
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-Kpd7Gnm0-1641899589058)(day02_hadoop.assets/a.png)]
詳細步驟解析:
1、 client發起文件上傳請求,通過RPC與NameNode建立通訊,NameNode檢查目標文件是否已存在,父目錄是否存在,返回是否可以上傳;
2、 client請求第一個block該傳輸到哪些DataNode服務器上;
3、 NameNode根據配置文件中指定的備份數量及機架感知原理進行文件分配,返回可用的DataNode的地址如:A,B,C;
注:Hadoop在設計時考慮到數據的安全與高效,數據文件默認在HDFS上存放三份,存儲策略為本地一份,同機架內其它某一節點上一份,不同機架的某一節點上一份。
4、 client請求3臺DataNode中的一臺A上傳數據(本質上是一個RPC調用,建立pipeline),A收到請求會繼續調用B,然后B調用C,將整個pipeline建立完成,后逐級返回client;
5、 client開始往A上傳第一個block(先從磁盤讀取數據放到一個本地內存緩存),以packet為單位(默認64K),A收到一個packet就會傳給B,B傳給C;A每傳一個packet會放入一個應答隊列等待應答。
6、 數據被分割成一個個packet數據包在pipeline上依次傳輸,在pipeline反方向上,逐個發送ack(命令正確應答),最終由pipeline中第一個DataNode節點A將pipelineack發送給client;
7、 當一個block傳輸完成之后,client再次請求NameNode上傳第二個block到服務器。
7、HDFS的文件讀取過程
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-Y1fIPoFP-1641899589058)(day02_hadoop.assets/1626591779067.png)]
詳細步驟解析
1、 Client向NameNode發起RPC請求,來確定請求文件block所在的位置;
2、 NameNode會視情況返回文件的部分或者全部block列表,對于每個block,NameNode 都會返回含有該 block 副本的 DataNode 地址; 這些返回的 DN 地址,會按照集群拓撲結構得出 DataNode 與客戶端的距離,然后進行排序,排序兩個規則:網絡拓撲結構中距離 Client 近的排靠前;心跳機制中超時匯報的 DN 狀態為 STALE,這樣的排靠后;
3、 Client 選取排序靠前的 DataNode 來讀取 block,如果客戶端本身就是DataNode,那么將從本地直接獲取數據(短路讀取特性);
4、 底層上本質是建立 Socket Stream(FSDataInputStream),重復的調用父類 DataInputStream 的 read 方法,直到這個塊上的數據讀取完畢;
5、 當讀完列表的 block 后,若文件讀取還沒有結束,客戶端會繼續向NameNode 獲取下一批的 block 列表;
6、 讀取完一個 block 都會進行 checksum 驗證,如果讀取 DataNode 時出現錯誤,客戶端會通知 NameNode,然后再從下一個擁有該 block 副本的DataNode 繼續讀。
7、 read 方法是并行的讀取 block 信息,不是一塊一塊的讀取;NameNode 只是返回Client請求包含塊的DataNode地址,并不是返回請求塊的數據;
8、 最終讀取來所有的 block 會合并成一個完整的最終文件。
8.HDFS基本Shell操作
創建文件夾(支持多級創建):
hadoop fs -mkdir -p /xxx查看目錄:
hadoop fs -ls /xxx遞歸查看多級目錄:
hadoop fs -lsr /xxx上傳文件到HDFS:
hadoop fs -put xxx.txt /xxx下載文件到本地當前目錄:
hadoop fs -get /xxx/xxx/xxx.txt /xxx刪除文件:
hadoop fs -rm /xxx/xxx/xxx.txt刪除文件夾(文件夾必須為空):
hadoop fs -rmdir /xxx/xxx強制刪除文件夾或文件
Hadoop fs -rm -r /xxx9.HDFS的api操作
9.1前期準備
1.解決winutils.exe的問題
? 1)把hadoop2.7.7(windows版)文件目錄放到一個沒有中文沒有空格的路徑下
? 2)在window中配置handoop的環境變量,并且加入path中。
9.2 創建maven工程并導入jar包
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.7.7</version> </dependency> <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.7.7</version> </dependency> <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.7</version> </dependency>9.2使用文件系統方式訪問數據
在 java 中操作 HDFS,主要涉及以下 Class:
Configuration:該類的對象封轉了客戶端或者服務器的配置;
FileSystem:該類的對象是一個文件系統對象,可以用該對象的一些方法來對文件進行操作,通過 FileSystem 的靜態方法 get 獲得該對象。
FileSystem fs = FileSystem.get(conf)get 方法從 conf 中的一個參數 fs.defaultFS 的配置值判斷具體是什么類型的文件系統。如果我們的代碼中沒有指定 fs.defaultFS,并且工程 classpath下也沒有給定相應的配置,conf中的默認值就來自于hadoop的jar包中的core-default.xml , 默 認 值 為 : file:/// , 則 獲 取 的 將 不 是 一 個DistributedFileSystem 的實例,而是一個本地文件系統的客戶端對象
9.3獲取FileSystem的方式
Configuration configuration=new Configuration(); configuration.set("fs.defaultFS","hdfs://192.168.65.101:8020"); FileSystem fileSystem=FileSystem.get(configuration); System.out.println(fileSystem.toString());9.4創建文件夾
@Test public void mkdirs() throws Exception{FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.65.101:8020"), new Configuration());boolean mkdirs = fileSystem.mkdirs(new Path("/hello/mydir/test"));fileSystem.close(); }9.4下載文件
@Test public void getFileToLocal()throws Exception{Configuration configuration=new Configuration();configuration.set("fs.defaultFS","hdfs://192.168.65.101:8020");FileSystem fileSystem=FileSystem.get(configuration);FSDataInputStream open = fileSystem.open(new Path("/test/input/install.log"));FileOutputStream fileOutputStream = new FileOutputStream(new File("c:\\install.log"));IOUtils.copy(open,fileOutputStream );IOUtils.closeQuietly(open);IOUtils.closeQuietly(fileOutputStream);fileSystem.close(); }9.5上傳文件
@Test public void putData() throws Exception{Configuration configuration=new Configuration();configuration.set("fs.defaultFS","hdfs://192.168.65.101:8020");FileSystem fileSystem=FileSystem.get(configuration);fileSystem.copyFromLocalFile(new Path("file:///c:\\install.log"),new Path("/hello/mydir/test"));fileSystem.close(); }2.MapReduce介紹
1.理解MapReduce思想
? MapReduce思想在生活中處處可見。或多或少都曾接觸過這種思想。MapReduce的思想核心是“分而治之”,適用于大量復雜的任務處理場景(大規模數據處理場景)。即使是發布過論文實現分布式計算的谷歌也只是實現了這種思想,而不是自己原創。
Map負責“分”,即把復雜的任務分解為若干個“簡單的任務”來并行處理。可以進行拆分的前提是這些小任務可以并行計算,彼此間幾乎沒有依賴關系。
Reduce(規約)負責“合”,即對map階段的結果進行全局匯總。
這兩個階段合起來正是MapReduce思想的體現。
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-VP8q4c8k-1641899589059)(day02_hadoop.assets/1626596474458.png)]
還有一個比較形象的語言解釋MapReduce:
我們要數圖書館中的所有書。你數1號書架,我數2號書架。這就是“Map”。我們人越多,數書就更快。
現在我們到一起,把所有人的統計數加在一起。這就是“Reduce”。
2.Hadoop MapReduce設計構思
MapReduce是一個分布式運算程序的編程框架,核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,并發運行在Hadoop集群上。
既然是做計算的框架,那么表現形式就是有個輸入(input),MapReduce操作這個輸入(input),通過本身定義好的計算模型,得到一個輸出(output)。
如何對付大數據處理:分而治之
l 構建抽象模型:Map和Reduce
MapReduce借鑒了函數式語言中的思想,用Map和Reduce兩個函數提供了高層的并行編程抽象模型。
Map: 對一組數據元素進行某種重復式的處理;
Reduce: 對Map的中間結果進行某種進一步的結果整理。
MapReduce中定義了如下的Map和Reduce兩個抽象的編程接口,由用戶去編程實現:
map: (k1; v1) → [(k2; v2)]reduce: (k2; [v2]) → [(k3; v3)][外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-8JtXGzwY-1641899589059)(day02_hadoop.assets/1626596845102.png)]
3.MapReduce編程規范及示例編寫
編程規范
mapReduce編程模型的總結:
MapReduce的開發一共有八個步驟其中map階段分為2個步驟,shuffle階段4個步驟,reduce階段分為2個步驟
Map階段2個步驟
第一步:設置inputFormat類,將我們的數據切分成key,value對,輸入到第二步
第二步:自定義map邏輯,處理我們第一步的輸入數據,然后轉換成新的key,value對進行輸出
shuffle階段4個步驟(了解,可以全部不用管)
第三步:對輸出的key,value對進行分區
第四步:對不同分區的數據按照相同的key進行排序
第五步:對分組后的數據進行規約(combine操作),降低數據的網絡拷貝(可選步驟)
第六步:對排序后的額數據進行分組,分組的過程中,將相同key的value放到一個集合當中
reduce階段2個步驟
第七步:對多個map的任務進行合并,排序,寫reduce函數自己的邏輯,對輸入的key,value對進行處理,轉換成新的key,value對進行輸出
第八步:設置outputformat將輸出的key,value對數據進行保存到文件中
4.WordCount實例
4.1準備數據并上傳
cd /opt/servers vim wordcount.txthello,world,hadoop hive,sqoop,flume,hello kitty,tom,jerry,world hadoophdfs dfs -mkdir -p /wordcount/input hdfs dfs -put wordcount.txt /wordcount/input4.2測試官方案例
hadoop jar /opt/servers/hadoop-2.7.7/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.7.jar wordcount /wordcount/input /wordcount/output4.3定義一個mapper類
public class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWritable> {@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] split = line.split(",");for (String word : split) {context.write(new Text(word),new LongWritable(1));}}}4.4定義一個reducer類
public class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable> {/*** 自定義我們的reduce邏輯* 所有的key都是我們的單詞,所有的values都是我們單詞出現的次數* @param key* @param values* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {long count = 0;for (LongWritable value : values) {count += value.get();}context.write(key,new LongWritable(count));} }4.5定義一個主類,并提交job
public class JobMain {/*** 程序main函數的入口類* @param args* @throws Exception*/public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();Job job = Job.getInstance(configuration, JobMain.class.getSimpleName());//打包到集群上面運行時候,必須要添加以下配置,指定程序的main函數job.setJarByClass(JobMain.class);//第一步:讀取輸入文件解析成key,value對job.setInputFormatClass(TextInputFormat.class);TextInputFormat.addInputPath(job,new Path("hdfs://192.168.65.101:8020/wordcount"));//第二步:設置我們的mapper類job.setMapperClass(WordCountMapper.class);//設置我們map階段完成之后的輸出類型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);//第三步,第四步,第五步,第六步,省略//第七步:設置我們的reduce類job.setReducerClass(WordCountReducer.class);//設置我們reduce階段完成之后的輸出類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);//第八步:設置輸出類以及輸出路徑job.setOutputFormatClass(TextOutputFormat.class);TextOutputFormat.setOutputPath(job,new Path("hdfs://192.168.64.101:8020/wordcount_out"));job.waitForCompletion(true);} }5.hadoop中分片
split,默認情況下一個塊對應一個片。
400m ,128 128 44 128 1個map 128 1個map 44 1個map
130m 128 2 130 1個map
3.Yarn資源調度
1.yarn集群的監控管理界面:
http://192.168.65.101:8088/cluster
2.Yarn介紹
yarn是hadoop集群當中的資源管理系統模塊,從hadoop2.x開始引入yarn來進行管理集群當中的資源(主要是服務器的各種硬件資源,包括CPU,內存,磁盤,網絡IO等)以及運行在yarn上面的各種任務。
總結一句話就是說:yarn主要就是為了調度資源,管理任務等
一級調度管理: 計算資源管理(CPU,內存,網絡IO,磁盤) App生命周期管理 (每一個應用執行的情況,都需要匯報給ResourceManager) 二級調度管理:任務內部的計算模型管理 (AppMaster的任務精細化管理)多樣化的計算模型3.yarn當中的調度器介紹:
第一種調度器:FIFO Scheduler (隊列調度器)
把應用按提交的順序排成一個隊列,這是一個先進先出隊列,在進行資源分配的時候,先給隊列中最頭上的應用進行分配資源,待最頭上的應用需求滿足后再給下一個分配,以此類推。
FIFO Scheduler是最簡單也是最容易理解的調度器,也不需要任何配置,但它并不適用于共享集群。大的應用可能會占用所有集群資源,這就導致其它應用被阻塞。在共享集群中,更適合采用Capacity Scheduler或Fair Scheduler,這兩個調度器都允許大任務和小任務在提交的同時獲得一定的系統資源。
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-tBXsk1sU-1641899589060)(day02_hadoop.assets/1626598764614.png)]
?
第二種調度器:capacity scheduler(容量調度器,apache版本默認使用的調度器)
Capacity 調度器允許多個組織共享整個集群,每個組織可以獲得集群的一部分計算能力。通過為每個組織分配專門的隊列,然后再為每個隊列分配一定的集群資源,這樣整個集群就可以通過設置多個隊列的方式給多個組織提供服務了。除此之外,隊列內部又可以垂直劃分,這樣一個組織內部的多個成員就可以共享這個隊列資源了,在一個隊列內部,資源的調度是采用的是先進先出(FIFO)策略。
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-dv8nOMKv-1641899589060)(day02_hadoop.assets/1626598877504.png)]
第三種調度器:Fair Scheduler(公平調度器,CDH版本的hadoop默認使用的調度器)
Fair調度器的設計目標是為所有的應用分配公平的資源(對公平的定義可以通過參數來設置)。公平調度在也可以在多個隊列間工作。舉個例子,假設有兩個用戶A和B,他們分別擁有一個隊列。當A啟動一個job而B沒有任務時,A會獲得全部集群資源;當B啟動一個job后,A的job會繼續運行,不過一會兒之后兩個任務會各自獲得一半的集群資源。如果此時B再啟動第二個job并且其它job還在運行,則它將會和B的第一個job共享B這個隊列的資源,也就是B的兩個job會用于四分之一的集群資源,而A的job仍然用于集群一半的資源,結果就是資源最終在兩個用戶之間平等的共享
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-nIXYz2NW-1641899589061)(day02_hadoop.assets/1626598921254.png)]
使用哪種調度器取決于yarn-site.xml當中的
yarn.resourcemanager.scheduler.class 這個屬性的配置
總結
以上是生活随笔為你收集整理的JAVA大数据(二) Hadoop 分布式文件系统HDFS 架构,MapReduce介绍,Yarn资源调度的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 目标检测One-stage和Two-st
- 下一篇: 舒亦梵:4.24非农周即将来临,作为投资