通过java api提交自定义hadoop 作业
通過API操作之前要先了解幾個基本知識
一、hadoop的基本數(shù)據(jù)類型和java的基本數(shù)據(jù)類型是不一樣的,但是都存在對應(yīng)的關(guān)系
如下圖
如果需要定義自己的數(shù)據(jù)類型,則必須實(shí)現(xiàn)Writable
hadoop的數(shù)據(jù)類型可以通過get方法獲得對應(yīng)的java數(shù)據(jù)類型
而java的數(shù)據(jù)類型可以通過hadoop數(shù)據(jù)類名的構(gòu)造函數(shù),或者set方法轉(zhuǎn)換
二、hadoop提交作業(yè)的的步驟分為八個,可以理解為天龍八步
如下:
map端工作:
1.1 讀取要操作的文件--這步會將文件的內(nèi)容格式化成鍵值對的形式,鍵為每一行的起始位置偏移,值為每一行的內(nèi)容
1.2 調(diào)用map進(jìn)行處理--在這步使用自定義的Mapper類來實(shí)現(xiàn)自己的邏輯,輸入的數(shù)據(jù)為1.1格式化的鍵值對,輸入的數(shù)據(jù)也是鍵值對的形式
1.3 對map的處理結(jié)果進(jìn)行分區(qū)--map處理完畢之后可以根據(jù)自己的業(yè)務(wù)需求來對鍵值對進(jìn)行分區(qū)處理,比如,將類型不同的結(jié)果保存在不同的文件中等。這里設(shè)置幾個分區(qū),后面就會有對應(yīng)的幾個Reducer來處理相應(yīng)分區(qū)中的內(nèi)容
1.4 分區(qū)之后,對每個分區(qū)的數(shù)據(jù)進(jìn)行排序,分組--排序按照從小到大進(jìn)行排列,排序完畢之后,會將鍵值對中,key相同的選項(xiàng) 的value進(jìn)行合并。如,所有的鍵值對中,可能存在
hello 1
hello 1
key都是hello,進(jìn)行合并之后變成
hello 2
可以根據(jù)自己的業(yè)務(wù)需求對排序和合并的處理進(jìn)行干涉和實(shí)現(xiàn)
1.5 歸約(combiner)--簡單的說就是在map端進(jìn)行一次reduce處理,但是和真正的reduce處理不同之處在于:combiner只能處理本地?cái)?shù)據(jù),不能跨網(wǎng)絡(luò)處理。通過map端的combiner處理可以減少輸出的數(shù)據(jù),因?yàn)閿?shù)據(jù)都是通過網(wǎng)絡(luò)傳輸?shù)?#xff0c;其目的是為了減輕網(wǎng)絡(luò)傳輸?shù)膲毫秃筮卹educe的工作量。并不能取代reduce
reduce端工作:
2.1 通過網(wǎng)絡(luò)將數(shù)據(jù)copy到各個reduce
2.2 調(diào)用reduce進(jìn)行處理--reduce接收的數(shù)據(jù)是整個map端處理完畢之后的鍵值對,輸出的也是鍵值對的集合,是最終的結(jié)果
2.3 將結(jié)果輸出到hdfs文件系統(tǒng)的路徑中
新建一個java項(xiàng)目,并導(dǎo)入hadoop包,在項(xiàng)目選項(xiàng)上右鍵,如圖選擇
找到hadoop的安裝目錄,選擇所有的包
在找到hadoop安裝目錄下的lib,導(dǎo)入其中的所有包
新建JMapper類為自定義的Mapper類
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;//自定義的Mapper類必須繼承Mapper類,并重寫map方法實(shí)現(xiàn)自己的邏輯 public class JMapper extends Mapper<LongWritable, Text, Text, LongWritable> {//處理輸入文件的每一行都會調(diào)用一次map方法,文件有多少行就會調(diào)用多少次protected void map(LongWritable key,Text value,org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, LongWritable>.Context context)throws java.io.IOException, InterruptedException {//key為每一行的起始偏移量//value為每一行的內(nèi)容//每一行的內(nèi)容分割,如hello world,分割成一個String數(shù)組有兩個數(shù)據(jù),分別是hello,worldString[] ss = value.toString().toString().split("\t");//循環(huán)數(shù)組,將其中的每個數(shù)據(jù)當(dāng)做輸出的鍵,值為1,表示這個鍵出現(xiàn)一次for (String s : ss) {//context.write方法可以將map得到的鍵值對輸出context.write(new Text(s), new LongWritable(1));}}; }新建JReducer類為自定義的Reducer import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;//自定義的Reducer類必須繼承Reducer,并重寫reduce方法實(shí)現(xiàn)自己的邏輯,泛型參數(shù)分別為輸入的鍵類型,值類型;輸出的鍵類型,值類型;之后的reduce類似 public class JReducer extends Reducer<Text, LongWritable, Text, LongWritable> {//處理每一個鍵值對都會調(diào)用一次reduce方法,有多少個鍵值對就調(diào)用多少次protected void reduce(Text key,java.lang.Iterable<LongWritable> value,org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context)throws java.io.IOException, InterruptedException {//key為每一個單獨(dú)的單詞,如:hello,world,you,me等//value為這個單詞在文本中出現(xiàn)的次數(shù)集合,如{1,1,1},表示總共出現(xiàn)了三次long sum = 0;//循環(huán)value,將其中的值相加,得到總次數(shù)for (LongWritable v : value) {sum += v.get();}//context.write輸入新的鍵值對(結(jié)果)context.write(key, new LongWritable(sum));}; }
新建執(zhí)行提交作業(yè)的類,取名JSubmit import java.io.IOException; import java.net.URI; import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class JSubmit {public static void main(String[] args) throws IOException,URISyntaxException, InterruptedException, ClassNotFoundException {//Path類為hadoop API定義,創(chuàng)建兩個Path對象,一個輸入文件的路徑,一個輸入結(jié)果的路徑Path outPath = new Path("hdfs://localhost:9000/out");//輸入文件的路徑為本地linux系統(tǒng)的文件路徑Path inPath = new Path("/home/hadoop/word");//創(chuàng)建默認(rèn)的Configuration對象Configuration conf = new Configuration();//根據(jù)地址和conf得到hadoop的文件系統(tǒng)獨(dú)享//如果輸入路徑已經(jīng)存在則刪除FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);if (fs.exists(outPath)) {fs.delete(outPath, true);}//根據(jù)conf創(chuàng)建一個新的Job對象,代表要提交的作業(yè),作業(yè)名為JSubmit.class.getSimpleName()Job job = new Job(conf, JSubmit.class.getSimpleName());//1.1//FileInputFormat類設(shè)置要讀取的文件路徑FileInputFormat.setInputPaths(job, inPath);//setInputFormatClass設(shè)置讀取文件時使用的格式化類job.setInputFormatClass(TextInputFormat.class);//1.2調(diào)用自定義的Mapper類的map方法進(jìn)行操作//設(shè)置處理的Mapper類job.setMapperClass(JMapper.class);//設(shè)置Mapper類處理完畢之后輸出的鍵值對 的 數(shù)據(jù)類型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(LongWritable.class);//1.3分區(qū),下面的兩行代碼寫和沒寫都一樣,默認(rèn)的設(shè)置<span style="white-space:pre"> </span>job.setPartitionerClass(HashPartitioner.class); <span style="white-space:pre"> </span>job.setNumReduceTasks(1);//1.4排序,分組//1.5歸約,這三步都有默認(rèn)的設(shè)置,如果沒有特殊的需求可以不管 //2.1將數(shù)據(jù)傳輸?shù)綄?yīng)的Reducer//2.2使用自定義的Reducer類操作//設(shè)置Reducer類job.setReducerClass(JReducer.class);//設(shè)置Reducer處理完之后 輸出的鍵值對 的數(shù)據(jù)類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);//2.3將結(jié)果輸出//FileOutputFormat設(shè)置輸出的路徑FileOutputFormat.setOutputPath(job, outPath);//setOutputFormatClass設(shè)置輸出時的格式化類job.setOutputFormatClass(TextOutputFormat.class);//將當(dāng)前的job對象提交job.waitForCompletion(true);} }
運(yùn)行java程序,可以再控制臺看到提交作業(yè)的提示
在hdfs中查看輸出的文件
運(yùn)行成功!
轉(zhuǎn)載于:https://www.cnblogs.com/jchubby/p/4429702.html
總結(jié)
以上是生活随笔為你收集整理的通过java api提交自定义hadoop 作业的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 学历提升宣传标语文案29句
- 下一篇: 自己动手写UI库——引入ExtJs(布局