日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop学习记录(4)|MapReduce原理|API操作使用

發(fā)布時(shí)間:2025/3/13 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop学习记录(4)|MapReduce原理|API操作使用 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

MapReduce概念

MapReduce是一種分布式計(jì)算模型,由谷歌提出,主要用于搜索領(lǐng)域,解決海量數(shù)據(jù)計(jì)算問(wèn)題。

MR由兩個(gè)階段組成:Map和Reduce,用戶只需要實(shí)現(xiàn)map()和reduce()兩個(gè)函數(shù)實(shí)現(xiàn)分布式計(jì)算。

這兩個(gè)函數(shù)的形參是key,value對(duì),表示函數(shù)的輸入信息。

MP執(zhí)行流程

客戶端提交給jobtracker,jobtracker分配給tasktracker。

trasktracker會(huì)對(duì)任務(wù)進(jìn)行mapper和reducer操作。

MapReduce原理

一個(gè)map輸入k1、v1,數(shù)據(jù)由輸入文件中獲取

map會(huì)把數(shù)據(jù)提交到每一個(gè)shuffle,最后輸出到reducer任務(wù)。

reducer任務(wù)的數(shù)量跟mapper發(fā)送到shuffle的數(shù)量是一致的。

map任務(wù)處理

1.1、讀取輸入文件內(nèi)容,解析成key,value對(duì)。對(duì)輸入文件的每一個(gè)解析成key\value對(duì)。每個(gè)鍵值對(duì)調(diào)用一次map函數(shù)。

1.2、寫(xiě)自己的邏輯,對(duì)輸入的key、value處理,轉(zhuǎn)換成新的key、value輸出。

1.3、對(duì)輸出的key、value進(jìn)行分區(qū)

1.4、對(duì)不同分區(qū)的數(shù)據(jù)按照key進(jìn)行排序、分組。相同key的value放到一個(gè)集合中。

1.5、(可選)分組后對(duì)數(shù)據(jù)進(jìn)行規(guī)約

reduce任務(wù)處理

2.1、對(duì)多個(gè)map任務(wù)的輸出按照不同的分區(qū)通過(guò)網(wǎng)絡(luò)拷貝到reduce節(jié)點(diǎn)。

2.2、對(duì)多個(gè)map任務(wù)輸出進(jìn)行合并、排序。寫(xiě)reduce函數(shù)自己的邏輯,對(duì)輸入key、value處理,轉(zhuǎn)換成新的key、value輸出。

2.3、把輸出的結(jié)果保存到HDFS中。

MapReduce執(zhí)行過(guò)程

1.1.讀取hdfs中文件,每個(gè)解析成<k,v>。每一個(gè)鍵值對(duì)調(diào)用一次map函數(shù)。

解析成兩個(gè)<k,v>,分別<0,hello you><10,hello me>。調(diào)用map函數(shù)兩次。

k是每行的開(kāi)始位置,v則示每行的文本內(nèi)容。

1.2.覆蓋map()函數(shù),接收1.1產(chǎn)生的<k,v>,進(jìn)行處理,轉(zhuǎn)換新的<k,v>輸出。

1.3.對(duì)1.2輸出的<k,v>進(jìn)行分區(qū)

public void map(k,v,context){

String[] split = v.toString().split(“ ”);

for(String str : split){

context.write(str,1);

}

}

1.4.對(duì)不同分區(qū)中的數(shù)據(jù)進(jìn)行排序(按照k)、分組,分別將key的value放到一個(gè)集合中。

map輸出后的數(shù)據(jù)是:<hello,1]>,<you,1>,<hello,1>,<me,1>

排序后: <hello,1]>, <hello,1>,<you,1>,<me,1>

分組后:<hello,{1,1}>,<you,{1}>,<me,{1}>

1.5.(可選)對(duì)分組后的數(shù)據(jù)進(jìn)行規(guī)約。

2.1.多個(gè)map任務(wù)的輸出按照不同的分區(qū),通過(guò)網(wǎng)絡(luò)copy到不同的reduce節(jié)點(diǎn)上。

2.2.對(duì)多個(gè)map的輸出進(jìn)行合并,排序,覆蓋reduce函數(shù),接收的是分組的數(shù)據(jù),實(shí)現(xiàn)自己的業(yè)務(wù)邏輯,處理后產(chǎn)生新的<k,v>輸出。

