【小白视角】大数据基础实践(五) MapReduce编程基础操作
目錄
- 1. MapReduce 簡(jiǎn)介
- 1.1 起源
- 1.2 模型簡(jiǎn)介
- 1.3 MRv1體系結(jié)構(gòu)
- 1.4 YARN
- 1.4.1 YARN體系結(jié)構(gòu)
- 1.4.2 YARN工作流程
- 2. MapReduce 工作流程
- 3. Java Api要點(diǎn)
- 4. 實(shí)驗(yàn)過(guò)程
- 最后
1. MapReduce 簡(jiǎn)介
1.1 起源
在函數(shù)式語(yǔ)言里,map表示對(duì)一個(gè)列表(List)中的每個(gè)元素做計(jì)算,reduce表示對(duì)一個(gè)列表中的每個(gè)元素做迭代計(jì)算。
它們具體的計(jì)算是通過(guò)傳入的函數(shù)來(lái)實(shí)現(xiàn)的,map和reduce提供的是計(jì)算的框架。
- 在MapReduce里,map處理的是原始數(shù)據(jù),每條數(shù)據(jù)之間互相沒(méi)有關(guān)系;
- 到了reduce階段,數(shù)據(jù)是以key后面跟著若干個(gè)value來(lái)組織的,這些value有相關(guān)性,至少它們都在一個(gè)key下面,于是就符合函數(shù)式語(yǔ)言里map和reduce的基本思想了。
- “map”和“reduce”的概念和它們的主要思想,都是從函數(shù)式編程語(yǔ)言借用來(lái)的,還有從矢量編程語(yǔ)言里借來(lái)的特性。極大地方便了編程人員在不會(huì)分布式并行編程的情況下,將自己的程序運(yùn)行在分布式系統(tǒng)上。
1.2 模型簡(jiǎn)介
1.3 MRv1體系結(jié)構(gòu)
MapReduce體系結(jié)構(gòu)主要由四個(gè)部分組成,分別是:Client、JobTracker、TaskTracker以及Task
結(jié)點(diǎn)說(shuō)明:
- Client
用戶(hù)編寫(xiě)的MapReduce程序通過(guò)Client提交到JobTracker端,用戶(hù)可通過(guò)Client提供的一些接口查看作業(yè)運(yùn)行狀態(tài)。 - JobTracker
JobTracker負(fù)責(zé)資源監(jiān)控和作業(yè)調(diào)度;JobTracker監(jiān)控所有TaskTracker與Job的健康狀況,一旦發(fā)現(xiàn)失敗,就將相應(yīng)的任務(wù)轉(zhuǎn)移到其他節(jié)點(diǎn);JobTracker會(huì)跟蹤任務(wù)的執(zhí)行進(jìn)度、資源使用量等信息,并將這些信息告訴任務(wù)調(diào)度器(TaskScheduler),而調(diào)度器會(huì)在資源出現(xiàn)空閑時(shí),選擇合適的任務(wù)去使用這些資源。 - TaskTracker
TaskTracker會(huì)周期性地通過(guò)“心跳”將本節(jié)點(diǎn)上資源的使用情況和任務(wù)的運(yùn)行進(jìn)度匯報(bào)給JobTracker,同時(shí)接收J(rèn)obTracker發(fā)送過(guò)來(lái)的命令并執(zhí)行相應(yīng)的操作(如啟動(dòng)新任務(wù)、殺死任務(wù)等)。TaskTracker使用“slot”等量劃分本節(jié)點(diǎn)上的資源量(CPU、內(nèi)存等)。一個(gè)Task獲取到一個(gè)slot后才有機(jī)會(huì)運(yùn)行,而Hadoop調(diào)度器的作用就是將各個(gè)TaskTracker上的空閑slot分配給Task使用。slot 分為Map slot和Reduce slot兩種,分別供Map Task和Reduce Task使用。 - Task
Task分為Map Task和Reduce Task兩種,均由TaskTracker啟動(dòng)。
結(jié)構(gòu)缺點(diǎn):
- 存在單點(diǎn)故障
- JobTracker“大包大攬”導(dǎo)致任務(wù)過(guò)重(任務(wù)多時(shí)內(nèi)存開(kāi)銷(xiāo)大,上限4000節(jié)點(diǎn))
- 容易出現(xiàn)內(nèi)存溢出(分配資源只考慮MapReduce任務(wù)數(shù),不考慮CPU、內(nèi)存)
- 資源劃分不合理(強(qiáng)制劃分為slot ,包括Map slot和Reduce slot)
1.4 YARN
1.4.1 YARN體系結(jié)構(gòu)
架構(gòu)思想
體系結(jié)構(gòu)
ResourceManager
? 處理客戶(hù)端請(qǐng)求
? 啟動(dòng)/監(jiān)控ApplicationMaster
? 監(jiān)控NodeManager
? 資源分配與調(diào)度
NodeManager
? 單個(gè)節(jié)點(diǎn)上的資源管理
? 處理來(lái)自ResourceManger的命令
? 處理來(lái)自ApplicationMaster的命令
ApplicationMaster
? 為應(yīng)用程序申請(qǐng)資源,并分配給內(nèi)部任務(wù)
? 任務(wù)調(diào)度、監(jiān)控與容錯(cuò)
1.4.2 YARN工作流程
步驟1:用戶(hù)編寫(xiě)客戶(hù)端應(yīng)用程序,向YARN提交應(yīng)用程序,提交的內(nèi)容包括ApplicationMaster程序、啟動(dòng)ApplicationMaster的命令、用戶(hù)程序等
步驟2:YARN中的ResourceManager負(fù)責(zé)接收和處理來(lái)自客戶(hù)端的請(qǐng)求,為應(yīng)用程序分配一個(gè)容器,在該容器中啟動(dòng)一個(gè)ApplicationMaster
步驟3:ApplicationMaster被創(chuàng)建后會(huì)首先向ResourceManager注冊(cè)
步驟4:ApplicationMaster采用輪詢(xún)的方式向ResourceManager申請(qǐng)資源
步驟5:ResourceManager以“容器”的形式向提出申請(qǐng)的ApplicationMaster分配資源
步驟6:在容器中啟動(dòng)任務(wù)(運(yùn)行環(huán)境、腳本)
步驟7:各個(gè)任務(wù)向ApplicationMaster匯報(bào)自己的狀態(tài)和進(jìn)度
步驟8:應(yīng)用程序運(yùn)行完成后,ApplicationMaster向ResourceManager的應(yīng)用程序管理器注銷(xiāo)并關(guān)閉自己
2. MapReduce 工作流程
? 不同的Map任務(wù)之間不會(huì)進(jìn)行通信
? 不同的Reduce任務(wù)之間也不會(huì)發(fā)生任何信息交換
? 用戶(hù)不能顯式地從一臺(tái)機(jī)器向另一臺(tái)機(jī)器發(fā)送消息
? 所有的數(shù)據(jù)交換都是通過(guò)MapReduce框架自身去實(shí)現(xiàn)的
例子
3. Java Api要點(diǎn)
- Writable
Hadoop 自定義的序列化接口。當(dāng)要在進(jìn)程間傳遞對(duì)象或持久化對(duì)象的時(shí)候,就需要序列化對(duì)象成字節(jié)流,反之當(dāng)要將接收到或從磁盤(pán)讀取的字節(jié)流轉(zhuǎn)換為對(duì)象,就要進(jìn)行反序列化。Map 和 Reduce 的 key、value 數(shù)據(jù)格式均為 Writeable 類(lèi)型,其中 key 還需實(shí)現(xiàn)WritableComparable 接口。Java 基本類(lèi)型對(duì)應(yīng) writable 類(lèi)型的封裝如下:
| boolean | BooleanWritable |
| byte | ByteWritable |
| int | ShortWritable |
| float | FloatWritable |
| long | LongWritable |
| double | DoubleWritable |
| enum | EnumWritable |
| Map | MapWritable |
(2)InputFormat
用于描述輸入數(shù)據(jù)的格式。提供兩個(gè)功能:
getSplits()數(shù)據(jù)分片,按照某個(gè)策略將輸入數(shù)據(jù)切分成若干個(gè)split,以便確定Map任務(wù)個(gè)數(shù)以及對(duì)應(yīng)的 split;createRecordReader(),將某個(gè)split解析成一個(gè)個(gè) key-value 對(duì)。
FileInputFormat是所有以文件作為數(shù)據(jù)源的 InputFormat實(shí)現(xiàn)基類(lèi),小文件不會(huì)進(jìn)行分片,記錄讀取調(diào)用子類(lèi) TextInputFormat實(shí)現(xiàn);
- TextInputFormat是默認(rèn)處理類(lèi),處理普通文本文件,以文件中每一行作為一條記錄,行起始偏移量為key,每一行文本為 value;
- CombineFileInputFormat 針對(duì)小文件設(shè)計(jì),可以合并小文件;
- KeyValueTextInputFormat適合處理一行兩列并以tab作為分隔符的數(shù)據(jù);
- NLineInputFormat控制每個(gè) split中的行數(shù)。
(3)OutputFormat
主要用于描述輸出數(shù)據(jù)的格式。Hadoop 自帶多種 OutputFormat 的實(shí)現(xiàn)。
- TextOutputFormat默認(rèn)的輸出格式,key 和 value 中間用 tab 分隔;
- SequenceFileOutputFormat,將 key 和 value 以 SequenceFile 格式輸出;
- SequenceFileAsOutputFormat,將 key 和 value 以原始二進(jìn)制格式輸出;
- MapFileOutputFormat,將 key 和 value 寫(xiě)入 MapFile 中;
- MultipleOutputFormat,默認(rèn)情況下 Reducer 會(huì)產(chǎn)生一個(gè)輸出,用該格式可以實(shí)現(xiàn)一個(gè)Reducer 多個(gè)輸出。
(4)Mapper/Reducer
封裝了應(yīng)用程序的處理邏輯,主要由 map、reduce 方法實(shí)現(xiàn)。
(5)Partitioner
根據(jù) map 輸出的 key 進(jìn)行分區(qū),通過(guò) getPartition()方法返回分區(qū)值,默認(rèn)使用哈希函
數(shù)。分區(qū)的數(shù)目與一個(gè)作業(yè)的reduce任務(wù)的數(shù)目是一樣的。HashPartitioner是默認(rèn)的Partioner。
4. 實(shí)驗(yàn)過(guò)程
1、計(jì)數(shù)統(tǒng)計(jì)類(lèi)應(yīng)用
仿照 WordCount 例子,編寫(xiě)“TelPubXxx”類(lèi)實(shí)現(xiàn)對(duì)撥打公共服務(wù)號(hào)碼的電話(huà)信息的統(tǒng)計(jì)。給出的一個(gè)文本輸入文件如下,第一列為電話(huà)號(hào)碼、第二列為公共服務(wù)號(hào)碼,中間以空格隔開(kāi)。
13718855152 11216810117315 110
39451849 112
13718855153 110
13718855154 112
18610117315 114
18610117315 114
MapReduce 程序執(zhí)行后輸出結(jié)果如下,電話(huà)號(hào)碼之間用“|”連接:
110 13718855153|16810117315
112 13718855154|39451849|13718855152
114 18610117315|18610117315
運(yùn)行成功
2、兩表聯(lián)結(jié) Join 應(yīng)用
仿照單表關(guān)聯(lián)例子,編寫(xiě)“RelationXxx”類(lèi)實(shí)現(xiàn)多表關(guān)聯(lián)。中文文本文件轉(zhuǎn)成 UTF-8 編碼格式,否則會(huì)亂碼。
輸入 score.txt:
| s003001 | fd3003 | 84 |
| s003001 | fd3004 | 90 |
| s003002 | fd2001 | 71 |
| s002001 | fd1001 | 66 |
| s001001 | fd1001 | 98 |
| s001001 | fd1002 | 60 |
輸入 major.txt:
| fd1001 | 數(shù)據(jù)挖掘 | 數(shù)學(xué)系 |
| fd2001 | 電子工程 | 電子系 |
| fd2002 | 電子技術(shù) | 電子系 |
| fd3001 | 大數(shù)據(jù) | 計(jì)算機(jī)系 |
| fd3002 | 網(wǎng)絡(luò)工程 | 計(jì)算機(jī)系 |
| fd3003 | Java 應(yīng)用 | 計(jì)算機(jī)系 |
| fd3004 | web 前端 | 計(jì)算機(jī)系 |
輸出結(jié)果:
| fd1001 | 數(shù)據(jù)挖掘 | 數(shù)學(xué)系 | s001001 | 98 |
| fd1001 | 數(shù)據(jù)挖掘 | 數(shù)學(xué)系 | s002001 | 66 |
| fd2001 | 電子工程 | 電子系 | s003002 | 71 |
| fd3003 | Java 應(yīng)用 | 計(jì)算機(jī)系 | s003001 | 84 |
| fd3004 | web 前端 | 計(jì)算機(jī)系 | s003001 | 90 |
將其中需要的東西傳到hdfs中去。
沒(méi)有報(bào)錯(cuò)。查看結(jié)果
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;import java.io.IOException;public class RelationZqc {public static int time = 0;public static class RelationMap extends Mapper<Object, Text, Text, Text> {private Text classID = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String filename=((FileSplit)context.getInputSplit()).getPath().getName();String[] s = value.toString().split(" ");if(filename.equals("score.txt")){classID.set(s[1]);String val="1," + s[0] + "," + s[2];context.write(classID,new Text(val));}else if (filename.equals("major.txt")){if(!s[0].equals("classid")){classID.set(s[0]);String val = "2," + s[1] + "," + s[2];context.write(classID,new Text(val));}}}}public static class RelationReduce extends Reducer<Text, Text, Text, Text> {private Text result = new Text();public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {String[][] studentTable=new String[10][2];String[] data;String classID = "nil";if(time == 0){context.write(new Text("classid"), new Text("classname deptname studentid score"));time++;}int cnt = 0;for (Text val : values) {data = val.toString().split(",");if(data[0].equals("1")){studentTable[cnt][0] = data[1];studentTable[cnt][1] = data[2];cnt = cnt + 1;}else if(data.length == 3 && data[0].equals("2")){classID = data[1] + " " + data[2];}}for(int i = 0; i < cnt; i++){if(classID.equals("nil")) continue;String s=classID+" "+studentTable[i][0]+" "+studentTable[i][1];result.set(s);context.write(key, result);}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();// 加載hadoop配置conf.set("fs.defaultFS", "hdfs://localhost:9000");String[] otherArgs = new String[]{"input/score.txt", "input/major.txt", "output/outputRelationZqc"}; // String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: Relation <in> <in> [<in>...] <out>");System.exit(2);}Job job = Job.getInstance(conf, "RelationZqc");// 設(shè)置環(huán)境參數(shù)job.setJarByClass(RelationZqc.class);// 設(shè)置程序主類(lèi)job.setMapperClass(RelationMap.class);// 設(shè)置用戶(hù)實(shí)現(xiàn)的Mapper類(lèi)job.setReducerClass(RelationReduce.class);// 設(shè)置用戶(hù)實(shí)現(xiàn)的Reducer類(lèi)job.setOutputKeyClass(Text.class);// 設(shè)置輸出key類(lèi)型job.setOutputValueClass(Text.class); // 設(shè)置輸出value類(lèi)型for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));// 添加輸入文件路徑}FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));// 設(shè)置輸出文件路徑System.exit(job.waitForCompletion(true) ? 0 : 1); // 提交作業(yè)并等待結(jié)束} }3、簡(jiǎn)單排序類(lèi)應(yīng)用編寫(xiě) MapReduce 程序“SortXxx” 類(lèi),要求輸入文件 sort1.txt、sort2.txt、sort3.txt 內(nèi)容,由程序隨機(jī)生成若干條數(shù)據(jù)并存儲(chǔ)到 HDFS 上,每條數(shù)據(jù)占一行,數(shù)據(jù)可以是日期也可以是數(shù)字;輸出結(jié)果為兩列數(shù)據(jù),第一列是輸入文件中的原始數(shù)據(jù),第二列是該數(shù)據(jù)的排位。
運(yùn)行成功
最后
小生凡一,期待你的關(guān)注。
總結(jié)
以上是生活随笔為你收集整理的【小白视角】大数据基础实践(五) MapReduce编程基础操作的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 2022-05-20 工作记录--Rea
- 下一篇: 听书记录《人性中的善良天使》