
Hadoop Study Notes (14): Getting Started with MapReduce Development (2) - MapReduce API Overview and MapReduce Examples


4. MapReduce API Overview

  • A MapReduce program generally consists of a Mapper, a Reducer, and a main function.
  • The Mapper typically performs the key/value mapping step.
  • The Reducer typically performs the key/value aggregation step.
  • The main function assembles the Mapper, the Reducer, and the required configuration.
  • More advanced programs also set input/output file formats and configure a Combiner or Partitioner to optimize the job.

4.1 MapReduce Program Module: Main Function
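The original notes illustrate this module with a screenshot. As a stand-in, here is a minimal sketch of a typical driver in the standard WordCount shape (the names ExampleJob, ExampleMapper, and ExampleReducer are placeholders, not from the original):

    // Assembles the job: mapper, reducer, key/value types, and I/O paths.
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "example job");  // job name is a placeholder
        job.setJarByClass(ExampleJob.class);             // hypothetical driver class
        job.setMapperClass(ExampleMapper.class);
        job.setReducerClass(ExampleReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }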

4.2 MapReduce Program Module: Mapper

  • org.apache.hadoop.mapreduce.Mapper
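A Mapper subclass overrides map(), which is called once per input record; its four type parameters are <KEYIN, VALUEIN, KEYOUT, VALUEOUT>. A minimal sketch in the WordCount style (ExampleMapper is a placeholder name):

    public static class ExampleMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            word.set(value.toString());  // map each input line to the pair (line, 1)
            context.write(word, one);
        }
    }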

4.3 MapReduce Program Module: Reducer

  • org.apache.hadoop.mapreduce.Reducer
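A Reducer subclass overrides reduce(), which receives one key together with all of the values grouped under it. A minimal sketch that sums IntWritable counts (ExampleReducer is a placeholder name):

    public static class ExampleReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();        // aggregate all values for this key
            }
            result.set(sum);
            context.write(key, result);  // emit (key, total)
        }
    }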

5. MapReduce Examples

5.1 Workflow (Mapper, Reducer, Main, Package and Run)

  • Start from the WordCount program and modify the Mapper;
  • copy the Reducer program unchanged;
  • copy the main function and make the corresponding changes;
  • compile and package the code;
  • upload the jar;
  • upload the data;
  • run the program;
  • check the results.
5.2 Example 1: Counting Accesses by Date

    1. Start from the WordCount program and modify the Mapper.
    (Create a new Java class and copy the code from steps 1-3 below into it.)

    public static class SpiltMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        // value: email_address | date
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] data = value.toString().split("\\|", -1);  // split the record on "|"
            word.set(data[1]);                                  // data[1] is the date
            context.write(word, one);                           // emit (date, 1)
        }
    }
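    For instance, given a hypothetical input line aaa@example.com|2021-05-01, the split on "|" yields the address and the date, and the mapper emits (2021-05-01, 1).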

    2. Copy the Reducer program unchanged:

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();        // add up the 1s emitted for this key
            }
            result.set(sum);
            context.write(key, result);  // emit (date, total count)
        }
    }

    3. Copy the main function and make the corresponding changes:

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(CountByDate.class);   // our main class is CountByDate
        job.setMapperClass(SpiltMapper.class);  // the Mapper is the modified SpiltMapper
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    4. Compile and package the jar. (The original notes show the IDE build steps and a build error with its fix only as screenshots; they are omitted here.)
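    A minimal sketch of packaging from the command line instead, assuming the Hadoop client is installed and the source file sits in the current directory (the jar name countbydate.jar is an assumption, not from the original):

    # Compile against the Hadoop classpath, then bundle the classes into a jar.
    javac -classpath "$(hadoop classpath)" -d classes CountByDate.java
    jar -cvf countbydate.jar -C classes .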

    5/6. Upload the jar and the data.
    The email_log_with_date.txt data set is available at: https://pan.baidu.com/s/1HfwHCfmvVdQpuL-MPtpAng (extraction code: cgnb)

    Upload the data to HDFS (make sure HDFS is running), then confirm the upload in the NameNode web UI (browse to master:50070); a sketch of the shell steps follows.
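    A sketch of the upload, assuming the data file is in the current directory and lands in the user's HDFS home directory (paths are illustrative):

    # Push the input file to HDFS and verify it arrived.
    hdfs dfs -put email_log_with_date.txt .
    hdfs dfs -ls .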

    7. Run the program (make sure YARN is running). Once the job is submitted, its progress can be followed in the ResourceManager web UI at master:8088; a sketch of the submission follows.
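    A sketch of the submission, assuming the jar built above and that CountByDate is in the default package (the output directory name is illustrative and must not exist yet):

    yarn jar countbydate.jar CountByDate email_log_with_date.txt countbydate_output00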


    8. Check the results in the NameNode web UI at master:50070, or read the output directly from HDFS as sketched below.
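    A sketch of reading the output from the command line, assuming the output path used above (part-r-00000 is the default name of the first reducer's output file):

    hdfs dfs -cat countbydate_output00/part-r-00000 | head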


5.3 Example 2: Sorting Users by Access Count

    The two jobs below each contain their Mapper, Reducer, and main program.
    SortByCountFirst (first job: count accesses per user)

    package demo;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    import java.io.IOException;

    public class SortByCountFirst {
        // 1. Modified Mapper: emit (email_address, 1) for every access record
        public static class SpiltMapper
                extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            // value: email_address | date
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] data = value.toString().split("\\|", -1);
                word.set(data[0]);         // data[0] is the email address
                context.write(word, one);  // emit (address, 1)
            }
        }

        // 2. Reducer copied unchanged: sum the 1s per address
        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();

            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }

        // 3. Main function copied and adjusted
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: demo.SortByCountFirst <in> [<in>...] <out>");
                System.exit(2);
            }
            Job job = Job.getInstance(conf, "sort by count first");
            job.setJarByClass(SortByCountFirst.class);  // the main class
            job.setMapperClass(SpiltMapper.class);      // the modified SpiltMapper
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
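    With the default TextOutputFormat, this first job writes one email_address \t count line per user, which is exactly the tab-separated input that SortByCountSecond's mapper expects.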

    SortByCountSecond (second job: reorder by count)

    package demo;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    import java.io.IOException;

    public class SortByCountSecond {
        // 1. Modified Mapper: invert (address \t count) into (count, address)
        public static class SpiltMapper
                extends Mapper<Object, Text, IntWritable, Text> {
            private IntWritable count = new IntWritable(1);
            private Text word = new Text();

            // value: email_address \t count
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] data = value.toString().split("\t", -1);
                word.set(data[0]);
                count.set(Integer.parseInt(data[1]));
                context.write(count, word);  // the count is the key, so the shuffle sorts by it
            }
        }

        // 2. Modified Reducer: swap (count, address) back to (address, count)
        public static class ReverseReducer
                extends Reducer<IntWritable, Text, Text, IntWritable> {
            public void reduce(IntWritable key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                for (Text val : values) {
                    context.write(val, key);
                }
            }
        }

        // 3. Main function copied and adjusted
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: demo.SortByCountSecond <in> [<in>...] <out>");
                System.exit(2);
            }
            Job job = Job.getInstance(conf, "sort by count second");
            job.setJarByClass(SortByCountSecond.class);
            job.setMapperClass(SpiltMapper.class);
            // job.setCombinerClass(IntSumReducer.class);  // no combiner: this job only reorders pairs
            job.setReducerClass(ReverseReducer.class);
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
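    Note that IntWritable keys sort in ascending order by default, so the busiest users come last. If descending order is wanted, a custom sort comparator can be registered; a hedged sketch not present in the original notes (register it in main with job.setSortComparatorClass(DescendingIntWritableComparator.class)):

    // Hypothetical addition: invert the default IntWritable ordering.
    // Requires org.apache.hadoop.io.WritableComparator and WritableComparable.
    public static class DescendingIntWritableComparator extends WritableComparator {
        protected DescendingIntWritableComparator() {
            super(IntWritable.class, true);  // true: instantiate keys for comparison
        }

        @Override
        @SuppressWarnings("rawtypes")
        public int compare(WritableComparable a, WritableComparable b) {
            return -a.compareTo(b);          // negate to sort from largest to smallest
        }
    }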

    Then package the jar and upload it as before.

    yarn jar sortbycount.jar demo.SortByCountFirst -Dmapreduce.job.queuename=prod email_log_with_date.txt sortbycountfirst_output00
    yarn jar sortbycount.jar demo.SortByCountSecond -Dmapreduce.job.queuename=prod sortbycountfirst_output00 sortbycountsecond_output00