reduce函數(shù)被調(diào)用3次,跟分組次數(shù)一致。

public void reduce(k,vs,context){

long sum =0L;

for(long num:vs){

sum +=num;

}

context.write(k,sum);

}

2.3.設(shè)置任務(wù)執(zhí)行,對(duì)reduce輸出的<k,v>保存到hdfs中。

job.waitForCompletion(true);

?

整個(gè)流程我分了四步。簡(jiǎn)單些可以這樣說(shuō),每個(gè)map task都有一個(gè)內(nèi)存緩沖區(qū),存儲(chǔ)著map的輸出結(jié)果,當(dāng)緩沖區(qū)快滿的時(shí)候需要將緩沖區(qū)的數(shù)據(jù)以一個(gè)臨時(shí)文件的方式存放到磁盤(pán),當(dāng)整個(gè)map task結(jié)束后再對(duì)磁盤(pán)中這個(gè)map task產(chǎn)生的所有臨時(shí)文件做合并,生成最終的正式輸出文件,然后等待reduce task來(lái)拉數(shù)據(jù)。

MapReduce提交的源代碼分析

waitForCompletion函數(shù)中的submit方法連接和提交到j(luò)obtracker。

在eclipse中寫(xiě)的代碼如何提交到JobTracker中的哪?

答(1)eclipse中調(diào)用的job.waitForCompletion(true),實(shí)際調(diào)用的是JobClient中的提交方法。

contect()

info = jobClient.submitJobInternal(conf)

(2)在contect()中,實(shí)際創(chuàng)建了一個(gè)JobClient對(duì)象,在調(diào)用該對(duì)象的構(gòu)造方法時(shí),獲得了JobTracker的客戶端代理對(duì)象JobSubmissionProtocol

jobSubmissionProtocol實(shí)現(xiàn)類(lèi)是JobTracker

(3)在jobClient.submitJobInternal(conf)方法中,調(diào)用了jobSubmissionProtocol.submitJob()

即,執(zhí)行的是JobTracker.submitJob(..)

Hadoop基本類(lèi)型

Hadoop數(shù)據(jù)類(lèi)型必須實(shí)現(xiàn)Writable接口。

Long LongWritable

Boolean BooleanWritable

String Text

Integer IntWritable

Java類(lèi)型轉(zhuǎn)換為Hadoop基本類(lèi)型:

直接調(diào)用hadoop類(lèi)的構(gòu)造方法,或者調(diào)用set()方法

new IntWritable(123)

Hadoop類(lèi)型轉(zhuǎn)換成Java類(lèi)型:

text需要調(diào)用toString方法

其他類(lèi)型調(diào)用get()方法

使用Hadoop自定義類(lèi)型處理手機(jī)上網(wǎng)流量

1、自定義類(lèi)

