从零开始学习Hadoop--第2章 第一个MapReduce程序
1.Hadoop從頭說
1.1 Google是一家做搜索的公司
做搜索是技術(shù)難度很高的活。首先要存儲(chǔ)很多的數(shù)據(jù),要把全球的大部分網(wǎng)頁都抓下來,可想而知存儲(chǔ)量有多大。然后,要能快速檢索網(wǎng)頁,用戶輸入幾個(gè)關(guān)鍵詞找資料,越快越好,最好在一秒之內(nèi)出結(jié)果。如果全球每秒有上億個(gè)用戶在檢索,只有一兩秒的檢索時(shí)間,要在全球的網(wǎng)頁里找到最合適的檢索結(jié)果,難度很大。
Google用三個(gè)最重要的核心技術(shù)解決上述問題,它們分別是GFS,MapReduce和BigTable。Google發(fā)表了它們的設(shè)計(jì)論文,但沒有將它們開源,核心競爭力不可能開源的。論文在這里,有興趣的同學(xué)可以去看看:GFS,http://labs.google.com/papers/gfs-sosp2003.pdf;MapReduce,http://labs.google.com/papers/mapreduce-osdi04.pdf
;Bigtable,http://labs.google.com/papers/bigtable-osdi06.pdf。
Google的論文發(fā)表之后,DougCutting等人根據(jù)論文的思想,在開源項(xiàng)目Nutch的基礎(chǔ)上實(shí)現(xiàn)了Hadoop。后來,DougCutting去了Yahoo,繼續(xù)做Hadoop。后來,Hadoop的開發(fā)和應(yīng)用開始爆發(fā)了。
在對應(yīng)關(guān)系上看,HadoopMapReduce對應(yīng)MapReduce,HadoopDistributed File System(HDFS)對應(yīng)GFS,HBase對應(yīng)BigTable。一般我們所說的Hadoop其實(shí)是指Hadoop體系,它包括HadoopMapReduce,HDFS,HBase,還有其他更多的技術(shù)。
1.2MapReduce和HDFS是如何工作的
先用一種有助于理解的方式描述MapReduce和HDFS是如何工作的。假如有1000G的多個(gè)文本文件,內(nèi)容是英文網(wǎng)頁,需要統(tǒng)計(jì)詞頻,也就是哪些單詞出現(xiàn)過,各出現(xiàn)過多少次,有1000臺(tái)計(jì)算機(jī)可供使用,要求速度越快越好。最直接的想法是,把1000G的文件分成1000份,每臺(tái)機(jī)器處理1G數(shù)據(jù)。處理完之后,其他999臺(tái)機(jī)器將處理結(jié)果發(fā)送到一臺(tái)固定的機(jī)器上,由這臺(tái)機(jī)器進(jìn)行合并然后輸出結(jié)果。
Hadoop將這個(gè)過程進(jìn)行自動(dòng)化的處理。首先看如何存儲(chǔ)這1000G的文本文件。HDFS在這1000臺(tái)機(jī)器上創(chuàng)建分布式文件系統(tǒng),將1000G的文件切分成若干個(gè)固定大小的文件塊,每個(gè)塊一般是64M大小,分散存儲(chǔ)在這1000臺(tái)機(jī)器上。這么多機(jī)器,在運(yùn)行的時(shí)候難免會(huì)出現(xiàn)有幾臺(tái)突然死機(jī)或者掛掉的情況,這導(dǎo)致上面存儲(chǔ)的文件塊丟失,會(huì)導(dǎo)致計(jì)算出錯(cuò)。為避免這種情況,HDFS對每個(gè)文件塊都做復(fù)制,復(fù)制成3~5個(gè)相同的塊,放到不同的機(jī)器上,這樣死機(jī)的文件塊在其他機(jī)器上仍然可以找得到,不影響計(jì)算。
MapReduce其實(shí)是兩部分,先是Map過程,然后是Reduce過程。從詞頻計(jì)算來說,假設(shè)某個(gè)文件塊里的一行文字是”Thisis a small cat. That is a smalldog.”,那么,Map過程會(huì)對這一行進(jìn)行處理,將每個(gè)單詞從句子解析出來,依次生成形如<“this”,1>, <”is”, 1>, <”a”, 1>, <”small”, 1>,<”cat”, 1>, <”that”, 1>, <”is”, 1>,<”a”, 1>, <”small”, 1>, <”dog”,1>的鍵值對,<”this”,1>表示“this”這個(gè)單詞出現(xiàn)了1次,在每個(gè)鍵值對里,單詞出現(xiàn)的次數(shù)都是1次,允許有相同的鍵值對多次出現(xiàn),比如<”is”,1>這個(gè)鍵值對出現(xiàn)了2次。Reduce過程就是合并同類項(xiàng),將上述產(chǎn)生的相同的鍵值對合并起來,將這些單詞出現(xiàn)的次數(shù)累加起來,計(jì)算結(jié)果就是<“this”,1>, <”is”, 2>, <”a”, 2>, <”small”, 2>,<”cat”, 1>, <”that”, 1>, <”dog”,1>。這種方式很簡潔,并且可以進(jìn)行多種形式的優(yōu)化。比如說,在一個(gè)機(jī)器上,對本地存儲(chǔ)的1G的文件塊先Map,然后再Reduce,那么就得到了這1G的詞頻統(tǒng)計(jì)結(jié)果,然后再將這個(gè)結(jié)果傳送到遠(yuǎn)程機(jī)器,跟其他999臺(tái)機(jī)器的統(tǒng)計(jì)結(jié)果再次進(jìn)行Reduce,就得到1000G文件的全部詞頻統(tǒng)計(jì)結(jié)果。如果文件沒有那么大,只有三四個(gè)G,就不需要在本地進(jìn)行Reduce了,每次Map之后直接將結(jié)果傳送到遠(yuǎn)程機(jī)器做Reduce。
具體地,如果用Hadoop來做詞頻統(tǒng)計(jì),流程是這樣的:
1)先用HDFS的命令行工具,將1000G的文件復(fù)制到HDFS上;
2)用Java寫MapReduce代碼,寫完后調(diào)試編譯,然后打包成Jar包;
3)執(zhí)行Hadoop命令,用這個(gè)Jar包在Hadoop集群上處理1000G的文件,然后將結(jié)果文件存放到指定的目錄。
4)用HDFS的命令行工具查看處理結(jié)果文件。
1.3 API參考
開發(fā)過程需要的API全部在JavaAPI和Hadoop API,在下面兩個(gè)地方找:
Hadoop1.2.1的API文檔:http://hadoop.apache.org/docs/r1.2.1/api/index.html
JavaJDK1.7的API文檔:http://docs.oracle.com/javase/7/docs/api/
2. 詞頻統(tǒng)計(jì)
在這里,我們開始實(shí)現(xiàn)WordCount的MapReduce。這里的WordCount程序是從Hadoop的例子代碼改編來的。
3.標(biāo)準(zhǔn)形式的MapReduce程序
所謂標(biāo)準(zhǔn)形式的MapReduce,就說需要寫MapReduce的時(shí)候,腦海里立刻跳出的就是這個(gè)形式,一個(gè)Map的Java文件,一個(gè)Reduce的Java文件,一個(gè)負(fù)責(zé)調(diào)用的主程序Java文件。這個(gè)標(biāo)準(zhǔn)形式已經(jīng)是最簡了,沒有多余的東東可以刪除,沒有肥肉,是干貨。寫MapReduce和主程序的時(shí)候,分別引用哪些包哪些類,每個(gè)包每個(gè)類是什么作用,這些要很清晰。如果記不住的話,將這些代碼寫幾遍,編譯調(diào)試運(yùn)行,然后不看代碼,自己從頭寫出來,編譯調(diào)試運(yùn)行,重復(fù)多次應(yīng)該可以記住了。
3.1 目錄和文件結(jié)構(gòu)
首先創(chuàng)建一個(gè)目錄wordcount_01存放源代碼、編譯和打包結(jié)果,比如將這個(gè)目錄放在/home/brian/wordcount_01。
wordcount_01目錄下,有兩個(gè)子目錄,分別是src目錄和classes目錄。src目錄存放Java的源代碼,classes目錄存放編譯結(jié)果。在src目錄下,創(chuàng)建三個(gè)文件,分別是IntSumReducer.java,TokenizerMapper.java,WordCount.java。從MapReduce的角度看,TokenizerMapper.java文件是做Map的代碼,IntSumReducer.java是做Reduce的代碼,WordCount.java是主程序,負(fù)責(zé)執(zhí)行整個(gè)流程。這三個(gè)Java文件內(nèi)容在下面給出。
3.2 TokenizerMapper.java文件的源代碼
| packagecom.brianchen.hadoop; importjava.io.IOException; importjava.util.StringTokenizer; importorg.apache.hadoop.io.IntWritable; importorg.apache.hadoop.io.Text; importorg.apache.hadoop.mapreduce.Mapper; public classTokenizerMapper extendsMapper<Object, Text, Text, IntWritable>{ IntWritable one= new IntWritable(1); Text word = newText(); ? public voidmap(Object key, Text value, Context context) throwsIOException, InterruptedException { StringTokenizeritr = new StringTokenizer(value.toString()); while(itr.hasMoreTokens()){ word.set(itr.nextToken()); context.write(word,one); } } } |
?
下面逐行解釋代碼,所有的類更詳細(xì)的資料其實(shí)都可以在1.3節(jié)的兩個(gè)API地址里找到:
1)“packagecom.brianchen.hadoop”
Java提供包機(jī)制管理代碼,關(guān)鍵詞就是package,可以隨意指定一個(gè)包的名字,諸如筆者的就是”com.brianchen.hadoop”,只要不跟其他的包重復(fù)就可以。為了保證包的唯一性,Sun公司推薦用公司的域名的逆序作為包名,于是大家就在代碼里看到諸如”org.apache.hadoop”之類的包名。
2)”importjava.io.IOException”
凡是以java開頭的包,在JDK1.7的API里找類的資料。這一句從java的io包里導(dǎo)入IOException。IOException,輸入輸出異常類。所謂異常,就是Exception,就是程序出錯(cuò)了,異常機(jī)制是Java的錯(cuò)誤捕獲機(jī)制。那么,IOException就是處理輸入輸出錯(cuò)誤時(shí)候的異常,I是Input,O是Output。
3)“import java.util.StringTokenizer”
從java的util包引入StringTokenizer類。StringTokenizer將符合一定格式的字符串拆分開。比如說,”Thisis a cat”是一個(gè)字符串,這四個(gè)單詞是用空格符隔開的,那么StringTokenizer可以將它們拆成四個(gè)單詞”This”,“is”,”a”,“cat”。如果是用其他符號(hào)隔開,也能處理,比如”14;229;37”這個(gè)字符串,這三個(gè)數(shù)字是分號(hào)隔開的,StringTokenizer將它們拆成”14”,“229”,“37”。只要指定了分隔符,StringTokenizer就可以將字符串拆開。“拆開”的術(shù)語叫“解析”。
4)”importorg.apache.hadoop.io.IntWritable”
凡是以org.apache.hadoop開頭的包,在Hadoop1.2.1的API找類的詳細(xì)信息。從hadoop的io包里引入IntWritable類。IntWritable類表示的是一個(gè)整數(shù),是一個(gè)以類表示的整數(shù),是一個(gè)以類表示的可序列化的整數(shù)。在Java里,要表示一個(gè)整數(shù),假如是15,可以用int類型,int類型是Java的基本類型,占4個(gè)字節(jié),也可以用Integer類,Integer類封裝了一個(gè)int類型,讓整數(shù)成為類。Integer類是可以序列化的。但Hadoop覺得Java的序列化不適合自己,于是實(shí)現(xiàn)了IntWritable類。至于什么是序列化,這個(gè)問題比較長,這個(gè)問題會(huì)在后面章節(jié)詳細(xì)講。
5)“import org.apache.hadoop.io.Text”
從hadoop的io包里引入Text類。Text類是存儲(chǔ)字符串的可比較可序列化類。
6)“import org.apache.hadoop.mapreduce.Mapper”
Mapper類很重要,它將輸入鍵值對映射到輸出鍵值對,也就是MapReduce里的Map過程。
7)”publicclass TokenizerMapper extends Mapper<Object, Text, Text,IntWritable>”
定義一個(gè)自己的Map過程,類名是TokenizerMapper,它繼承了Hadoop的Mapper類。“<Object,Text, Text,IntWritable>”,這里,第一個(gè)參數(shù)類型是Object,表示輸入鍵key的參數(shù)類型,第二個(gè)參數(shù)參數(shù)類型是Text,表示輸入值的類型,第三個(gè)參數(shù)類型也是Text,表示輸出鍵類型,第四個(gè)參數(shù)類型是IntWritable,表示輸出值類型。
在這個(gè)例子里,第一個(gè)參數(shù)Object是Hadoop根據(jù)默認(rèn)值生成的,一般是文件塊里的一行文字的行偏移數(shù),這些偏移數(shù)不重要,在處理時(shí)候一般用不上,第二個(gè)參數(shù)類型是要處理的字符串,形如”Thisis a cat.”。經(jīng)過Map處理之后,輸出的就是諸如<”This”,1>的鍵值對,這個(gè)”This”就是第三個(gè)參數(shù)類型,是Text類型,而1就是第四個(gè)參數(shù)類型,是IntWritable。
8)“IntWritableone = new IntWritable(1)”
定義輸出值,始終是1。
9)“Text word = new Text()”
定義輸出鍵。
10)“public void map(Object key, Text value, Context context) throwsIOException, InterruptedException ”
定義map函數(shù),函數(shù)有三個(gè)參數(shù),key是輸入鍵,它是什么無所謂,實(shí)際上用不到它的,value是輸入值。在map函數(shù)中,出錯(cuò)的時(shí)候會(huì)拋出異常,所以有“throwsIOException, InterruptedException”。至于Context類,這個(gè)類的定義是在TokenizerMapper的祖先類Mapper的內(nèi)部,不需要引入,如果去查看Mapper類的源代碼的話,能看到Context類是繼承MapContext類的。
11)“StringTokenizer itr = new StringTokenizer(value.toString())”
定義StringTokenizer對象itr,StringTokenizer的構(gòu)造函數(shù)只接受Java的String類,而value是Text類,所以要進(jìn)行轉(zhuǎn)化,將value轉(zhuǎn)成String類,也就是“value.toString()”。
12)Map過程
while(itr.hasMoreTokens()){
word.set(itr.nextToken());
context.write(word,one);
}
在默認(rèn)的情況下,StringTokenizer以空格符作為分隔符對字符串進(jìn)行解析,每次解析會(huì)先調(diào)用hasMoreTokens看看是不是需要做解析,如果需要做,就用nextToken()函數(shù)獲取解析結(jié)果,然后用這個(gè)結(jié)果給word賦值,然后,再將word和one作為一個(gè)鍵值對寫到context里,context會(huì)存儲(chǔ)鍵值留待Reduce過程處理。
3.3IntSumReducer.java文件的源代碼
| packagecom.brianchen.hadoop; importjava.io.IOException; importorg.apache.hadoop.io.IntWritable; importorg.apache.hadoop.io.Text; importorg.apache.hadoop.mapreduce.Reducer; public classIntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>{ IntWritableresult = new IntWritable(); public voidreduce(Text key, Iterable<IntWritable> values, Contextcontext) throwsIOException, InterruptedException { int sum = 0; for(IntWritable val : values) { sum +=val.get(); } result.set(sum); context.write(key,result); } } |
跟上節(jié)相同的地方就不解釋了,只解釋上節(jié)沒有的東東。
1)”importorg.apache.hadoop.mapreduce.Reducer”
引入hadoop的Reducer類,這個(gè)類負(fù)責(zé)MapReduce的Reduce過程。
2)“public class IntSumReducer extendsReducer<Text,IntWritable,Text,IntWritable> “
定義Reduce過程,也就是IntSumReducer類,這個(gè)類繼承Hadoop的Reducer類。這里的”<Text,IntWritable,Text,IntWritable>”,含義跟上一節(jié)一樣,依次分別是輸入鍵類型,輸入值類型,輸出鍵類型,輸出值類型。
3)“IntWritableresult = new IntWritable()”
定義輸出結(jié)果,這是一個(gè)整數(shù)。
4)“public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException ”
定義reduce函數(shù)。key是輸入鍵類型,values是一個(gè)實(shí)現(xiàn)了Iterable接口的變量,可以把它理解成values里包含若干個(gè)IntWritable整數(shù),可以通過迭代的方式遍歷所有的值,至于Context類型,跟Mapper里的Context類似的方式,是在Redurer類內(nèi)部實(shí)現(xiàn)的。
舉例來說,假如處理一個(gè)字符串”Thisis a That isa“,那么,經(jīng)過Map過程之后,到達(dá)reduce函數(shù)的時(shí)候,依次傳遞給reduce函數(shù)的是:key=”This”,values=<1>;key= “is”,values=<1,1>;key = “a”,values=<1, 1>;key=”That”,values=<1>。注意,在key= “is”和key=”a”的時(shí)候,values里有兩個(gè)1。
5)Reduce過程
intsum = 0;
for(IntWritable val : values) {
sum +=val.get();
}
result.set(sum);
context.write(key,result);
這個(gè)過程,就是用一個(gè)循環(huán),不斷從values里取值,然后累加計(jì)算和,循環(huán)結(jié)束后,將累加和賦值給result變量,然后,將鍵值和累加和作為一個(gè)鍵值對寫入context。繼續(xù)以上一步的例子來說,寫入context的鍵值對依次就是<”This”,1>,<“is”,2>,<“a”,2>,<”That”, 1>。
3.4WordCount.java文件的源代碼
| packagecom.brianchen.hadoop; importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.fs.Path; importorg.apache.hadoop.io.IntWritable; importorg.apache.hadoop.io.Text; importorg.apache.hadoop.mapreduce.Job; importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat; importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat; importorg.apache.hadoop.util.GenericOptionsParser; public classWordCount { public staticvoid main(String[] args) throws Exception { Configurationconf = new Configuration(); String[]otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs(); if(otherArgs.length != 2) { System.err.println("Usage:wordcount <in> <out>"); System.exit(2); } Job job = newJob(conf, "wordcount"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job,new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job,new Path(otherArgs[1])); System.exit(job.waitForCompletion(true)? 0 : 1); } } |
1)”importorg.apache.hadoop.conf.Configuration”
Configuration類,顧名思義,讀寫和保存各種配置資源。
2)“import org.apache.hadoop.fs.Path”
引入Path類,Path類保存文件或者目錄的路徑字符串。
3)“import org.apache.hadoop.mapreduce.Job”
引入Job類。在hadoop里,每個(gè)需要執(zhí)行的任務(wù)是一個(gè)Job,這個(gè)Job負(fù)責(zé)很多事情,包括參數(shù)配置,設(shè)置MapReduce細(xì)節(jié),提交到Hadoop集群,執(zhí)行控制,查詢執(zhí)行狀態(tài),等等。
4)”importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat”
引入FileInputFormat類。這個(gè)類的很重要的作用就是將文件進(jìn)行切分split,因?yàn)橹挥星蟹植趴梢圆⑿刑幚怼_@個(gè)會(huì)在后面章節(jié)有詳細(xì)解釋。
5)“import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat”
引入FileOutputFormat類,處理結(jié)果寫入輸出文件。
6)“import org.apache.hadoop.util.GenericOptionsParser”
引入GenericOptionsParser類,這個(gè)類負(fù)責(zé)解析hadoop的命令行參數(shù)。
7)”publicclass WordCount ”
這是wordcount主類,它負(fù)責(zé)讀取命令行參數(shù),配置Job,調(diào)用Mapper和Reducer,返回結(jié)果等等工作。
8)“Configurationconf = new Configuration()”
默認(rèn)情況下,Configuration開始實(shí)例化的時(shí)候,會(huì)從Hadoop的配置文件里讀取參數(shù)。
9)”String[]otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs()”
讀取參數(shù)分兩步,上一步是從Hadoop的配置文件讀取參數(shù),這一步是從命令行參數(shù)讀取參數(shù),args是存放命令行參數(shù)的字符串?dāng)?shù)組。
10)“if (otherArgs.length != 2) ”
如果命令行參數(shù)不是2個(gè),就出錯(cuò)了,退出。因?yàn)槌绦蛐枰捞幚淼氖悄膫€(gè)輸入文件,處理結(jié)果放到哪個(gè)目錄,必須是兩個(gè)參數(shù)。
11)”Job job = new Job(conf, "wordcount")”
每個(gè)運(yùn)行的處理任務(wù)就是一個(gè)Job,”worodcount”是Job的名字。
12)“ job.setJarByClass(WordCount.class)”
Jar文件是Java語言的一個(gè)功能,可以將所有的類文件打包成一個(gè)Jar文件,setJarByClass的意思是,根據(jù)WordCount類的位置設(shè)置Jar文件。
13)“job.setMapperClass(TokenizerMapper.class)”
設(shè)置Mapper。
14)“job.setReducerClass(IntSumReducer.class)”
設(shè)置Reducer。
15)“job.setOutputKeyClass(Text.class)”
設(shè)置輸出鍵的類型。
16)“job.setOutputValueClass(IntWritable.class)”
設(shè)置輸出值的類型。
17)“FileInputFormat.addInputPath(job, new Path(otherArgs[0]))”
設(shè)置要處理的文件,也就是輸入文件,它是otherArgs的第一個(gè)參數(shù)。
18)“FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]))”
設(shè)置輸出文件,將處理結(jié)果寫到這個(gè)文件里,它是otherArgs的第二個(gè)參數(shù)。
19)“System.exit(job.waitForCompletion(true) ? 0 : 1)”
最后一步,job開始執(zhí)行,等待執(zhí)行結(jié)束。
3.5 編譯
用javac編譯項(xiàng)目。javac即Javaprogramming language compiler,是JavaJDK的命令行編譯器。如前所說,wordcount_01目錄存放源代碼和編譯結(jié)果,要在這個(gè)目錄下進(jìn)行編譯。
3.5.1“cd ~/wordcount_01”
先執(zhí)行這個(gè)命令,切換目錄到wordcount_01下。
3.5.2“javac -classpath/home/brian/usr/hadoop/hadoop-1.2.1/hadoop-core-1.2.1.jar:/home/brian/usr/hadoop/hadoop-1.2.1/lib/commons-cli-1.2.jar-d ./classes/ ./src/*.java”
執(zhí)行這條命令,編譯源代碼。-classpath,設(shè)置源代碼里使用的各種類的庫文件路徑,路徑之間用”:”隔開,-d參數(shù),設(shè)置編譯后的class文件存在路徑。
3.6 打包
3.6.1“jar -cvf wordcount.jar -C ./classes/ .”
將編譯好的class文件打包成Jar包,jar命令是JDK的打包命令行工具,跟tar非常像。在命令里,-C是值在執(zhí)行jar的時(shí)候?qū)⒛夸浨袚Q到當(dāng)前目錄下的classes目錄,這個(gè)目錄包含編譯好的class文件。打包結(jié)果是wordcount.jar文件,放在當(dāng)前目錄下。
3.7 執(zhí)行
3.7.1首先要確實(shí)一下Hadoop已經(jīng)運(yùn)行起來了。啟動(dòng)方式就是第1章的第7節(jié)。然后,執(zhí)行
3.7.2“cd ~/usr/hadoop/hadoop-1.2.1”
切換目錄到Hadoop的安裝目錄下。
3.7.3“./bin/hadoop fs -put READER.txt readme.txt”
仍然用README.txt做測試,將它復(fù)制到HDFS上,更名為readme.txt
3.7.4“./bin/hadoop fs -rmr output”
處理結(jié)果要放在HDFS的output目錄里,如果這個(gè)目錄已經(jīng)存在了,Hadoop是不會(huì)運(yùn)行的,會(huì)報(bào)錯(cuò),先刪除它。
3.7.5“./bin/hadoop jar /home/brian/wordcount_01/wordcount.jarcom.brianchen.hadoop.WordCount readme.txt output”
運(yùn)行程序,處理readme.txt文件,將結(jié)果寫入output目錄,其中”jar”參數(shù)是指定jar包的位置,而”com.brianchen.hadoop.WordCount”,這里”com.brianchen.hadoop”是包的名字,“WordCount”是主類,注意,如果不寫包名字會(huì)報(bào)錯(cuò)的,必須有包名。
3.8 查看結(jié)果
3.8.1“./bin/hadoop fs -cat output/part-r-00000”
處理結(jié)果output目錄的part-r-00000文件里,用cat命令可以輸出到屏幕顯示。
4.最簡形式的MapReduce
最簡單形式的WordCount的MapReduce代碼是Hadoop自帶的例子,略作改動(dòng)放在這里。這個(gè)例子只有一個(gè)Java文件,Mapper和Reducer都寫在WordCount類的內(nèi)部。
4.1 目錄和文件結(jié)構(gòu)
代碼放在~/wordcount_02目錄,它有兩個(gè)子目錄,分別是classes和src,classes目錄存放編譯結(jié)果,src目錄存放源代碼,src目錄下只有一個(gè)java文件,即”WordCount.java”,所有的代碼都在里面。
4.2WordCount.java文件的源代碼
| packagecom.brianchen.hadoop; importjava.io.IOException; importjava.util.StringTokenizer; importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.fs.Path; importorg.apache.hadoop.io.IntWritable; importorg.apache.hadoop.io.Text; importorg.apache.hadoop.mapreduce.Job; importorg.apache.hadoop.mapreduce.Mapper; importorg.apache.hadoop.mapreduce.Reducer; importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat; importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat; importorg.apache.hadoop.util.GenericOptionsParser; public classWordCount { public staticclass TokenizerMapper extendsMapper<Object, Text, Text, IntWritable>{ private finalstatic IntWritable one = new IntWritable(1); private Textword = new Text(); public voidmap(Object key, Text value, Context context) throwsIOException, InterruptedException { StringTokenizeritr = new StringTokenizer(value.toString()); while(itr.hasMoreTokens()){ word.set(itr.nextToken()); context.write(word,one); } } } public staticclass IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>{ privateIntWritable result = new IntWritable(); public voidreduce(Text key, Iterable<IntWritable> values, Contextcontext) throwsIOException, InterruptedException { int sum =0; for(IntWritable val : values) { sum +=val.get(); } result.set(sum); context.write(key,result); } } public staticvoid main(String[] args) throws Exception { Configurationconf = new Configuration(); String[]otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs(); if(otherArgs.length != 2) { System.err.println("Usage:wordcount <in> <out>"); System.exit(2); } Job job = newJob(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job,new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job,new Path(otherArgs[1])); System.exit(job.waitForCompletion(true)? 0 : 1); } } |
這里的代碼,跟前一節(jié)有點(diǎn)不太一樣。
1)”publicstatic class TokenizerMapper”
這表示TokenizerMapper類是WordCount類的內(nèi)部靜態(tài)類,這種方式可以將TokenizerMapper隱藏在WordCount類內(nèi)部,且TokenizerMapper類不引用WordCount類的任何變量和函數(shù)。
2)“private final static IntWritable one”
跟上一節(jié)的定義相比,這里多了”privatefinalstatic”,”private”表示這個(gè)變量是類的私有變量,“final”表示這變量只能在定義的時(shí)候被賦值一次,以后不可更改,”static”表示這是一個(gè)靜態(tài)變量,獨(dú)立于對象,被該類的所有實(shí)例共享,這種做法的好處是,one這個(gè)值是私有的不可更改的僅僅只有一個(gè),代碼更可靠,更節(jié)省內(nèi)存空間。
4.3 編譯
4.3.1“cd ~/wordcount_02”
4.3.2“javac -classpath/home/brian/usr/hadoop/hadoop-1.2.1/hadoop-core-1.2.1.jar:/home/brian/usr/hadoop/hadoop-1.2.1/lib/commons-cli-1.2.jar-d ./classes/ ./src/WordCount.java ”
4.4 打包
4.4.1“jar -cvf wordcount.jar -C ./classes/ . ”
4.5 運(yùn)行
4.5.1“cd ~/usr/bin/hadoop/hadoop-1.2.1”
4.5.2“./bin/hadoop fs -rmr output”
4.5.3“./bin/hadoop jar /home/brian/wordcount_02/wordcount.jarcom.brianchen.hadoop.WordCount readme.txt output”
4.6 查看結(jié)果
4.6.1“./bin/hadoop fs -cat output/part-r-00000”
?
轉(zhuǎn)載于:https://www.cnblogs.com/pangblog/p/3395249.html
總結(jié)
以上是生活随笔為你收集整理的从零开始学习Hadoop--第2章 第一个MapReduce程序的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java ojdbc14 查询数据表,O
- 下一篇: 银行按揭借款合同范本