

3. WordCount Source Code Analysis


Reposted from: http://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html

3.1 Special Data Types

  Hadoop provides the data types listed below. All of them implement the WritableComparable interface, so that data declared with these types can be serialized for network transfer and file storage, and compared with one another. (A short sketch after the list shows what this looks like in code.)

    BooleanWritable: standard Boolean value
    ByteWritable: single-byte value
    DoubleWritable: double-precision floating-point number
    FloatWritable: single-precision floating-point number
    IntWritable: integer value
    LongWritable: long integer value
    Text: text stored in UTF-8 format
    NullWritable: used when the key or value in a <key,value> pair is empty
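
As a quick illustration (a minimal sketch, not part of the original article), the snippet below serializes an IntWritable into a byte buffer and compares two Text keys; serialization and key ordering are exactly what these types provide during the shuffle and sort:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) throws IOException {
        // Serialize an IntWritable the way the framework does when it
        // writes intermediate data to disk or to the network.
        IntWritable count = new IntWritable(1);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        count.write(new DataOutputStream(buffer));
        System.out.println("serialized size: " + buffer.size() + " bytes"); // 4

        // WritableComparable also defines an ordering, used when map
        // output keys are sorted before they reach the reducers.
        Text k1 = new Text("Hadoop");
        Text k2 = new Text("Hello");
        System.out.println(k1.compareTo(k2) < 0); // true: "Hadoop" sorts first
    }
}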

3.2 Analysis of the Old WordCount

  1) Source code

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

  2) Analysis of the main method

public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
}

  First, consider how the job is initialized. The main function uses the JobConf class to initialize the MapReduce job and then calls setJobName() to name it. Naming a job sensibly makes it quicker to find and monitor on the JobTracker and TaskTracker web pages.

JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");

  Next, the key and value data types of the job's <key,value> output are set. Because the result takes the form <word, count>, the key type is set to Text, which corresponds to Java's String, and the value type to IntWritable, which corresponds to Java's int.

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

  Then the classes that handle the job's Map (splitting), Combiner (merging of intermediate results), and Reduce (final merging) phases are set. Here the Reduce class is reused as the combiner to merge the intermediate results produced by each map task, which relieves the pressure on network data transfer.

conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);

  Next, setInputFormat() and setOutputFormat() set the job's input and output formats; the input and output paths themselves are set with FileInputFormat.setInputPaths() and FileOutputFormat.setOutputPath().

conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);

  (1) InputFormat and InputSplit

  InputSplit is the unit Hadoop defines for the data sent to each individual map task. An InputSplit stores not the data itself but a split length and an array recording where the data is located. How InputSplits are generated is determined by the configured InputFormat.

  When data is delivered to a map task, the framework hands the input split to the InputFormat, which calls getRecordReader() to produce a RecordReader; the RecordReader in turn uses its createKey() and createValue() methods to build <key,value> pairs that map can process. In short, the InputFormat is what turns the raw input into <key,value> pairs for map.
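
To make the flow concrete, here is a rough sketch (not from the original article; the method dumpSplit and its arguments are illustrative) of how this old-API machinery is driven for one split:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class RecordReaderDemo {
    // Reads every <offset, line> record from one split, roughly the way
    // the framework does before invoking map() on each pair.
    static void dumpSplit(JobConf conf, InputSplit split) throws IOException {
        TextInputFormat format = new TextInputFormat();
        format.configure(conf); // JobConfigurable hook
        RecordReader<LongWritable, Text> reader =
                format.getRecordReader(split, conf, Reporter.NULL);
        LongWritable key = reader.createKey(); // byte offset of the line
        Text value = reader.createValue();     // content of the line
        while (reader.next(key, value)) {
            System.out.println(key + "\t" + value); // map() would be called here
        }
        reader.close();
    }
}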

  Hadoop predefines a number of classes that convert different kinds of input data into <key,value> pairs map can process. They all inherit from InputFormat:

    InputFormat
        |---BaileyBorweinPlouffe.BbpInputFormat
        |---ComposableInputFormat
        |---CompositeInputFormat
        |---DBInputFormat
        |---DistSum.Machine.AbstractInputFormat
        |---FileInputFormat
            |---CombineFileInputFormat
            |---KeyValueTextInputFormat
            |---NLineInputFormat
            |---SequenceFileInputFormat
            |---TeraInputFormat
            |---TextInputFormat

  TextInputFormat, which inherits from FileInputFormat, is Hadoop's default input format. Under TextInputFormat, each file (or part of one) is fed individually to a map task, and every line of data generates one record, represented in <key,value> form:

  • the key is the byte offset of the record within the data split, of type LongWritable;
  • the value is the content of the line, of type Text.
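
For instance (an illustrative example), a split containing the two lines "Hello World" and "Bye World" would yield the records <0, Hello World> and <12, Bye World>, since the first line plus its newline occupies 12 bytes.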

  (2) OutputFormat

  Every input format has a corresponding output format. The default is TextOutputFormat, which mirrors the default input format by writing each record as one line of a text file. Its keys and values may be of any type, because it calls toString() internally to convert both to String before writing them out.
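
For the word-count job this means the output file holds one line per word, a tab character separating the key from the value, e.g. the lines "Bye 1" and "Hello 2" (with a tab in place of the space shown here).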

  3) The map method of the Map class

public static class Map extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}

  The Map class extends MapReduceBase and implements the Mapper interface. Mapper is a generic interface with four type parameters that specify the map function's input key type, input value type, output key type, and output value type. Because this example uses TextInputFormat, the keys it supplies are LongWritable and the values are Text, so the map input type is <LongWritable, Text>. The example must emit pairs of the form <word, 1>, so the output key type is Text and the output value type is IntWritable.

  A class implementing this interface must also provide the map method, which does the actual work on the input. Here, map splits each input line on whitespace and uses the OutputCollector to emit a <word, 1> pair for every token.
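
Concretely, the input line "Hello Hadoop Hello" makes map emit <Hello, 1>, <Hadoop, 1>, and <Hello, 1>; duplicate words are not merged at this stage.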

  4) The reduce method of the Reduce class

public static class Reduce extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

  The Reduce class likewise extends MapReduceBase and implements the Reducer interface. It takes the map output as its input, so its input type is <Text, IntWritable>; its output, a word together with its count, is of type <Text, IntWritable> as well. The Reduce class implements the reduce method, which passes the input key through as the output key and sums the collected values to form the output value.
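
Concretely, a reduce call with key "Hello" and values [1, 1] emits <Hello, 2>.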

3.3 Analysis of the New WordCount

  1) Source code

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

  1) The Map phase

public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

  The Map phase requires extending the Mapper class in the org.apache.hadoop.mapreduce package and overriding its map method. By adding two lines to map that print the key and the value to the console, one can observe that value holds one line of the text file (with the newline character as the end-of-line marker), while key is the offset of the line's first character from the start of the file. StringTokenizer then splits each line into individual words, <word, 1> pairs are written as the result of the map method, and the remaining work is left to the MapReduce framework.
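
The two debug lines mentioned above might look like this (an illustrative sketch, not part of the original example), placed at the top of the map method:

// Inside map(Object key, Text value, Context context):
System.out.println("key   = " + key);   // byte offset of the line: 0, 12, 25, ...
System.out.println("value = " + value); // the full text of the line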

  2) The Reduce phase

public static class IntSumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

  The Reduce phase requires extending the Reducer class in the org.apache.hadoop.mapreduce package and overriding its reduce method. In the reduce input <key, values>, key is a single word and values is the list of counts for that word; since the map output is the reduce input, the reduce method only needs to iterate over values and sum them to obtain the total count for each word.

  3) Running the MapReduce job

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

  In MapReduce, a Job object manages and runs a computation, and the job's parameters are set through its methods. Here the job is configured to use TokenizerMapper for the Map phase and IntSumReducer for both the Combine and Reduce phases. The output types of the Map and Reduce phases are also set: Text for keys and IntWritable for values. The job's input and output paths are given as command-line arguments and applied through FileInputFormat and FileOutputFormat respectively. Once the parameters are set, job.waitForCompletion() is called to run the job.
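
After packaging the class into a jar, the job can be launched from the command line roughly as follows (the jar name and HDFS paths are placeholders):

hadoop jar wordcount.jar org.apache.hadoop.examples.WordCount /user/hadoop/input /user/hadoop/output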