class KpiWritable implements Writable{

long upPackNum;

long downPackNum;

long upPayLoad;

long downPayLoad;

public KpiWritable() {}

public KpiWritable(String upPackNum,String downPackNum,String upPayLoad,String downPayLoad){

this.upPackNum = Long.parseLong(upPackNum);

this.downPackNum = Long.parseLong(downPackNum);

this.upPayLoad = Long.parseLong(upPayLoad);

this.downPayLoad = Long.parseLong(downPayLoad);

}

@Override

public void write(DataOutput out) throws IOException {

//序列化出去

out.writeLong(upPackNum);

out.writeLong(downPackNum);

out.writeLong(upPayLoad);

out.writeLong(downPayLoad);

}

@Override

public void readFields(DataInput in) throws IOException {

//順序和寫(xiě)出去一樣

this.upPackNum = in.readLong();

this.downPackNum = in.readLong();

this.upPayLoad = in.readLong();

this.downPayLoad = in.readLong();

}

@Override

public String toString() {

return upPackNum+"\t"+downPackNum+"\t"+upPayLoad+"\t"+downPayLoad;

}

2、自定義Map

static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{

protected void map(LongWritable k1, Text v1,

org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,Text,KpiWritable>.Context context)

throws IOException, InterruptedException {

//處理接收的數(shù)據(jù)

String[] splits = v1.toString().split("\t");

//獲取手機(jī)號(hào)

String msisdn = splits[1];

Text k2 = new Text(msisdn);

KpiWritable v2 = new KpiWritable(splits[6],splits[7],splits[8],splits[9]);

//寫(xiě)入context中交過(guò)reduce執(zhí)行

context.write(k2, v2);

}

}

3、自定義Reduce

static class MyReduce extends Reducer<Text, KpiWritable, Text, KpiWritable>{

protected void reduce(Text k2, Iterable<KpiWritable> v2s,org.apache.hadoop.mapreduce.Reducer<Text, KpiWritable, Text, KpiWritable>.Context context)

throws IOException, InterruptedException {

/**

* k2 表示不同的手機(jī)號(hào)

* v2s 表示該手機(jī)號(hào)不同時(shí)段流量集合

*/

//定義計(jì)數(shù)器

long upPackNum = 0L;

long downPackNum = 0L;

long upPayLoad = 0L;

long downPayLoad = 0L;

//遍歷合并數(shù)據(jù)

for(KpiWritable kpi : v2s){

upPackNum += kpi.upPackNum;

downPackNum += kpi.downPackNum;

upPayLoad += kpi.upPayLoad;

downPayLoad += kpi.downPayLoad;

}

//封裝到對(duì)象中

KpiWritable v3 = new KpiWritable(upPackNum+"", downPackNum+"", upPayLoad+"", downPayLoad+"");

//寫(xiě)入context中

context.write(k2, v3);

}

}

4、寫(xiě)驅(qū)動(dòng)程序

static final String INPUT_PATH = "hdfs://h1:9000/wlan";

static final String OUT_PATH = "hdfs://h1:9000/wlan_out";

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

FileSystem fileSystem = FileSystem.get(new URI("hdfs://h1:9000/"), conf);

Path outPut = new Path(OUT_PATH);

if(fileSystem.exists(outPut)){

fileSystem.delete(outPut,true);

}

/**

* 1.1、指定輸入文件路徑

* 1.1.1. 指定那個(gè)類(lèi)來(lái)格式化輸入文

* 1.2、指定自定義的Mapper類(lèi)

* 1.2.1.指定輸出<k2,v2>的類(lèi)型

* 1.3、指定分區(qū)

* 1.4、排序分區(qū)(TODO)

* 1.5、(可選)合并

* 2.1、多個(gè)map任務(wù)的輸出,通過(guò)網(wǎng)絡(luò)copy到不同的reduce節(jié)點(diǎn)上,這個(gè)操作由hadoop自動(dòng)完成

* 2.2、指定定義的reduce類(lèi)

* 2.2.1.指定輸出<k3,v3>類(lèi)型

* 2.3、指定輸出位置

* 2.3.1、設(shè)置輸出文件的格式化類(lèi)

*

* 最后吧代碼提交到JobTracker執(zhí)行

*/

//創(chuàng)建Job任務(wù)

Job job = new Job(conf,KpiApp.class.getSimpleName());

//設(shè)置輸入路徑

FileInputFormat.setInputPaths(job, INPUT_PATH);

//設(shè)置輸入數(shù)據(jù)使用的格式化類(lèi)

job.setInputFormatClass(TextInputFormat.class);

//設(shè)置自定義Map類(lèi)

job.setMapperClass(MyMapper.class);

//設(shè)置Map類(lèi)輸出的key和value值類(lèi)型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(KpiWritable.class);

//設(shè)置分區(qū)類(lèi)

job.setPartitionerClass(HashPartitioner.class);

//設(shè)置任務(wù)數(shù)

job.setNumReduceTasks(1);

//設(shè)置自定義Reduce類(lèi)

job.setReducerClass(MyReduce.class);

//設(shè)置輸入數(shù)據(jù)使用的格式化類(lèi)

job.setInputFormatClass(TextInputFormat.class);

//設(shè)置Reduce輸出的key和value值類(lèi)型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(KpiWritable.class);

//設(shè)置輸出文件位置

FileOutputFormat.setOutputPath(job, outPut);

//設(shè)置將任務(wù)提交到JobTracker

job.waitForCompletion(true);

}

MapReduce 0.x API區(qū)別

hadoop版本0.x

1、包一般是mapred。

2、使用是JobConf類(lèi)創(chuàng)建Job任務(wù)。

