日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java mapreduce 实例_MapReduce -- JAVA 实例(一)计算总数

發布時間:2024/1/23 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java mapreduce 实例_MapReduce -- JAVA 实例(一)计算总数 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

MapReduce

===========================

將任務細化,讓不同的節點處理不同部分。處理完后,再把各自的結果進行統一。它通過鍵值對來處理數據。但鍵和值的類型都有要求。

通過JAVA編寫的 MapReduce 程序中。值的類型必須實現了 Writable, 因為它是要被寫入文件中的。而實現了

WritableComparable

接口的類,即可以是鍵也可以是值。因為它是可寫的,所以可以是值;而它又是可進行比較的,所以作為鍵。

一個可執行的 MapReduce JAVA 程序應該包括如下內容:

1.一個 main 方法,用來定義整個流程,接收的參數等。

2.定義 Mapper, 定義如何劃分數據生成的值也是鍵值對形式。

3.定義 Reducer, 定義如何處理結果。最后生成的值也是鍵值對形式。

4.定義一個 Job, 將 Mapper, Reducer 等必要的值設置進去。

MapReduce 通過操作 鍵/值 對來處理數據,一般形式是:

map: 將 (K1, V1) 的輸入轉化成 list(K2, V2) 的輸出

reduce: 將 (K2, list(V2)) 的輸入轉化成 list(K3, V3) 的輸出

比如,一個網絡游戲,有多個區,而角色又分幾個種族。這時要分析每個區每個種族分別有多少。

整個分析的輸入就是所有的這些數據,可能是數據庫數據,CVS 形式的數據表。

經過 Map 方法后,數據分析任務分配給不同的 DataNode 。每個 DataNode 上的數據可能是:A區 X 族:

5W 人; A區 Y 族: 6W 人; B 區 X 族 4W 人...

當然,這些數據是 鍵/值 對形式存儲的。

然后,這些數據再經過一個被稱為洗牌的過程,將不同種族的 鍵/值 對放到不同的 DataNode 上。

再經過 Reduce, 得到最終結果:X 族 8W; Y 族: 6W 當然,結果形式還是 鍵/值 對。

Mapper

-----------------------------------------------------

Mapper 的定義是由一個類來實現的。它必須繼承 MapReduceBase 基類并實現 Mapper 接口。

Mapper 只有一個方法:map.用于處理單獨的鍵值對。運行的時候鍵值對是由 Task

傳遞過來的。所以這里只需要定義如何處理就行,不用關心誰調用。

Reducer

-----------------------------------------------------

Reducer 的定義是由一個類來實現的。它必須繼承 MapReduceBase 基類并實現 Reducer

接口。

當 Reducer 任務接收來自各個 mapper

的輸出時,它按鍵值對中的鍵,對數據進行排序,并將相同的值的值歸并。然后調用 reduce() 方法。

案例:專利引用計算

************************************************************************

http://www.nber.org/patents/ 上有專利相關的數據。我們構造 MapReduce

程序來分析相關的結果。

選用該數據是因為該案例的數據結構和當前一些社會網絡圖差不多,數據形式較普遍。

專利引用數據集

http://www.nber.org/patents/acite75_99.zip

專利描述數據集:

http://www.nber.org/patents/apat63_99.zip

[root@localhost mapreduce1]# wget

http://www.nber.org/patents/acite75_99.zip

[root@localhost mapreduce1]# wget

http://www.nber.org/patents/apat63_99.zip

[root@localhost mapreduce1]# unzip

acite75_99.zip

[root@localhost mapreduce1]# unzip apat63_99.zip

[root@localhost mapreduce1]# cat cite75_99.txt | wc -l

16522439

[root@localhost mapreduce1]# cat apat63_99.txt | wc -l

2923923

專利引用數據有 16522439 條。專利描述有 2923923 條。

[root@localhost mapreduce1]# head -n 5

cite75_99.txt

"CITING","CITED"

3858241,956203

3858241,1324234

3858241,3398406

3858241,3557384

上面顯示的是專利之間的引用關系。每行表示一條數據。前面的數字是專業號,后面是被引用的專利號。所以上面的數據可以看到,3858241

引用了 其它的四個專利。當然后面還有很多數據,這其實就是一個多對多的關系。

[root@localhost mapreduce1]# head -n 5

apat63_99.txt

"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"

3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,

3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,

3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,

3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,

上面的數據是一個專利的描述。各字段分別是:PATENT 專利號,GYEAR 批準年,GDATE 批準日,APPYEAR

