Hadoop学习记录(4)|MapReduce原理|API操作使用
MapReduce概念
MapReduce是一種分布式計(jì)算模型,由谷歌提出,主要用于搜索領(lǐng)域,解決海量數(shù)據(jù)計(jì)算問(wèn)題。
MR由兩個(gè)階段組成:Map和Reduce,用戶只需要實(shí)現(xiàn)map()和reduce()兩個(gè)函數(shù)實(shí)現(xiàn)分布式計(jì)算。
這兩個(gè)函數(shù)的形參是key,value對(duì),表示函數(shù)的輸入信息。
MP執(zhí)行流程
客戶端提交給jobtracker,jobtracker分配給tasktracker。
trasktracker會(huì)對(duì)任務(wù)進(jìn)行mapper和reducer操作。
MapReduce原理
一個(gè)map輸入k1、v1,數(shù)據(jù)由輸入文件中獲取
map會(huì)把數(shù)據(jù)提交到每一個(gè)shuffle,最后輸出到reducer任務(wù)。
reducer任務(wù)的數(shù)量跟mapper發(fā)送到shuffle的數(shù)量是一致的。
map任務(wù)處理
1.1、讀取輸入文件內(nèi)容,解析成key,value對(duì)。對(duì)輸入文件的每一個(gè)解析成key\value對(duì)。每個(gè)鍵值對(duì)調(diào)用一次map函數(shù)。
1.2、寫(xiě)自己的邏輯,對(duì)輸入的key、value處理,轉(zhuǎn)換成新的key、value輸出。
1.3、對(duì)輸出的key、value進(jìn)行分區(qū)
1.4、對(duì)不同分區(qū)的數(shù)據(jù)按照key進(jìn)行排序、分組。相同key的value放到一個(gè)集合中。
1.5、(可選)分組后對(duì)數(shù)據(jù)進(jìn)行規(guī)約
reduce任務(wù)處理
2.1、對(duì)多個(gè)map任務(wù)的輸出按照不同的分區(qū)通過(guò)網(wǎng)絡(luò)拷貝到reduce節(jié)點(diǎn)。
2.2、對(duì)多個(gè)map任務(wù)輸出進(jìn)行合并、排序。寫(xiě)reduce函數(shù)自己的邏輯,對(duì)輸入key、value處理,轉(zhuǎn)換成新的key、value輸出。
2.3、把輸出的結(jié)果保存到HDFS中。
MapReduce執(zhí)行過(guò)程
1.1.讀取hdfs中文件,每個(gè)解析成<k,v>。每一個(gè)鍵值對(duì)調(diào)用一次map函數(shù)。
解析成兩個(gè)<k,v>,分別<0,hello you><10,hello me>。調(diào)用map函數(shù)兩次。
k是每行的開(kāi)始位置,v則示每行的文本內(nèi)容。
1.2.覆蓋map()函數(shù),接收1.1產(chǎn)生的<k,v>,進(jìn)行處理,轉(zhuǎn)換新的<k,v>輸出。
1.3.對(duì)1.2輸出的<k,v>進(jìn)行分區(qū)
public void map(k,v,context){
String[] split = v.toString().split(“ ”);
for(String str : split){
context.write(str,1);
}
}
1.4.對(duì)不同分區(qū)中的數(shù)據(jù)進(jìn)行排序(按照k)、分組,分別將key的value放到一個(gè)集合中。
map輸出后的數(shù)據(jù)是:<hello,1]>,<you,1>,<hello,1>,<me,1>
排序后: <hello,1]>, <hello,1>,<you,1>,<me,1>
分組后:<hello,{1,1}>,<you,{1}>,<me,{1}>
1.5.(可選)對(duì)分組后的數(shù)據(jù)進(jìn)行規(guī)約。
2.1.多個(gè)map任務(wù)的輸出按照不同的分區(qū),通過(guò)網(wǎng)絡(luò)copy到不同的reduce節(jié)點(diǎn)上。
2.2.對(duì)多個(gè)map的輸出進(jìn)行合并,排序,覆蓋reduce函數(shù),接收的是分組的數(shù)據(jù),實(shí)現(xiàn)自己的業(yè)務(wù)邏輯,處理后產(chǎn)生新的<k,v>輸出。
reduce函數(shù)被調(diào)用3次,跟分組次數(shù)一致。
public void reduce(k,vs,context){
long sum =0L;
for(long num:vs){
sum +=num;
}
context.write(k,sum);
}
2.3.設(shè)置任務(wù)執(zhí)行,對(duì)reduce輸出的<k,v>保存到hdfs中。
job.waitForCompletion(true);
?
整個(gè)流程我分了四步。簡(jiǎn)單些可以這樣說(shuō),每個(gè)map task都有一個(gè)內(nèi)存緩沖區(qū),存儲(chǔ)著map的輸出結(jié)果,當(dāng)緩沖區(qū)快滿的時(shí)候需要將緩沖區(qū)的數(shù)據(jù)以一個(gè)臨時(shí)文件的方式存放到磁盤(pán),當(dāng)整個(gè)map task結(jié)束后再對(duì)磁盤(pán)中這個(gè)map task產(chǎn)生的所有臨時(shí)文件做合并,生成最終的正式輸出文件,然后等待reduce task來(lái)拉數(shù)據(jù)。
MapReduce提交的源代碼分析
waitForCompletion函數(shù)中的submit方法連接和提交到j(luò)obtracker。
在eclipse中寫(xiě)的代碼如何提交到JobTracker中的哪?
答(1)eclipse中調(diào)用的job.waitForCompletion(true),實(shí)際調(diào)用的是JobClient中的提交方法。
contect()
info = jobClient.submitJobInternal(conf)
(2)在contect()中,實(shí)際創(chuàng)建了一個(gè)JobClient對(duì)象,在調(diào)用該對(duì)象的構(gòu)造方法時(shí),獲得了JobTracker的客戶端代理對(duì)象JobSubmissionProtocol
jobSubmissionProtocol實(shí)現(xiàn)類(lèi)是JobTracker
(3)在jobClient.submitJobInternal(conf)方法中,調(diào)用了jobSubmissionProtocol.submitJob()
即,執(zhí)行的是JobTracker.submitJob(..)
Hadoop基本類(lèi)型
Hadoop數(shù)據(jù)類(lèi)型必須實(shí)現(xiàn)Writable接口。
Long LongWritable
Boolean BooleanWritable
String Text
Integer IntWritable
Java類(lèi)型轉(zhuǎn)換為Hadoop基本類(lèi)型:
直接調(diào)用hadoop類(lèi)的構(gòu)造方法,或者調(diào)用set()方法
new IntWritable(123)
Hadoop類(lèi)型轉(zhuǎn)換成Java類(lèi)型:
text需要調(diào)用toString方法
其他類(lèi)型調(diào)用get()方法
使用Hadoop自定義類(lèi)型處理手機(jī)上網(wǎng)流量
1、自定義類(lèi)
class KpiWritable implements Writable{
long upPackNum;
long downPackNum;
long upPayLoad;
long downPayLoad;
public KpiWritable() {}
public KpiWritable(String upPackNum,String downPackNum,String upPayLoad,String downPayLoad){
this.upPackNum = Long.parseLong(upPackNum);
this.downPackNum = Long.parseLong(downPackNum);
this.upPayLoad = Long.parseLong(upPayLoad);
this.downPayLoad = Long.parseLong(downPayLoad);
}
@Override
public void write(DataOutput out) throws IOException {
//序列化出去
out.writeLong(upPackNum);
out.writeLong(downPackNum);
out.writeLong(upPayLoad);
out.writeLong(downPayLoad);
}
@Override
public void readFields(DataInput in) throws IOException {
//順序和寫(xiě)出去一樣
this.upPackNum = in.readLong();
this.downPackNum = in.readLong();
this.upPayLoad = in.readLong();
this.downPayLoad = in.readLong();
}
@Override
public String toString() {
return upPackNum+"\t"+downPackNum+"\t"+upPayLoad+"\t"+downPayLoad;
}
2、自定義Map
static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{
protected void map(LongWritable k1, Text v1,
org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,KpiWritable>.Context context)
throws IOException, InterruptedException {
//處理接收的數(shù)據(jù)
String[] splits = v1.toString().split("\t");
//獲取手機(jī)號(hào)
String msisdn = splits[1];
Text k2 = new Text(msisdn);
KpiWritable v2 = new KpiWritable(splits[6],splits[7],splits[8],splits[9]);
//寫(xiě)入context中交過(guò)reduce執(zhí)行
context.write(k2, v2);
}
}
3、自定義Reduce
static class MyReduce extends Reducer<Text, KpiWritable, Text, KpiWritable>{
protected void reduce(Text k2, Iterable<KpiWritable> v2s,org.apache.hadoop.mapreduce.Reducer<Text, KpiWritable, Text, KpiWritable>.Context context)
throws IOException, InterruptedException {
/**
* k2 表示不同的手機(jī)號(hào)
* v2s 表示該手機(jī)號(hào)不同時(shí)段流量集合
*/
//定義計(jì)數(shù)器
long upPackNum = 0L;
long downPackNum = 0L;
long upPayLoad = 0L;
long downPayLoad = 0L;
//遍歷合并數(shù)據(jù)
for(KpiWritable kpi : v2s){
upPackNum += kpi.upPackNum;
downPackNum += kpi.downPackNum;
upPayLoad += kpi.upPayLoad;
downPayLoad += kpi.downPayLoad;
}
//封裝到對(duì)象中
KpiWritable v3 = new KpiWritable(upPackNum+"", downPackNum+"", upPayLoad+"", downPayLoad+"");
//寫(xiě)入context中
context.write(k2, v3);
}
}
4、寫(xiě)驅(qū)動(dòng)程序
static final String INPUT_PATH = "hdfs://h1:9000/wlan";
static final String OUT_PATH = "hdfs://h1:9000/wlan_out";
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(new URI("hdfs://h1:9000/"), conf);
Path outPut = new Path(OUT_PATH);
if(fileSystem.exists(outPut)){
fileSystem.delete(outPut,true);
}
/**
* 1.1、指定輸入文件路徑
* 1.1.1. 指定那個(gè)類(lèi)來(lái)格式化輸入文
* 1.2、指定自定義的Mapper類(lèi)
* 1.2.1.指定輸出<k2,v2>的類(lèi)型
* 1.3、指定分區(qū)
* 1.4、排序分區(qū)(TODO)
* 1.5、(可選)合并
* 2.1、多個(gè)map任務(wù)的輸出,通過(guò)網(wǎng)絡(luò)copy到不同的reduce節(jié)點(diǎn)上,這個(gè)操作由hadoop自動(dòng)完成
* 2.2、指定定義的reduce類(lèi)
* 2.2.1.指定輸出<k3,v3>類(lèi)型
* 2.3、指定輸出位置
* 2.3.1、設(shè)置輸出文件的格式化類(lèi)
*
* 最后吧代碼提交到JobTracker執(zhí)行
*/
//創(chuàng)建Job任務(wù)
Job job = new Job(conf,KpiApp.class.getSimpleName());
//設(shè)置輸入路徑
FileInputFormat.setInputPaths(job, INPUT_PATH);
//設(shè)置輸入數(shù)據(jù)使用的格式化類(lèi)
job.setInputFormatClass(TextInputFormat.class);
//設(shè)置自定義Map類(lèi)
job.setMapperClass(MyMapper.class);
//設(shè)置Map類(lèi)輸出的key和value值類(lèi)型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(KpiWritable.class);
//設(shè)置分區(qū)類(lèi)
job.setPartitionerClass(HashPartitioner.class);
//設(shè)置任務(wù)數(shù)
job.setNumReduceTasks(1);
//設(shè)置自定義Reduce類(lèi)
job.setReducerClass(MyReduce.class);
//設(shè)置輸入數(shù)據(jù)使用的格式化類(lèi)
job.setInputFormatClass(TextInputFormat.class);
//設(shè)置Reduce輸出的key和value值類(lèi)型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(KpiWritable.class);
//設(shè)置輸出文件位置
FileOutputFormat.setOutputPath(job, outPut);
//設(shè)置將任務(wù)提交到JobTracker
job.waitForCompletion(true);
}
MapReduce 0.x API區(qū)別
hadoop版本0.x
1、包一般是mapred。
2、使用是JobConf類(lèi)創(chuàng)建Job任務(wù)。
3、使用JobClient.runJob(jobConf)提交任務(wù)。
4、自定義類(lèi)需要繼承MapReduceBase實(shí)現(xiàn)Mapper和Reducer接口。
hadoop版本1.x
1、包一般是mapreduce。
2、使用的Job類(lèi)創(chuàng)建任務(wù)。
3、ob.waitForCompletion(true)提交任務(wù)。
4、自定義類(lèi)只需要繼承Mapper和Reducer類(lèi)。
命令行運(yùn)行指定參數(shù)
hadoop jar WordCount.jar hdfs://h1:9000/hello hdfs://h1:9000/cmd_out
跟eclipse直接運(yùn)行的代碼區(qū)別:
1、類(lèi)需要繼承org.apache.hadoop.conf.Configured,并實(shí)現(xiàn)org.apache.hadoop.util.Tool。
2、以前main方法中寫(xiě)的驅(qū)動(dòng)程序卸載覆寫(xiě)的run方法中。
3、run()方法中
job.setJarByClass(CmdWordCount.class);
4、輸入輸出字符串定義為全局空字符串
5、main方法中使用org.apache.hadoop.util.ToolRunner的run方法,傳入new CmdWordCount()和args。args是main方法接收的字符串?dāng)?shù)組。
6、在覆寫(xiě)的run方法中把接收到的args數(shù)組提取并賦值給INPUT和OUTPUT的路徑
INPUT_PATH = agrs0[0];
OUTPUT_PATH = args0[1];
6、打包時(shí)一定要記得選擇輸出類(lèi)。
Hadoop計(jì)數(shù)器
File Input Format Counters
Bytes Read=19 讀取文件字節(jié)數(shù)
Map-Reduce Framework
Map output materialized bytes=65
1#Map input records=2 讀取記錄行
Reduce shuffle bytes=65
Spilled Records=8
Map output bytes=51
Total committed heap usage (bytes)=115675136
5#Combine input records=0 Map合并/規(guī)約輸入
SPLIT_RAW_BYTES=85
3# Reduce input records=4 reduce輸入行
4#Reduce input groups=3 Reduce輸入組數(shù)
Combine output records=0 Map合并/規(guī)約輸出
Reduce output records=3 Reduce輸出記錄
2#Map output records=4 Map輸出行
通過(guò)計(jì)數(shù)器數(shù)可以檢查出Map還是Reduce出現(xiàn)問(wèn)題。
輿情監(jiān)督示例,使用自定義計(jì)數(shù)器監(jiān)控出現(xiàn)次數(shù)。
//自定義計(jì)數(shù)器
Counter helloCounter = context.getCounter("Sensitive Words", "hello");
String line = v1.toString();
if(line.contains("hello")){
helloCounter.increment(1L);
}
Combine操作
為什么使用Combine?
Combiner發(fā)生在Map端。對(duì)數(shù)據(jù)進(jìn)行規(guī)約處理,數(shù)據(jù)量變小了,傳送到reduce的數(shù)據(jù)量變小,傳輸時(shí)間變短,作業(yè)整體時(shí)間變短。
為什么Combine不作為MapReduce的標(biāo)配,而是可選配置?
因?yàn)椴皇撬械乃惴ǘ歼m合使用Combine處理,例如求平均數(shù)。
適用于求和
Combine本身已經(jīng)執(zhí)行了Reduce操作,為什么Reduce階段還要執(zhí)行reduce操作?
combine操作發(fā)送在map端,處理一個(gè)任務(wù)所接收的文件中的數(shù)據(jù),不能跨map任務(wù);只有reduce可以接收多個(gè)map任務(wù)處理數(shù)據(jù)。
設(shè)置參數(shù)
job.setCombinerClass(MyCombiner.class);
MyCombiner.class可以使用Reduce
Partitioner編程
對(duì)輸出的key,value進(jìn)行分區(qū)。
指定自定義partition類(lèi),自定義類(lèi)需要繼承HashPartitioner類(lèi)。覆蓋getPartition方法。
分區(qū)的實(shí)例必須打成Jar包。
作用:
1、根據(jù)業(yè)務(wù)需要,產(chǎn)生多個(gè)輸出文件。
2、多個(gè)reduce任務(wù)在運(yùn)行,提高整體job的運(yùn)行效率。
根據(jù)實(shí)際情況來(lái)使用,如果有5臺(tái)機(jī)器,而分成100個(gè)分區(qū)來(lái)運(yùn)行或出現(xiàn)延遲和整體效率低問(wèn)題。因?yàn)樾枰抨?duì)運(yùn)行!
主要代碼:
設(shè)置成打包運(yùn)行 job.setJarByClass(KpiApp.class);
設(shè)置分區(qū)job.setPartitionerClass(MyPartitioner.class);
設(shè)置Reduce任務(wù)數(shù) job.setNumReduceTask(2);
自定義Partitioner類(lèi)需要繼承HashPartition類(lèi),泛型使用K2,V2的類(lèi)型。覆寫(xiě)getPartition方法。
排序和分組
排序
在map和reduce階段進(jìn)行排序時(shí),比較的是k2。v2是不參與排序比較。如果讓v2也進(jìn)行排序,需要將k2和v2組裝成心的類(lèi),作為k2,才能參與比較。
新類(lèi)需要實(shí)現(xiàn)WritableCompareble,覆寫(xiě)readFilds、write、compareTo、hasCode、equals方法。
在自定義map程序中將k2,v2封裝到新類(lèi)中,當(dāng)做k2寫(xiě)入context。
編寫(xiě)驅(qū)動(dòng)main方法時(shí),設(shè)置map輸出的類(lèi)型(job.setMapOutputKeyClass(new.class))
分組
按照K2進(jìn)行比較,這個(gè)k2是分裝到自定義類(lèi)的k2。
自定義分組類(lèi)實(shí)現(xiàn)RewComparetor,覆寫(xiě)compare方法。
public int compare(NewK2 o1, NewK2 o2) {
return (int) (o1.k2 - o2.k2);
}
/**
* @param b1 表示第一個(gè)參與比較的字節(jié)數(shù)組
* @param s1 表示第一個(gè)參與比較的字節(jié)數(shù)組的起始位置
* @param l1 表示第一個(gè)參與比較的字節(jié)數(shù)組的偏移量
*
* @param b2 表示第二個(gè)參與比較的字節(jié)數(shù)組
* @param s2 表示第二個(gè)參與比較的字節(jié)數(shù)組的起始位置
* @param l2 表示第二個(gè)參與比較的字節(jié)數(shù)組的偏移量
*
*/
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
int l2) {
return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
}
job任務(wù)設(shè)置分組比較組 job.setGroupComparatorClass(MyGroup.class);
Shuffle
MapReduce的核心,俗稱(chēng)洗牌、打亂。
shuffle在map任務(wù)傳送到reduce任務(wù)之間。
Map端
1、每個(gè)map有一個(gè)環(huán)形內(nèi)存緩沖區(qū),用于存儲(chǔ)任務(wù)的輸出,默認(rèn)100MB,如果達(dá)到法制80MB后臺(tái)線程會(huì)把內(nèi)容寫(xiě)到指定磁盤(pán)(mapred.local.dir)下的新建的溢出文件。
2、寫(xiě)入磁盤(pán)錢(qián),要partition、sort。如果有combiner,combine后寫(xiě)入。
3、最后記錄完成,合并全部溢寫(xiě)文件為一個(gè)分區(qū)且排序的文件。
Reduce端
1、Reduce通過(guò)Http方式得到輸出文件的分區(qū)。
2、TaskTracker為分區(qū)文件運(yùn)行Reduce任務(wù)。復(fù)制階段把Map輸出復(fù)制到Reducer的內(nèi)存或磁盤(pán)。一個(gè)Map任務(wù)完成,Reduce開(kāi)始復(fù)制輸出。
3、排序階段合并map輸出,最后運(yùn)行Reduce階段。
MapReduce常見(jiàn)算法
單詞計(jì)數(shù)
數(shù)據(jù)去重
排序
TopK
選擇
投影
分組
多表連接
單邊關(guān)聯(lián)
轉(zhuǎn)載于:https://www.cnblogs.com/luguoyuanf/p/3593646.html
與50位技術(shù)專(zhuān)家面對(duì)面20年技術(shù)見(jiàn)證,附贈(zèng)技術(shù)全景圖總結(jié)
以上是生活随笔為你收集整理的Hadoop学习记录(4)|MapReduce原理|API操作使用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 【转】awk 里的substr函数用法举
- 下一篇: 旁注原理