3、使用JobClient.runJob(jobConf)提交任務(wù)。

4、自定義類(lèi)需要繼承MapReduceBase實(shí)現(xiàn)Mapper和Reducer接口。

hadoop版本1.x

1、包一般是mapreduce。

2、使用的Job類(lèi)創(chuàng)建任務(wù)。

3、ob.waitForCompletion(true)提交任務(wù)。

4、自定義類(lèi)只需要繼承Mapper和Reducer類(lèi)。

命令行運(yùn)行指定參數(shù)

hadoop jar WordCount.jar hdfs://h1:9000/hello hdfs://h1:9000/cmd_out

跟eclipse直接運(yùn)行的代碼區(qū)別:

1、類(lèi)需要繼承org.apache.hadoop.conf.Configured,并實(shí)現(xiàn)org.apache.hadoop.util.Tool。

2、以前main方法中寫(xiě)的驅(qū)動(dòng)程序卸載覆寫(xiě)的run方法中。
3、run()方法中

job.setJarByClass(CmdWordCount.class);

4、輸入輸出字符串定義為全局空字符串

5、main方法中使用org.apache.hadoop.util.ToolRunner的run方法,傳入new CmdWordCount()和args。args是main方法接收的字符串?dāng)?shù)組。

6、在覆寫(xiě)的run方法中把接收到的args數(shù)組提取并賦值給INPUT和OUTPUT的路徑

INPUT_PATH = agrs0[0];

OUTPUT_PATH = args0[1];

6、打包時(shí)一定要記得選擇輸出類(lèi)。

Hadoop計(jì)數(shù)器

File Input Format Counters

Bytes Read=19 讀取文件字節(jié)數(shù)

Map-Reduce Framework

Map output materialized bytes=65

1#Map input records=2 讀取記錄行

Reduce shuffle bytes=65

Spilled Records=8

Map output bytes=51

Total committed heap usage (bytes)=115675136

5#Combine input records=0 Map合并/規(guī)約輸入

SPLIT_RAW_BYTES=85

3# Reduce input records=4 reduce輸入行

4#Reduce input groups=3 Reduce輸入組數(shù)

Combine output records=0 Map合并/規(guī)約輸出

Reduce output records=3 Reduce輸出記錄

2#Map output records=4 Map輸出行

通過(guò)計(jì)數(shù)器數(shù)可以檢查出Map還是Reduce出現(xiàn)問(wèn)題。

輿情監(jiān)督示例,使用自定義計(jì)數(shù)器監(jiān)控出現(xiàn)次數(shù)。

//自定義計(jì)數(shù)器

Counter helloCounter = context.getCounter("Sensitive Words", "hello");

String line = v1.toString();

if(line.contains("hello")){

helloCounter.increment(1L);

}

Combine操作

為什么使用Combine?

Combiner發(fā)生在Map端。對(duì)數(shù)據(jù)進(jìn)行規(guī)約處理,數(shù)據(jù)量變小了,傳送到reduce的數(shù)據(jù)量變小,傳輸時(shí)間變短,作業(yè)整體時(shí)間變短。

為什么Combine不作為MapReduce的標(biāo)配,而是可選配置?

因?yàn)椴皇撬械乃惴ǘ歼m合使用Combine處理,例如求平均數(shù)。

適用于求和

Combine本身已經(jīng)執(zhí)行了Reduce操作,為什么Reduce階段還要執(zhí)行reduce操作?

combine操作發(fā)送在map端,處理一個(gè)任務(wù)所接收的文件中的數(shù)據(jù),不能跨map任務(wù);只有reduce可以接收多個(gè)map任務(wù)處理數(shù)據(jù)。

設(shè)置參數(shù)

job.setCombinerClass(MyCombiner.class);

MyCombiner.class可以使用Reduce

Partitioner編程

對(duì)輸出的key,value進(jìn)行分區(qū)。

指定自定義partition類(lèi),自定義類(lèi)需要繼承HashPartitioner類(lèi)。覆蓋getPartition方法。

分區(qū)的實(shí)例必須打成Jar包。

作用:

1、根據(jù)業(yè)務(wù)需要,產(chǎn)生多個(gè)輸出文件。

2、多個(gè)reduce任務(wù)在運(yùn)行,提高整體job的運(yùn)行效率。