申請年,COUNTRY 第一發明人國家,POSTATE 第一發明人所在州(如果國家為美國),ASSIGNEE

專利權人,ASSCODE專 利權人類型,CLAIMS 聲明數目,NCLASS專利類型

需求:

解析各個專利被其它哪些專利引用了.

上傳要解析的文件到 HDFS:

[root@localhost sunyutest]# ../bin/hadoop fs -put

mapreduce1/cite75_99.txt .

查看是否上傳成功

[root@localhost sunyutest]# ../bin/hadoop fs -lsr .

-rw-r--r-- ?1 root supergroup

264075431 2013-08-19 12:21

/user/root/cite75_99.txt

-rw-r--r-- ?1 root supergroup

91014095 2013-08-15 14:44

/user/root/putmerg.txt

編寫代碼:

import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.KeyValueTextInputFormat;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.mapred.TextOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool {

public static class MapClass extends MapReduceBase

implements

Mapper {

@Override

public void map(Text key, Text value,

OutputCollector output, Reporter repoter)

throws IOException {

output.collect(value, key);

}

}

public static class Reduce extends MapReduceBase

implements

Reducer {

@Override

public void reduce(Text key, Iterator values,

OutputCollector output, Reporter repoter)

throws IOException {

String csv = "";

while (values.hasNext()) {

if (csv.length() > 0) {

csv += ",";

}

csv += values.next().toString();

}

output.collect(key, new Text(csv));

}

}

@Override

public int run(String[] args) throws Exception {

Configuration conf = getConf();

JobConf job = new JobConf(conf, MyJob.class);

Path in = new Path(args[0]);

Path out = new Path(args[1]);

FileInputFormat.setInputPaths(job, in);

FileOutputFormat.setOutputPath(job, out);

job.setJobName("MyJob");

job.setMapperClass(MapClass.class);

job.setReducerClass(Reduce.class);

job.setInputFormat(KeyValueTextInputFormat.class);

job.setOutputFormat(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

job.set("key.value.separator.in.input.line", ",");

JobClient.runJob(job);

return 0;

}

public static void main(String[] args) throws Exception

{

int res = ToolRunner.run(new Configuration(), new MyJob(),

args);

System.exit(res);

}

}

編譯 JAVA 代碼

[root@localhost sunyutest]# javac -classpath

/home/hadoop/hadoop-core-1.2.0.jar -d mapreduce1/classes/

mapreduce1/src/MyJob.java

生成 JAR 包

[root@localhost sunyutest]# jar -cvf mapreduce1/mapreduce1.jar

-C mapreduce1/classes/ .

Hadoop 執行 JAR

[root@localhost sunyutest]# /home/hadoop/bin/./hadoop jar

mapreduce1/mapreduce1.jar MyJob /user/root/cite75_99.txt

/user/root/mapreduceresult1

過程:

13/08/19 12:23:50 INFO util.NativeCodeLoader: Loaded the

native-hadoop library

13/08/19 12:23:50 WARN snappy.LoadSnappy: Snappy native

library not loaded

13/08/19 12:23:50 INFO mapred.FileInputFormat: Total input

paths to process : 1

13/08/19 12:23:50 INFO mapred.JobClient: Running job:

job_201308151016_0003

13/08/19 12:23:51 INFO mapred.JobClient: ?map

0% reduce 0%

13/08/19 12:24:02 INFO mapred.JobClient: ?map

19% reduce 0%

13/08/19 12:24:24 INFO mapred.JobClient: ?map

62% reduce 0%

13/08/19 12:24:27 INFO mapred.JobClient: ?map

73% reduce 0%

13/08/19 12:24:30 INFO mapred.JobClient: ?map

86% reduce 16%

13/08/19 12:24:33 INFO mapred.JobClient: ?map

98% reduce 16%

13/08/19 12:24:36 INFO mapred.JobClient: ?map

100% reduce 16%

13/08/19 12:24:45 INFO mapred.JobClient: ?map

100% reduce 25%

13/08/19 12:25:01 INFO mapred.JobClient: ?map

100% reduce 97%

13/08/19 12:25:03 INFO mapred.JobClient: ?map

100% reduce 100%

13/08/19 12:25:04 INFO mapred.JobClient: Job complete:

job_201308151016_0003

13/08/19 12:25:04 INFO mapred.JobClient: Counters: 29

13/08/19 12:25:04 INFO mapred.JobClient: ?Job Counters

13/08/19 12:25:04 INFO mapred.JobClient: ?Launched reduce tasks=1

13/08/19 12:25:04 INFO mapred.JobClient: ?SLOTS_MILLIS_MAPS=94446

13/08/19 12:25:04 INFO mapred.JobClient: ?Total time spent by all reduces waiting after

reserving slots (ms)=0

13/08/19 12:25:04 INFO mapred.JobClient: ?Total time spent by all maps waiting after

reserving slots (ms)=0

13/08/19 12:25:04 INFO mapred.JobClient: ?Launched map tasks=4

13/08/19 12:25:04 INFO mapred.JobClient: ?Data-local map tasks=4

13/08/19 12:25:04 INFO mapred.JobClient: ?SLOTS_MILLIS_REDUCES=47599

13/08/19 12:25:04 INFO mapred.JobClient: ?File Input Format Counters

13/08/19 12:25:04 INFO mapred.JobClient: ?Bytes Read=264087719

13/08/19 12:25:04 INFO mapred.JobClient: ?File Output Format Counters

13/08/19 12:25:04 INFO mapred.JobClient: ?Bytes Written=0

13/08/19 12:25:04 INFO mapred.JobClient: ?FileSystemCounters

13/08/19 12:25:04 INFO mapred.JobClient: ?FILE_BYTES_READ=735648003

13/08/19 12:25:04 INFO mapred.JobClient: ?HDFS_BYTES_READ=264088111

13/08/19 12:25:04 INFO mapred.JobClient: ?FILE_BYTES_WRITTEN=1033040702

13/08/19 12:25:04 INFO mapred.JobClient: ?Map-Reduce Framework

13/08/19 12:25:04 INFO mapred.JobClient: ?Map output materialized bytes=297120333

13/08/19 12:25:04 INFO mapred.JobClient: ?Map input records=16522439

13/08/19 12:25:04 INFO mapred.JobClient: ?Reduce shuffle bytes=297120333

13/08/19 12:25:04 INFO mapred.JobClient: ?Spilled Records=57431615

13/08/19 12:25:04 INFO mapred.JobClient: ?Map output bytes=264075431

13/08/19 12:25:04 INFO mapred.JobClient: ?Total committed heap usage

(bytes)=875757568

13/08/19 12:25:04 INFO mapred.JobClient: ?CPU time spent (ms)=104660

13/08/19 12:25:04 INFO mapred.JobClient: ?Map input bytes=264075431

13/08/19 12:25:04 INFO mapred.JobClient: ?SPLIT_RAW_BYTES=392

13/08/19 12:25:04 INFO mapred.JobClient: ?Combine input records=0

13/08/19 12:25:04 INFO mapred.JobClient: ?Reduce input records=16522439

13/08/19 12:25:04 INFO mapred.JobClient: ?Reduce input groups=3258984

13/08/19 12:25:04 INFO mapred.JobClient: ?Combine output records=0

13/08/19 12:25:04 INFO mapred.JobClient: ?Physical memory (bytes)

snapshot=1098424320

13/08/19 12:25:04 INFO mapred.JobClient: ?Reduce output records=0

13/08/19 12:25:04 INFO mapred.JobClient: ?Virtual memory (bytes) snapshot=5465493504

13/08/19 12:25:04 INFO mapred.JobClient: ?Map output records=16522439

執行完后再看HDFS中的文件:

[root@localhost sunyutest]# ../bin/hadoop fs -lsr

/user/root/mapreduceresult1/

-rw-r--r-- ?1 root supergroup

0 2013-08-19 12:40

/user/root/mapreduceresult1/_SUCCESS

drwxr-xr-x ?- root supergroup

0 2013-08-19 12:39

/user/root/mapreduceresult1/_logs

drwxr-xr-x ?- root supergroup

0 2013-08-19 12:39

/user/root/mapreduceresult1/_logs/history

-rw-r--r-- ?1 root supergroup

23426

2013-08-19 12:39

/user/root/mapreduceresult1/_logs/history/job_201308151016_0006_1376887156678_root_MyJob

-rw-r--r-- ?1 root supergroup

47406

2013-08-19 12:39

/user/root/mapreduceresult1/_logs/history/job_201308151016_0006_conf.xml

-rw-r--r-- ?1 root supergroup

158078539 2013-08-19 12:40

/user/root/mapreduceresult1/part-00000

發現結果文件 mapreduceresult1 已經有了,將它里面的

part-00000 下載到本地文件系統中:

[root@localhost sunyutest]# ../bin/hadoop fs -get

/user/root/mapreduceresult1/part-00000 .

查看前十行:

[root@localhost sunyutest]# head -n 10 part-00000

"CITED" "CITING"

1 3964859,4647229

10000 4539112

100000 5031388

1000006 4714284

1000007 4766693

1000011 5033339

1000017 3908629

1000026 4043055

1000033 4190903,4975983

可以看到,4190903, 4975983 兩個專利都引用了 1000033 專利。

這時,回過頭來看前面的 JAVA 代碼:

執行整個邏輯的類是 MyJob, Hadoop 要求 Mapper 和 Reducer 必須是它們自身的靜態類,所以在

MyJob 里面分別定義了這兩個類。而且將它定義成了MyJob 的內部類。這樣是為了簡單代碼,方便管理。

代碼邏輯中,核心代碼在 run() 中。它實例化一個 JobConf 對象,并通過

JobClient.runJob(job);啟動作業。實際上是 JobClient 和 JobTracker

通信,讓該作業在集群上啟動。JobConf 保存作業運行所需的全部配置參數。

Mapper 類的核心方法是 map(), Reducer 的是

reduce() 方法。

每個 map() 方法的調用都會傳入一個類型為 K1, V1 的鍵值對,它是由 mapper 生成,并通過

OutputCollector 對象的 collect() 方法來輸出,所以一定要調用:

output.collect((K2) k, (V2) v);

Reducer 中的 reduce() 方法的每次調用都被賦予

K2 類型的鍵以及 V2 類型的一組值。K2, V2 的類型必須和 mapper 中輸出的類型一樣。reduce() 可能會遍歷 V2

列表中的所有值:

while (values.hasNext()) {

if (csv.length() > 0) {

csv += ",";

}

csv += values.next().toString();

}

處理完后,reduce() 使用 OutputCollector 來輸出 K3/V3

的鍵/值對:output.collect(key, new Text(csv));

要注意的是 mapper 和 reducer

中使用的數據類型必須和定義 JobConf 時設置的一樣:

job.setInputFormat(KeyValueTextInputFormat.class);

job.setOutputFormat(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

*************************************************************************

但從上面JAVA代碼可以看到,各類的參數,設定任務還是有些麻煩。

Hadoop 0.2

版本中,將許多類重寫了。

上面代碼中看到的,大多數類或接口都是在包 org.apache.hadoop.mapred 中。

新API將許多都移到了 org.apache.hadoop.mapreduce 中。而且增加了上下文對象

Context,用來 OutputCollector, 和 Reporter

對象。

通過 Context.write 來輸出結果,而不再是 OutputCollector.collect()。

另外,之前 Mapper 和 Reduce 都是接口,所以我們自定義的類都要 extends MapReduceBase

然后 implements Mapper 或者 Reducer。

現在在 org.apache.hadoop.mapreduce 中增加了 Mapper 和 Reducer

抽象類,我們自定義的類直接繼承它就行了。另外,拋出的異常,也由 IOException

變為 IOException 和 InterruptedException。

MyJob 的新API代碼是:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import

org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import

org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import

org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class MyJobNew extends Configured implements Tool

{

public static class MapClass extends

Mapper

Text> {

public void map(LongWritable key, Text value, Context

context)

throws IOException,

InterruptedException {

String[] citation = value.toString().split(",");

context.write(new Text(citation[1]), new

Text(citation[0]));

}

}

public static class Reduce extends

Reducer

Text> {

public void reduce(Text key,

Iterable values, Context

context)

throws IOException,

InterruptedException {

String csv = "";

for (Text val : values) {

if (csv.length() > 0) {

csv += ",";

}

csv += val.toString();

}

context.write(key, new Text(csv));

}

}

@Override

public int run(String[] args) throws Exception {

Configuration conf = getConf();

Job job = new Job(conf, "MyJobNew");

Path in = new Path(args[0]);

Path out = new Path(args[1]);

FileInputFormat.setInputPaths(job, in);

FileOutputFormat.setOutputPath(job, out);

job.setMapperClass(MapClass.class);

job.setReducerClass(Reduce.class);

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);

return 0;

}

public static void main(String[] args) throws Exception

{

int res = ToolRunner.run(new Configuration(), new MyJobNew(),

args);

System.exit(res);

}

}

總結

以上是生活随笔為你收集整理的java mapreduce 实例_MapReduce -- JAVA 实例(一)计算总数的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。