根據(jù)實(shí)際情況來(lái)使用,如果有5臺(tái)機(jī)器,而分成100個(gè)分區(qū)來(lái)運(yùn)行或出現(xiàn)延遲和整體效率低問(wèn)題。因?yàn)樾枰抨?duì)運(yùn)行!

主要代碼:

設(shè)置成打包運(yùn)行 job.setJarByClass(KpiApp.class);

設(shè)置分區(qū)job.setPartitionerClass(MyPartitioner.class);

設(shè)置Reduce任務(wù)數(shù) job.setNumReduceTask(2);

自定義Partitioner類(lèi)需要繼承HashPartition類(lèi),泛型使用K2,V2的類(lèi)型。覆寫(xiě)getPartition方法。

排序和分組

排序

在map和reduce階段進(jìn)行排序時(shí),比較的是k2。v2是不參與排序比較。如果讓v2也進(jìn)行排序,需要將k2和v2組裝成心的類(lèi),作為k2,才能參與比較。

新類(lèi)需要實(shí)現(xiàn)WritableCompareble,覆寫(xiě)readFilds、write、compareTo、hasCode、equals方法。

在自定義map程序中將k2,v2封裝到新類(lèi)中,當(dāng)做k2寫(xiě)入context。

編寫(xiě)驅(qū)動(dòng)main方法時(shí),設(shè)置map輸出的類(lèi)型(job.setMapOutputKeyClass(new.class))

分組

按照K2進(jìn)行比較,這個(gè)k2是分裝到自定義類(lèi)的k2。

自定義分組類(lèi)實(shí)現(xiàn)RewComparetor,覆寫(xiě)compare方法。

public int compare(NewK2 o1, NewK2 o2) {

return (int) (o1.k2 - o2.k2);

}

/**

* @param b1 表示第一個(gè)參與比較的字節(jié)數(shù)組

* @param s1 表示第一個(gè)參與比較的字節(jié)數(shù)組的起始位置

* @param l1 表示第一個(gè)參與比較的字節(jié)數(shù)組的偏移量

*

* @param b2 表示第二個(gè)參與比較的字節(jié)數(shù)組

* @param s2 表示第二個(gè)參與比較的字節(jié)數(shù)組的起始位置

* @param l2 表示第二個(gè)參與比較的字節(jié)數(shù)組的偏移量

*

*/

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,

int l2) {

return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);

}

job任務(wù)設(shè)置分組比較組 job.setGroupComparatorClass(MyGroup.class);

Shuffle

MapReduce的核心,俗稱(chēng)洗牌、打亂。

shuffle在map任務(wù)傳送到reduce任務(wù)之間。

Map端

1、每個(gè)map有一個(gè)環(huán)形內(nèi)存緩沖區(qū),用于存儲(chǔ)任務(wù)的輸出,默認(rèn)100MB,如果達(dá)到法制80MB后臺(tái)線程會(huì)把內(nèi)容寫(xiě)到指定磁盤(pán)(mapred.local.dir)下的新建的溢出文件。

2、寫(xiě)入磁盤(pán)錢(qián),要partition、sort。如果有combiner,combine后寫(xiě)入。

3、最后記錄完成,合并全部溢寫(xiě)文件為一個(gè)分區(qū)且排序的文件。

Reduce端

1、Reduce通過(guò)Http方式得到輸出文件的分區(qū)。

2、TaskTracker為分區(qū)文件運(yùn)行Reduce任務(wù)。復(fù)制階段把Map輸出復(fù)制到Reducer的內(nèi)存或磁盤(pán)。一個(gè)Map任務(wù)完成,Reduce開(kāi)始復(fù)制輸出。

3、排序階段合并map輸出,最后運(yùn)行Reduce階段。

MapReduce常見(jiàn)算法

單詞計(jì)數(shù)

數(shù)據(jù)去重

排序

TopK

選擇

投影

分組

多表連接

單邊關(guān)聯(lián)

轉(zhuǎn)載于:https://www.cnblogs.com/luguoyuanf/p/3593646.html

與50位技術(shù)專(zhuān)家面對(duì)面20年技術(shù)見(jiàn)證,附贈(zèng)技術(shù)全景圖

總結(jié)

以上是生活随笔為你收集整理的Hadoop学习记录(4)|MapReduce原理|API操作使用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。