日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

hadoop 入门实例【转】

發(fā)布時間:2023/11/29 编程问答 50 豆豆
生活随笔 收集整理的這篇文章主要介紹了 hadoop 入门实例【转】 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

原文鏈接:http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html

1、數(shù)據(jù)去重

  ?"數(shù)據(jù)去重"主要是為了掌握和利用并行化思想來對數(shù)據(jù)進行有意義篩選統(tǒng)計大數(shù)據(jù)集上的數(shù)據(jù)種類個數(shù)從網(wǎng)站日志中計算訪問地等這些看似龐雜的任務都會涉及數(shù)據(jù)去重。下面就進入這個實例的MapReduce程序設計。

1.1 實例描述

  對數(shù)據(jù)文件中的數(shù)據(jù)進行去重。數(shù)據(jù)文件中的每行都是一個數(shù)據(jù)。

  樣例輸入如下所示:

?????1)file1:

?

2012-3-1 a

2012-3-2 b

2012-3-3 c

2012-3-4 d

2012-3-5 a

2012-3-6 b

2012-3-7 c

2012-3-3 c

?

?????2)file2:

?

2012-3-1 b

2012-3-2 a

2012-3-3 b

2012-3-4 d

2012-3-5 a

2012-3-6 c

2012-3-7 d

2012-3-3 c

?

???? 樣例輸出如下所示:

?

2012-3-1 a

2012-3-1 b

2012-3-2 a

2012-3-2 b

2012-3-3 b

2012-3-3 c

2012-3-4 d

2012-3-5 a

2012-3-6 b

2012-3-6 c

2012-3-7 c

2012-3-7 d

?

1.2 設計思路

  數(shù)據(jù)去重最終目標是讓原始數(shù)據(jù)出現(xiàn)次數(shù)超過一次數(shù)據(jù)輸出文件只出現(xiàn)一次。我們自然而然會想到將同一個數(shù)據(jù)的所有記錄都交給一臺reduce機器,無論這個數(shù)據(jù)出現(xiàn)多少次,只要在最終結果中輸出一次就可以了。具體就是reduce的輸入應該以數(shù)據(jù)作為key,而對value-list則沒有要求。當reduce接收到一個<key,value-list>時就直接將key復制到輸出的key中,并將value設置成空值

  在MapReduce流程中,map的輸出<key,value>經(jīng)過shuffle過程聚集成<key,value-list>后會交給reduce。所以從設計好的reduce輸入可以反推出map的輸出key應為數(shù)據(jù),value任意。繼續(xù)反推,map輸出數(shù)據(jù)的key為數(shù)據(jù),而在這個實例中每個數(shù)據(jù)代表輸入文件中的一行內容,所以map階段要完成的任務就是在采用Hadoop默認的作業(yè)輸入方式之后,將value設置為key,并直接輸出(輸出中的value任意)。map中的結果經(jīng)過shuffle過程之后交給reduce。reduce階段不會管每個key有多少個value,它直接將輸入的key復制為輸出的key,并輸出就可以了(輸出中的value被設置成空了)。

1.3 程序代碼

???? 程序代碼如下所示:

?

package?com.hebut.mr;

?

import?java.io.IOException;

?

import?org.apache.hadoop.conf.Configuration;

import?org.apache.hadoop.fs.Path;

import?org.apache.hadoop.io.IntWritable;

import?org.apache.hadoop.io.Text;

import?org.apache.hadoop.mapreduce.Job;

import?org.apache.hadoop.mapreduce.Mapper;

import?org.apache.hadoop.mapreduce.Reducer;

import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import?org.apache.hadoop.util.GenericOptionsParser;

?

public?class?Dedup {

?

????//map將輸入中的value復制到輸出數(shù)據(jù)的key上,并直接輸出

????public?static?class?Map?extends?Mapper<Object,Text,Text,Text>{

????????private?static?Text?line=new?Text();//每行數(shù)據(jù)

???????

????????//實現(xiàn)map函數(shù)

????????public?void?map(Object key,Text value,Context context)

????????????????throws?IOException,InterruptedException{

????????????line=value;

??????????? context.write(line,?new?Text(""));

??????? }

???????

??? }

???

????//reduce將輸入中的key復制到輸出數(shù)據(jù)的key上,并直接輸出

????public?static?class?Reduce?extends?Reducer<Text,Text,Text,Text>{

????????//實現(xiàn)reduce函數(shù)

????????public?void?reduce(Text key,Iterable<Text> values,Context context)

????????????????throws?IOException,InterruptedException{

??????????? context.write(key,?new?Text(""));

??????? }

???????

??? }

???

????public?static?void?main(String[] args)?throws?Exception{

??????? Configuration conf =?new?Configuration();

????????//這句話很關鍵

??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");

???????

??????? String[] ioArgs=new?String[]{"dedup_in","dedup_out"};

???? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs).getRemainingArgs();

?????if?(otherArgs.length?!= 2) {

???? System.err.println("Usage: Data Deduplication <in> <out>");

???? System.exit(2);

???? }

?????

???? Job job =?new?Job(conf,?"Data Deduplication");

???? job.setJarByClass(Dedup.class);

?????

?????//設置Map、Combine和Reduce處理類

???? job.setMapperClass(Map.class);

???? job.setCombinerClass(Reduce.class);

???? job.setReducerClass(Reduce.class);

?????

?????//設置輸出類型

???? job.setOutputKeyClass(Text.class);

???? job.setOutputValueClass(Text.class);

?????

?????//設置輸入和輸出目錄

???? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));

???? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));

???? System.exit(job.waitForCompletion(true) ? 0 : 1);

???? }

}

?

1.4 代碼結果

?????1)準備測試數(shù)據(jù)

???? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創(chuàng)建輸入文件"dedup_in"文件夾(備注:"dedup_out"不需要創(chuàng)建。)如圖1.4-1所示,已經(jīng)成功創(chuàng)建。

???? ????

圖1.4-1 創(chuàng)建"dedup_in"?????????????? ??????????????????? 圖1.4.2 上傳"file*.txt"

?

??? ?然后在本地建立兩個txt文件,通過Eclipse上傳到"/user/hadoop/dedup_in"文件夾中,兩個txt文件的內容如"實例描述"那兩個文件一樣。如圖1.4-2所示,成功上傳之后。

???? 從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的兩個文件。

?

?

??? 查看兩個文件的內容如圖1.4-3所示:

?

圖1.4-3 文件"file*.txt"內容

2)查看運行結果

???? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發(fā)現(xiàn)多出一個"dedup_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖1.4-4所示。

?

圖1.4-4 運行結果

?

??? 此時,你可以對比一下和我們之前預期的結果是否一致。

2、數(shù)據(jù)排序

  "數(shù)據(jù)排序"是許多實際任務執(zhí)行時要完成的第一項工作,比如學生成績評比數(shù)據(jù)建立索引等。這個實例和數(shù)據(jù)去重類似,都是原始數(shù)據(jù)進行初步處理,為進一步的數(shù)據(jù)操作打好基礎。下面進入這個示例。

2.1 實例描述

??? 對輸入文件中數(shù)據(jù)進行排序。輸入文件中的每行內容均為一個數(shù)字即一個數(shù)據(jù)。要求在輸出中每行有兩個間隔的數(shù)字,其中,第一個代表原始數(shù)據(jù)在原始數(shù)據(jù)集中的位次第二個代表原始數(shù)據(jù)

??? 樣例輸入

????1)file1:

?

2

32

654

32

15

756

65223

?

????2)file2:

?

5956

22

650

92

?

????3)file3:

?

26

54

6

?

??? 樣例輸出

?

1??? 2

2??? 6

3??? 15

4??? 22

5??? 26

6??? 32

7??? 32

8??? 54

9??? 92

10??? 650

11??? 654

12??? 756

13??? 5956

14??? 65223

?

2.2 設計思路

  這個實例僅僅要求對輸入數(shù)據(jù)進行排序,熟悉MapReduce過程的讀者會很快想到在MapReduce過程中就有排序,是否可以利用這個默認的排序,而不需要自己再實現(xiàn)具體的排序呢?答案是肯定的。

  但是在使用之前首先需要了解它的默認排序規(guī)則。它是按照key值進行排序的,如果key為封裝int的IntWritable類型,那么MapReduce按照數(shù)字大小對key排序,如果key為封裝為String的Text類型,那么MapReduce按照字典順序對字符串排序。

  了解了這個細節(jié),我們就知道應該使用封裝int的IntWritable型數(shù)據(jù)結構了。也就是在map中將讀入的數(shù)據(jù)轉化成IntWritable型,然后作為key值輸出(value任意)。reduce拿到<key,value-list>之后,將輸入的key作為value輸出,并根據(jù)value-list元素個數(shù)決定輸出的次數(shù)。輸出的key(即代碼中的linenum)是一個全局變量,它統(tǒng)計當前key的位次。需要注意的是這個程序中沒有配置Combiner,也就是在MapReduce過程中不使用Combiner。這主要是因為使用map和reduce就已經(jīng)能夠完成任務了。

2.3 程序代碼

??? 程序代碼如下所示:

?

package?com.hebut.mr;

?

import?java.io.IOException;

?

import?org.apache.hadoop.conf.Configuration;

import?org.apache.hadoop.fs.Path;

import?org.apache.hadoop.io.IntWritable;

import?org.apache.hadoop.io.Text;

import?org.apache.hadoop.mapreduce.Job;

import?org.apache.hadoop.mapreduce.Mapper;

import?org.apache.hadoop.mapreduce.Reducer;

import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import?org.apache.hadoop.util.GenericOptionsParser;

?

public?class?Sort {

?

????//map將輸入中的value化成IntWritable類型,作為輸出的key

????public?static?class?Map?extends

        Mapper<Object,Text,IntWritable,IntWritable>{

????????private?static?IntWritable?data=new?IntWritable();

???????

????????//實現(xiàn)map函數(shù)

????????public?void?map(Object key,Text value,Context context)

????????????????throws?IOException,InterruptedException{

??????????? String line=value.toString();

????????????data.set(Integer.parseInt(line));

??????????? context.write(data,?new?IntWritable(1));

??????? }

???????

??? }

???

????//reduce將輸入中的key復制到輸出數(shù)據(jù)的key上,

????//然后根據(jù)輸入的value-list中元素的個數(shù)決定key的輸出次數(shù)

????//用全局linenum來代表key的位次

????public?static?class?Reduce?extends

??????????? Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{

???????

????????private?static?IntWritable?linenum?=?new?IntWritable(1);

???????

????????//實現(xiàn)reduce函數(shù)

????????public?void?reduce(IntWritable key,Iterable<IntWritable> values,Context context)

????????????????throws?IOException,InterruptedException{

????????????for(IntWritable?val:values){

??????????????? context.write(linenum, key);

????????????????linenum?=?new?IntWritable(linenum.get()+1);

??????????? }

???????????

??????? }

?

??? }

???

????public?static?void?main(String[] args)?throws?Exception{

??????? Configuration conf =?new?Configuration();

????????//這句話很關鍵

??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");

???????

??????? String[] ioArgs=new?String[]{"sort_in","sort_out"};

???? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs).getRemainingArgs();

?????if?(otherArgs.length?!= 2) {

???? System.err.println("Usage: Data Sort <in> <out>");

???????? System.exit(2);

???? }

?????

???? Job job =?new?Job(conf,?"Data Sort");

???? job.setJarByClass(Sort.class);

?????

?????//設置Map和Reduce處理類

???? job.setMapperClass(Map.class);

???? job.setReducerClass(Reduce.class);

?????

?????//設置輸出類型

???? job.setOutputKeyClass(IntWritable.class);

???? job.setOutputValueClass(IntWritable.class);

?????

?????//設置輸入和輸出目錄

???? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));

???? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));

???? System.exit(job.waitForCompletion(true) ? 0 : 1);

???? }

}

?

2.4 代碼結果

1)準備測試數(shù)據(jù)

??? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創(chuàng)建輸入文件"sort_in"文件夾(備注:"sort_out"不需要創(chuàng)建。)如圖2.4-1所示,已經(jīng)成功創(chuàng)建。

??????????????

圖2.4-1 創(chuàng)建"sort_in"????????????????????????????????????????????????? 圖2.4.2 上傳"file*.txt"

?

??? 然后在本地建立三個txt文件,通過Eclipse上傳到"/user/hadoop/sort_in"文件夾中,三個txt文件的內容如"實例描述"那三個文件一樣。如圖2.4-2所示,成功上傳之后。

??? 從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的三個文件。

?

?

查看兩個文件的內容如圖2.4-3所示:

?

圖2.4-3 文件"file*.txt"內容

2)查看運行結果

??? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發(fā)現(xiàn)多出一個"sort_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖2.4-4所示。

?

圖2.4-4 運行結果

3、平均成績

??? "平均成績"主要目的還是在重溫經(jīng)典"WordCount"例子,可以說是在基礎上的微變化版,該實例主要就是實現(xiàn)一個計算學生平均成績的例子。

3.1 實例描述

  對輸入文件中數(shù)據(jù)進行就算學生平均成績。輸入文件中的每行內容均為一個學生姓名和他相應的成績,如果有多門學科,則每門學科為一個文件。要求在輸出中每行有兩個間隔的數(shù)據(jù),其中,第一個代表學生的姓名第二個代表其平均成績

??? 樣本輸入

????1)math:

?

張三??? 88

李四??? 99

王五??? 66

趙六??? 77

?

????2)china:

?

張三??? 78

李四??? 89

王五??? 96

趙六??? 67

?

????3)english:

?

張三??? 80

李四??? 82

王五??? 84

趙六??? 86

?

??? 樣本輸出

?

張三??? 82

李四??? 90

王五??? 82

趙六??? 76

?

3.2 設計思路

??? 計算學生平均成績是一個仿"WordCount"例子,用來重溫一下開發(fā)MapReduce程序的流程。程序包括兩部分的內容:Map部分和Reduce部分,分別實現(xiàn)了map和reduce的功能。

????Map處理的是一個純文本文件,文件中存放的數(shù)據(jù)時每一行表示一個學生的姓名和他相應一科成績。Mapper處理的數(shù)據(jù)是由InputFormat分解過的數(shù)據(jù)集,其中InputFormat的作用是將數(shù)據(jù)集切割成小數(shù)據(jù)集InputSplit,每一個InputSlit將由一個Mapper負責處理。此外,InputFormat中還提供了一個RecordReader的實現(xiàn),并將一個InputSplit解析成<key,value>對提供給了map函數(shù)。InputFormat的默認值是TextInputFormat,它針對文本文件,按行將文本切割成InputSlit,并用LineRecordReader將InputSplit解析成<key,value>對,key是行在文本中的位置,value是文件中的一行。

??? Map的結果會通過partion分發(fā)到Reducer,Reducer做完Reduce操作后,將通過以格式OutputFormat輸出。

??? Mapper最終處理的結果對<key,value>,會送到Reducer中進行合并,合并的時候,有相同key的鍵/值對則送到同一個Reducer上。Reducer是所有用戶定制Reducer類地基礎,它的輸入是key和這個key對應的所有value的一個迭代器,同時還有Reducer的上下文。Reduce的結果由Reducer.Context的write方法輸出到文件中。

3.3 程序代碼

??? 程序代碼如下所示:

?

package?com.hebut.mr;

?

import?java.io.IOException;

import?java.util.Iterator;

import?java.util.StringTokenizer;

?

import?org.apache.hadoop.conf.Configuration;

import?org.apache.hadoop.fs.Path;

import?org.apache.hadoop.io.IntWritable;

import?org.apache.hadoop.io.LongWritable;

import?org.apache.hadoop.io.Text;

import?org.apache.hadoop.mapreduce.Job;

import?org.apache.hadoop.mapreduce.Mapper;

import?org.apache.hadoop.mapreduce.Reducer;

import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import?org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import?org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import?org.apache.hadoop.util.GenericOptionsParser;

?

public?class?Score {

?

????public?static?class?Map?extends

??????????? Mapper<LongWritable, Text, Text, IntWritable> {

?

????????//?實現(xiàn)map函數(shù)

????????public?void?map(LongWritable?key, Text value, Context context)

????????????????throws?IOException, InterruptedException {

????????????//?將輸入的純文本文件的數(shù)據(jù)轉化成String

??????????? String line = value.toString();

?

????????????//?將輸入的數(shù)據(jù)首先按行進行分割

????????????StringTokenizer tokenizerArticle =?new?StringTokenizer(line,?"\n");

?

????????????//?分別對每一行進行處理

????????????while?(tokenizerArticle.hasMoreElements()) {

????????????????//?每行按空格劃分

??????????????? StringTokenizer tokenizerLine =?new?StringTokenizer(tokenizerArticle.nextToken());

?

??????????????? String strName = tokenizerLine.nextToken();//?學生姓名部分

??????????????? String strScore = tokenizerLine.nextToken();//?成績部分

?

??????????????? Text name =?new?Text(strName);

????????????????int?scoreInt = Integer.parseInt(strScore);

????????????????//?輸出姓名和成績

??????????????? context.write(name,?new?IntWritable(scoreInt));

??????????? }

??????? }

?

??? }

?

????public?static?class?Reduce?extends

??????????? Reducer<Text, IntWritable, Text, IntWritable> {

????????//?實現(xiàn)reduce函數(shù)

????????public?void?reduce(Text key, Iterable<IntWritable> values,

??????????????? Context context)?throws?IOException, InterruptedException {

?

????????????int?sum = 0;

????????????int?count = 0;

?

??????????? Iterator<IntWritable> iterator = values.iterator();

????????????while?(iterator.hasNext()) {

??????????????? sum += iterator.next().get();//?計算總分

??????????????? count++;//?統(tǒng)計總的科目數(shù)

??????????? }

?

????????????int?average = (int) sum / count;//?計算平均成績

??????????? context.write(key,?new?IntWritable(average));

??????? }

?

??? }

?

????public?static?void?main(String[] args)?throws?Exception {

??????? Configuration conf =?new?Configuration();

????????//?這句話很關鍵

??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");

?

??????? String[] ioArgs =?new?String[] {?"score_in",?"score_out"?};

??????? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs).getRemainingArgs();

????????if?(otherArgs.length?!= 2) {

??????????? System.err.println("Usage: Score Average <in> <out>");

??????????? System.exit(2);

??????? }

?

??????? Job job =?new?Job(conf,?"Score Average");

??????? job.setJarByClass(Score.class);

?

????????//?設置Map、Combine和Reduce處理類

??????? job.setMapperClass(Map.class);

??????? job.setCombinerClass(Reduce.class);

??????? job.setReducerClass(Reduce.class);

?

????????//?設置輸出類型

??????? job.setOutputKeyClass(Text.class);

??????? job.setOutputValueClass(IntWritable.class);

?

????????//?將輸入的數(shù)據(jù)集分割成小數(shù)據(jù)塊splites,提供一個RecordReder的實現(xiàn)

??????? job.setInputFormatClass(TextInputFormat.class);

????????//?提供一個RecordWriter的實現(xiàn),負責數(shù)據(jù)輸出

??????? job.setOutputFormatClass(TextOutputFormat.class);

?

????????//?設置輸入和輸出目錄

??????? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));

??????? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));

??????? System.exit(job.waitForCompletion(true) ? 0 : 1);

??? }

}

?

3.4 代碼結果

1)準備測試數(shù)據(jù)

??? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創(chuàng)建輸入文件"score_in"文件夾(備注:"score_out"不需要創(chuàng)建。)如圖3.4-1所示,已經(jīng)成功創(chuàng)建。

?

????? ???? ?

圖3.4-1 創(chuàng)建"score_in"????????????????????????????????????????????????????? ?圖3.4.2 上傳三門分數(shù)

?

??? 然后在本地建立三個txt文件,通過Eclipse上傳到"/user/hadoop/score_in"文件夾中,三個txt文件的內容如"實例描述"那三個文件一樣。如圖3.4-2所示,成功上傳之后。

????備注:文本文件的編碼為"UTF-8",默認為"ANSI",可以另存為時選擇,不然中文會出現(xiàn)亂碼

??? 從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的三個文件。

?

?

查看三個文件的內容如圖3.4-3所示:

?

圖3.4.3 三門成績的內容

2)查看運行結果

??? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發(fā)現(xiàn)多出一個"score_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖3.4-4所示。

?

圖3.4-4 運行結果

4、單表關聯(lián)

??? 前面的實例都是在數(shù)據(jù)上進行一些簡單的處理,為進一步的操作打基礎。"單表關聯(lián)"這個實例要求給出的數(shù)據(jù)尋找關心的數(shù)據(jù),它是對原始數(shù)據(jù)所包含信息的挖掘。下面進入這個實例。

4.1 實例描述

??? 實例中給出child-parent(孩子——父母)表,要求輸出grandchild-grandparent(孫子——爺奶)表。

??? 樣例輸入如下所示。

????file:

?

child??????? parent

Tom??????? Lucy

Tom??????? Jack

Jone??????? Lucy

Jone??????? Jack

Lucy??????? Mary

Lucy??????? Ben

Jack??????? Alice

Jack??????? Jesse

Terry??????? Alice

Terry??????? Jesse

Philip??????? Terry

Philip??????? Alma

Mark??????? Terry

Mark??????? Alma

?

??? 家族樹狀關系譜:

?

?

圖4.2-1 家族譜

??? 樣例輸出如下所示。

????file:

?

grandchild??????? grandparent

Tom???????????   Alice

Tom???????????   Jesse

Jone???????????   Alice

Jone???????????   Jesse

Tom???????????   Mary

Tom???????????   Ben

Jone???????????   Mary

Jone???????????   Ben

Philip??????????  ? Alice

Philip???????????   Jesse

Mark???????????   Alice

Mark???????????   Jesse

?

4.2 設計思路

?????? 分析這個實例,顯然需要進行單表連接,連接的是左表parent列和右表child列,且左表右表同一個表

  連接結果除去連接的兩列就是所需要的結果——"grandchild--grandparent"表。要用MapReduce解決這個實例,首先應該考慮如何實現(xiàn)自連接其次就是連接列設置最后結果整理

????? 考慮到MapReduce的shuffle過程會將相同的key會連接在一起,所以可以將map結果的key設置成待連接,然后列中相同的值就自然會連接在一起了。再與最開始的分析聯(lián)系起來:

  要連接的是左表的parent列和右表的child列,且左表和右表是同一個表,所以在map階段讀入數(shù)據(jù)分割childparent之后,會將parent設置成keychild設置成value進行輸出,并作為左表;再將同一對childparent中的child設置成keyparent設置成value進行輸出,作為右表。為了區(qū)分輸出中的左右表,需要在輸出的value加上左右表信息,比如在value的String最開始處加上字符1表示左表,加上字符2表示右表。這樣在map的結果中就形成了左表和右表,然后在shuffle過程中完成連接。reduce接收到連接的結果,其中每個key的value-list就包含了"grandchild--grandparent"關系。取出每個key的value-list進行解析,將左表中的child放入一個數(shù)組右表中的parent放入一個數(shù)組,然后對兩個數(shù)組求笛卡爾積就是最后的結果了。

4.3 程序代碼

??? 程序代碼如下所示。

?

package?com.hebut.mr;

?

import?java.io.IOException;

import?java.util.*;

?

import?org.apache.hadoop.conf.Configuration;

import?org.apache.hadoop.fs.Path;

import?org.apache.hadoop.io.IntWritable;

import?org.apache.hadoop.io.Text;

import?org.apache.hadoop.mapreduce.Job;

import?org.apache.hadoop.mapreduce.Mapper;

import?org.apache.hadoop.mapreduce.Reducer;

import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import?org.apache.hadoop.util.GenericOptionsParser;

?

public?class?STjoin {

?

????public?static?int?time?= 0;

?

????/*

???? * map將輸出分割child和parent,然后正序輸出一次作為右表,

???? *?反序輸出一次作為左表,需要注意的是在輸出的value中必須

???? *?加上左右表的區(qū)別標識。

???? */

????public?static?class?Map?extends?Mapper<Object, Text, Text, Text> {

?

????????//?實現(xiàn)map函數(shù)

????????public?void?map(Object key, Text value, Context context)

????????????????throws?IOException, InterruptedException {

??????????? String childname =?new?String();//?孩子名稱

??????????? String parentname =?new?String();//?父母名稱

??????????? String relationtype =?new?String();//?左右表標識

?

????????????//?輸入的一行預處理文本

??????????? StringTokenizer itr=new?StringTokenizer(value.toString());

??????????? String[] values=new?String[2];

????????????int?i=0;

????????????while(itr.hasMoreTokens()){

??????????????? values[i]=itr.nextToken();

??????????????? i++;

??????????? }

???????????

????????????if?(values[0].compareTo("child") != 0) {

??????????????? childname = values[0];

??????????????? parentname = values[1];

?

????????????????//?輸出左表

??????????????? relationtype =?"1";

??????????????? context.write(new?Text(values[1]),?new?Text(relationtype +

????????????????????????"+"+ childname +?"+"?+ parentname));

?

????????????????//?輸出右表

??????????????? relationtype =?"2";

??????????????? context.write(new?Text(values[0]),?new?Text(relationtype +

????????????????????????"+"+ childname +?"+"?+ parentname));

??????????? }

??????? }

?

??? }

?

????public?static?class?Reduce?extends?Reducer<Text, Text, Text, Text> {

?

????????//?實現(xiàn)reduce函數(shù)

????????public?void?reduce(Text key, Iterable<Text> values, Context context)

????????????????throws?IOException, InterruptedException {

?

????????????//?輸出表頭

????????????if?(0 ==?time) {

????????????????context.write(new?Text("grandchild"),?new?Text("grandparent"));

????????????????time++;

??????????? }

?

????????????int?grandchildnum = 0;

??????????? String[] grandchild =?new?String[10];

????????????int?grandparentnum = 0;

??????????? String[] grandparent =?new?String[10];

?

????????????Iterator?ite = values.iterator();

????????????while?(ite.hasNext()) {

??????????????? String record = ite.next().toString();

????????????????int?len = record.length();

????????????????int?i = 2;

????????????????if?(0 == len) {

????????????????????continue;

??????????????? }

?

????????????????//?取得左右表標識

????????????????char?relationtype = record.charAt(0);

????????????????//?定義孩子和父母變量

??????????????? String childname =?new?String();

??????????????? String parentname =?new?String();

?

????????????????//?獲取value-list中value的child

????????????????while?(record.charAt(i) !=?'+') {

??????????????????? childname += record.charAt(i);

??????????????????? i++;

??????????????? }

?

??????????????? i = i + 1;

?

????????????????//?獲取value-list中value的parent

????????????????while?(i < len) {

??????????????????? parentname += record.charAt(i);

??????????????????? i++;

??????????????? }

?

????????????????//?左表,取出child放入grandchildren

????????????????if?('1'?== relationtype) {

??????????????????? grandchild[grandchildnum] = childname;

??????????????????? grandchildnum++;

??????????????? }

?

????????????????//?右表,取出parent放入grandparent

????????????????if?('2'?== relationtype) {

??????????????????? grandparent[grandparentnum] = parentname;

??????????????????? grandparentnum++;

??????????????? }

??????????? }

?

????????????//?grandchild和grandparent數(shù)組求笛卡爾兒積

????????????if?(0 != grandchildnum && 0 != grandparentnum) {

????????????????for?(int?m = 0; m < grandchildnum; m++) {

????????????????????for?(int?n = 0; n < grandparentnum; n++) {

????????????????????????//?輸出結果

??????????????????????? context.write(new?Text(grandchild[m]),?new?Text(grandparent[n]));

??????????????????? }

??????????????? }

??????????? }

??????? }

??? }

?

????public?static?void?main(String[] args)?throws?Exception {

??????? Configuration conf =?new?Configuration();

????????//?這句話很關鍵

??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");

?

??????? String[] ioArgs =?new?String[] {?"STjoin_in",?"STjoin_out"?};

??????? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs).getRemainingArgs();

????????if?(otherArgs.length?!= 2) {

??????????? System.err.println("Usage: Single Table Join <in> <out>");

??????????? System.exit(2);

??????? }

?

??????? Job job =?new?Job(conf,?"Single Table Join");

??????? job.setJarByClass(STjoin.class);

?

????????//?設置Map和Reduce處理類

??????? job.setMapperClass(Map.class);

??????? job.setReducerClass(Reduce.class);

?

????????//?設置輸出類型

??????? job.setOutputKeyClass(Text.class);

??????? job.setOutputValueClass(Text.class);

?

????????//?設置輸入和輸出目錄

??????? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));

??????? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));

??????? System.exit(job.waitForCompletion(true) ? 0 : 1);

??? }

}

?

4.4 代碼結果

1)準備測試數(shù)據(jù)

??? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創(chuàng)建輸入文件"STjoin_in"文件夾(備注:"STjoin_out"不需要創(chuàng)建。)如圖4.4-1所示,已經(jīng)成功創(chuàng)建。

?

???????????????? ?

圖4.4-1 創(chuàng)建"STjoin_in"?????????????????????????????????????? 圖4.4.2 上傳"child-parent"表

?

??? 然后在本地建立一個txt文件,通過Eclipse上傳到"/user/hadoop/STjoin_in"文件夾中,一個txt文件的內容如"實例描述"那個文件一樣。如圖4.4-2所示,成功上傳之后。

??? 從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的文件,顯示其內容如圖4.4-3所示:

?

圖4.4-3 表"child-parent"內容

????2)運行詳解

????(1)Map處理:

??? map函數(shù)輸出結果如下所示。

?

child??????? parent????????????????àà??????????????????? 忽略此行

Tom??????? Lucy???????????????????àà??????????????? <Lucy,1+Tom+Lucy>

???????????????????????????????????????????         <Tom,2+Tom+Lucy >

Tom??????? Jack????????????????????àà??????????????? <Jack,1+Tom+Jack>

???????????????????????????????????????????         <Tom,2+Tom+Jack>

Jone??????? Lucy???????????????  àà??????????????? <Lucy,1+Jone+Lucy>

???????????????????????????????????????????         <Jone,2+Jone+Lucy>

Jone??????? Jack????????????????????àà??????????????? <Jack,1+Jone+Jack>

???????????????????????????????????????????         <Jone,2+Jone+Jack>

Lucy??????? Mary???????????????????àà??????????????? <Mary,1+Lucy+Mary>

???????????????????????????????????????????         <Lucy,2+Lucy+Mary>

Lucy??????? Ben????????????????????àà??????????????? <Ben,1+Lucy+Ben>

???????????????????????????????????????????          <Lucy,2+Lucy+Ben>

Jack??????? Alice????????????????????àà??????????????? <Alice,1+Jack+Alice>

???????????????????????????????????????????           <Jack,2+Jack+Alice>

Jack??????? Jesse???????????????????àà??????????????? <Jesse,1+Jack+Jesse>

???????????????????????????????????????????           <Jack,2+Jack+Jesse>

Terry??????? Alice???????????????????àà??????????????? <Alice,1+Terry+Alice>

???????????????????????????????????????????           <Terry,2+Terry+Alice>

Terry??????? Jesse??????????????????àà??????????????? <Jesse,1+Terry+Jesse>

???????????????????????????????????????????           <Terry,2+Terry+Jesse>

Philip??????? Terry??????????????????àà??????????????? <Terry,1+Philip+Terry>

???????????????????????????????????????????           <Philip,2+Philip+Terry>

Philip??????? Alma???????????????????àà??????????????? <Alma,1+Philip+Alma>

???????????????????????????????????????????           <Philip,2+Philip+Alma>

Mark??????? Terry???????????????????àà??????????????? <Terry,1+Mark+Terry>

???????????????????????????????????????????           <Mark,2+Mark+Terry>

Mark??????? Alma???????????????  àà??????????????? <Alma,1+Mark+Alma>

???????????????????????????????????????????           <Mark,2+Mark+Alma>

?

????(2)Shuffle處理

??? 在shuffle過程中完成連接。

?

map函數(shù)輸出

排序結果

shuffle連接

<Lucy,1+Tom+Lucy>

<Tom,2+Tom+Lucy>

<Jack,1+Tom+Jack>

<Tom,2+Tom+Jack>

<Lucy,1+Jone+Lucy>

<Jone,2+Jone+Lucy>

<Jack,1+Jone+Jack>

<Jone,2+Jone+Jack>

<Mary,1+Lucy+Mary>

<Lucy,2+Lucy+Mary>

<Ben,1+Lucy+Ben>

<Lucy,2+Lucy+Ben>

<Alice,1+Jack+Alice>

<Jack,2+Jack+Alice>

<Jesse,1+Jack+Jesse>

<Jack,2+Jack+Jesse>

<Alice,1+Terry+Alice>

<Terry,2+Terry+Alice>

<Jesse,1+Terry+Jesse>

<Terry,2+Terry+Jesse>

<Terry,1+Philip+Terry>

<Philip,2+Philip+Terry>

<Alma,1+Philip+Alma>

<Philip,2+Philip+Alma>

<Terry,1+Mark+Terry>

<Mark,2+Mark+Terry>

<Alma,1+Mark+Alma>

<Mark,2+Mark+Alma>

<Alice,1+Jack+Alice>

<Alice,1+Terry+Alice>

<Alma,1+Philip+Alma>

<Alma,1+Mark+Alma>

<Ben,1+Lucy+Ben>

<Jack,1+Tom+Jack>

<Jack,1+Jone+Jack>

<Jack,2+Jack+Alice>

<Jack,2+Jack+Jesse>

<Jesse,1+Jack+Jesse>

<Jesse,1+Terry+Jesse>

<Jone,2+Jone+Lucy>

<Jone,2+Jone+Jack>

<Lucy,1+Tom+Lucy>

<Lucy,1+Jone+Lucy>

<Lucy,2+Lucy+Mary>

<Lucy,2+Lucy+Ben>

<Mary,1+Lucy+Mary>

<Mark,2+Mark+Terry>

<Mark,2+Mark+Alma>

<Philip,2+Philip+Terry>

<Philip,2+Philip+Alma>

<Terry,2+Terry+Alice>

<Terry,2+Terry+Jesse>

<Terry,1+Philip+Terry>

<Terry,1+Mark+Terry>

<Tom,2+Tom+Lucy>

<Tom,2+Tom+Jack>

<Alice,1+Jack+Alice,

??????? 1+Terry+Alice?,

??????? 1+Philip+Alma,

??????? 1+Mark+Alma >

<Ben,1+Lucy+Ben>

<Jack,1+Tom+Jack,

??????? 1+Jone+Jack,

??????? 2+Jack+Alice,

??????? 2+Jack+Jesse >

<Jesse,1+Jack+Jesse,

??????? 1+Terry+Jesse >

<Jone,2+Jone+Lucy,

??????? 2+Jone+Jack>

<Lucy,1+Tom+Lucy,

??????? 1+Jone+Lucy,

??????? 2+Lucy+Mary,

??????? 2+Lucy+Ben>

<Mary,1+Lucy+Mary,

??????? 2+Mark+Terry,

??????? 2+Mark+Alma>

<Philip,2+Philip+Terry,

??????? 2+Philip+Alma>

<Terry,2+Terry+Alice,

??????? 2+Terry+Jesse,

??????? 1+Philip+Terry,

??????? 1+Mark+Terry>

<Tom,2+Tom+Lucy,

??????? 2+Tom+Jack>

?

????(3)Reduce處理

????首先由語句"0 != grandchildnum && 0 != grandparentnum"得知,只要在"value-list"中沒有左表或者右表,則不會做處理,可以根據(jù)這條規(guī)則去除無效shuffle連接

?

無效的shuffle連接

有效的shuffle連接

<Alice,1+Jack+Alice,

??????? 1+Terry+Alice?,

??????? 1+Philip+Alma,

??????? 1+Mark+Alma >

<Ben,1+Lucy+Ben>

<Jesse,1+Jack+Jesse,

??????? 1+Terry+Jesse >

<Jone,2+Jone+Lucy,

??????? 2+Jone+Jack>

<Mary,1+Lucy+Mary,

??????? 2+Mark+Terry,

??????? 2+Mark+Alma>

<Philip,2+Philip+Terry,

??????? 2+Philip+Alma>

<Tom,2+Tom+Lucy,

??????? 2+Tom+Jack>

<Jack,1+Tom+Jack,

??????? 1+Jone+Jack,

??????? 2+Jack+Alice,

??????? 2+Jack+Jesse >

<Lucy,1+Tom+Lucy,

??????? 1+Jone+Lucy,

??????? 2+Lucy+Mary,

??????? 2+Lucy+Ben>

<Terry,2+Terry+Alice,

??????? 2+Terry+Jesse,

??????? 1+Philip+Terry,

??????? 1+Mark+Terry>

??? 然后根據(jù)下面語句進一步對有效的shuffle連接做處理。

?

// 左表,取出child放入grandchildren

if ('1' == relationtype) {

??? grandchild[grandchildnum] = childname;

??? grandchildnum++;

}

?

// 右表,取出parent放入grandparent

if ('2' == relationtype) {

??? grandparent[grandparentnum] = parentname;

??? grandparentnum++;

}

?

??? 針對一條數(shù)據(jù)進行分析:

?

<Jack,1+Tom+Jack,

??????? 1+Jone+Jack,

??????? 2+Jack+Alice,

??????? 2+Jack+Jesse >

?

????分析結果左表用"字符1"表示,右表用"字符2"表示,上面的<key,value-list>中的"key"表示左表與右表連接鍵。而"value-list"表示以"key"連接左表與右表相關數(shù)據(jù)

??? 根據(jù)上面針對左表與右表不同的處理規(guī)則,取得兩個數(shù)組的數(shù)據(jù)如下所示:

?

grandchild

Tom、Jone(grandchild[grandchildnum] = childname;)

grandparent

Alice、Jesse(grandparent[grandparentnum] = parentname;)

????

??? 然后根據(jù)下面語句進行處理。

?

for (int m = 0; m < grandchildnum; m++) {

??? for (int n = 0; n < grandparentnum; n++) {

??????? context.write(new Text(grandchild[m]), new Text(grandparent[n]));

??? }

}

?

??

?

處理結果如下面所示:

?

Tom??????? Jesse

Tom??????? Alice

Jone??????? Jesse

Jone??????? Alice?

??? 其他的有效shuffle連接處理都是如此

3)查看運行結果

??? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發(fā)現(xiàn)多出一個"STjoin_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖4.4-4所示。

?

圖4.4-4 運行結果

5、多表關聯(lián)

????多表關聯(lián)和單表關聯(lián)類似,它也是通過對原始數(shù)據(jù)進行一定的處理,從其中挖掘出關心的信息。下面進入這個實例。

5.1 實例描述

??? 輸入是兩個文件,一個代表工廠表,包含工廠名列和地址編號列;另一個代表地址表,包含地址名列和地址編號列。要求從輸入數(shù)據(jù)中找出工廠名地址名對應關系,輸出"工廠名——地址名"表。

??? 樣例輸入如下所示。

????1)factory:

?

factoryname???????????????     addressed

Beijing Red Star???????????????     1

Shenzhen Thunder???????????     3

Guangzhou Honda???????????     2

Beijing Rising??????????????????     1

Guangzhou Development Bank??????2

Tencent???????????????         3

Back of Beijing???????????????      1

?

????2)address:

?

addressID??? addressname

1???????     Beijing

2???????     Guangzhou

3???????     Shenzhen

4???????     Xian

?

??? 樣例輸出如下所示。

?

factoryname???????????????????     addressname

Back of Beijing???????????????????     ? Beijing

Beijing Red Star???????????????????     Beijing

Beijing Rising???????????????????       Beijing

Guangzhou Development Bank??????????Guangzhou

Guangzhou Honda???????????????     Guangzhou

Shenzhen Thunder???????????????     Shenzhen

Tencent???????????????????         Shenzhen

?

5.2 設計思路

??? 多表關聯(lián)和單表關聯(lián)相似,都類似于數(shù)據(jù)庫中的自然連接。相比單表關聯(lián),多表關聯(lián)的左右表和連接列更加清楚。所以可以采用和單表關聯(lián)的相同處理方式,map識別出輸入的行屬于哪個表之后,對其進行分割,將連接的列值保存在key中,另一列和左右表標識保存在value中,然后輸出。reduce拿到連接結果之后,解析value內容,根據(jù)標志將左右表內容分開存放,然后求笛卡爾積,最后直接輸出。

??? 這個實例的具體分析參考單表關聯(lián)實例。下面給出代碼。

5.3 程序代碼

??? 程序代碼如下所示:

?

package?com.hebut.mr;

?

import?java.io.IOException;

import?java.util.*;

?

import?org.apache.hadoop.conf.Configuration;

import?org.apache.hadoop.fs.Path;

import?org.apache.hadoop.io.IntWritable;

import?org.apache.hadoop.io.Text;

import?org.apache.hadoop.mapreduce.Job;

import?org.apache.hadoop.mapreduce.Mapper;

import?org.apache.hadoop.mapreduce.Reducer;

import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import?org.apache.hadoop.util.GenericOptionsParser;

?

public?class?MTjoin {

?

????public?static?int?time?= 0;

?

????/*

???? *?在map中先區(qū)分輸入行屬于左表還是右表,然后對兩列值進行分割,

???? *?保存連接列在key值,剩余列和左右表標志在value中,最后輸出

???? */

????public?static?class?Map?extends?Mapper<Object, Text, Text, Text> {

?

????????//?實現(xiàn)map函數(shù)

????????public?void?map(Object key, Text value, Context context)

????????????????throws?IOException, InterruptedException {

??????????? String line = value.toString();//?每行文件

??????????? String relationtype =?new?String();//?左右表標識

?

????????????//?輸入文件首行,不處理

????????????if?(line.contains("factoryname") ==?true

??????????????????? || line.contains("addressed") ==?true) {

????????????????return;

??????????? }

?

????????????//?輸入的一行預處理文本

??????????? StringTokenizer itr =?new?StringTokenizer(line);

??????????? String mapkey =?new?String();

??????????? String mapvalue =?new?String();

????????????int?i = 0;

????????????while?(itr.hasMoreTokens()) {

????????????????//?先讀取一個單詞

??????????????? String token = itr.nextToken();

????????????????//?判斷該地址ID就把存到"values[0]"

????????????????if?(token.charAt(0) >=?'0'?&& token.charAt(0) <=?'9') {

??????????????????? mapkey = token;

????????????????????if?(i > 0) {

??????????????????????? relationtype =?"1";

??????????????????? }?else?{

??????????????????????? relationtype =?"2";

??????????????????? }

????????????????????continue;

??????????????? }

?

????????????????//?存工廠名

??????????????? mapvalue += token +?" ";

??????????????? i++;

??????????? }

?

????????????//?輸出左右表

??????????? context.write(new?Text(mapkey),?new?Text(relationtype +?"+"+ mapvalue));

??????? }

??? }

?

????/*

???? * reduce解析map輸出,將value中數(shù)據(jù)按照左右表分別保存,

  *?然后求出笛卡爾積,并輸出。

???? */

????public?static?class?Reduce?extends?Reducer<Text, Text, Text, Text> {

?

????????//?實現(xiàn)reduce函數(shù)

????????public?void?reduce(Text key, Iterable<Text> values, Context context)

????????????????throws?IOException, InterruptedException {

?

????????????//?輸出表頭

????????????if?(0 ==?time) {

????????????????context.write(new?Text("factoryname"),?new?Text("addressname"));

????????????????time++;

??????????? }

?

????????????int?factorynum = 0;

??????????? String[] factory =?new?String[10];

????????????int?addressnum = 0;

??????????? String[]?address?=?new?String[10];

?

????????????Iterator?ite = values.iterator();

????????????while?(ite.hasNext()) {

??????????????? String record = ite.next().toString();

????????????????int?len = record.length();

????????????????int?i = 2;

????????????????if?(0 == len) {

????????????????????continue;

??????????????? }

?

????????????????//?取得左右表標識

????????????????char?relationtype = record.charAt(0);

?

????????????????//?左表

????????????????if?('1'?== relationtype) {

??????????????????? factory[factorynum] = record.substring(i);

??????????????????? factorynum++;

??????????????? }

?

????????????????//?右表

????????????????if?('2'?== relationtype) {

????????????????????address[addressnum] = record.substring(i);

??????????????????? addressnum++;

??????????????? }

??????????? }

?

????????????//?求笛卡爾積

????????????if?(0 != factorynum && 0 != addressnum) {

????????????????for?(int?m = 0; m < factorynum; m++) {

????????????????????for?(int?n = 0; n < addressnum; n++) {

????????????????????????//?輸出結果

??????????????????????? context.write(new?Text(factory[m]),

????????????????????????????????new?Text(address[n]));

??????????????????? }

??????????????? }

??????????? }

?

??????? }

??? }

?

????public?static?void?main(String[] args)?throws?Exception {

??????? Configuration conf =?new?Configuration();

????????//?這句話很關鍵

??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");

?

??????? String[] ioArgs =?new?String[] {?"MTjoin_in",?"MTjoin_out"?};

??????? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs).getRemainingArgs();

????????if?(otherArgs.length?!= 2) {

??????????? System.err.println("Usage: Multiple Table Join <in> <out>");

??????????? System.exit(2);

??????? }

?

??????? Job job =?new?Job(conf,?"Multiple Table Join");

??????? job.setJarByClass(MTjoin.class);

?

????????//?設置Map和Reduce處理類

??????? job.setMapperClass(Map.class);

??????? job.setReducerClass(Reduce.class);

?

????????//?設置輸出類型

??????? job.setOutputKeyClass(Text.class);

??????? job.setOutputValueClass(Text.class);

?

????????//?設置輸入和輸出目錄

??????? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));

??????? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));

??????? System.exit(job.waitForCompletion(true) ? 0 : 1);

??? }

}

?

5.4 代碼結果

1)準備測試數(shù)據(jù)

??? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創(chuàng)建輸入文件"MTjoin_in"文件夾(備注:"MTjoin_out"不需要創(chuàng)建。)如圖5.4-1所示,已經(jīng)成功創(chuàng)建。

?

???????? ?????? ?

圖5.4-1 創(chuàng)建"MTjoin_in"??????????????????????????????????????????????????????????? ?圖5.4.2 上傳兩個數(shù)據(jù)表

?

??? 然后在本地建立兩個txt文件,通過Eclipse上傳到"/user/hadoop/MTjoin_in"文件夾中,兩個txt文件的內容如"實例描述"那兩個文件一樣。如圖5.4-2所示,成功上傳之后。

??? 從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的兩個文件。

?

圖5.4.3 兩個數(shù)據(jù)表的內容

2)查看運行結果

??? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發(fā)現(xiàn)多出一個"MTjoin_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖5.4-4所示。

?

圖5.4-4 運行結果

6、倒排索引

??? "倒排索引"是文檔檢索系統(tǒng)最常用數(shù)據(jù)結構,被廣泛地應用于全文搜索引擎。它主要是用來存儲某個單詞(或詞組)在一個文檔或一組文檔中的存儲位置映射,即提供了一種根據(jù)內容來查找文檔方式。由于不是根據(jù)文檔來確定文檔所包含的內容,而是進行相反的操作,因而稱為倒排索引(Inverted Index)。

6.1 實例描述

??? 通常情況下,倒排索引由一個單詞(或詞組)以及相關的文檔列表組成,文檔列表中的文檔或者是標識文檔的ID號,或者是指文檔所在位置的URL,如圖6.1-1所示。

?

圖6.1-1 倒排索引結構

??? 從圖6.1-1可以看出,單詞1出現(xiàn)在{文檔1,文檔4,文檔13,……}中,單詞2出現(xiàn)在{文檔3,文檔5,文檔15,……}中,而單詞3出現(xiàn)在{文檔1,文檔8,文檔20,……}中。在實際應用中,還需要每個文檔添加一個權值,用來指出每個文檔與搜索內容的相關度,如圖6.1-2所示。

?

?

圖6.1-2 添加權重的倒排索引

??? 最常用的是使用詞頻作為權重,即記錄單詞在文檔中出現(xiàn)的次數(shù)。以英文為例,如圖6.1-3所示,索引文件中的"MapReduce"一行表示:"MapReduce"這個單詞在文本T0中出現(xiàn)過1次,T1中出現(xiàn)過1次,T2中出現(xiàn)過2次。當搜索條件為"MapReduce"、"is"、"Simple"時,對應的集合為:{T0,T1,T2}∩{T0,T1}∩{T0,T1}={T0,T1},即文檔T0和T1包含了所要索引的單詞,而且只有T0是連續(xù)的。

?

?

圖6.1-3 倒排索引示例

??? 更復雜的權重還可能要記錄單詞在多少個文檔中出現(xiàn)過,以實現(xiàn)TF-IDF(Term Frequency-Inverse Document Frequency)算法,或者考慮單詞在文檔中的位置信息(單詞是否出現(xiàn)在標題中,反映了單詞在文檔中的重要性)等。

??? 樣例輸入如下所示。

????1)file1:

?

MapReduce is simple

?

????2)file2:

?

MapReduce is powerful is simple

?

????3)file3:

?

Hello MapReduce bye MapReduce

?

??? 樣例輸出如下所示。

?

MapReduce????? file1.txt:1;file2.txt:1;file3.txt:2;

is???????     file1.txt:1;file2.txt:2;

simple???????  ? file1.txt:1;file2.txt:1;

powerful???   file2.txt:1;

Hello???????   file3.txt:1;

bye???????  ?? file3.txt:1;

?

6.2 設計思路

??? 實現(xiàn)"倒排索引"只要關注的信息為:單詞文檔URL詞頻,如圖3-11所示。但是在實現(xiàn)過程中,索引文件的格式與圖6.1-3會略有所不同,以避免重寫OutPutFormat類。下面根據(jù)MapReduce的處理過程給出倒排索引設計思路

????1)Map過程

??? 首先使用默認的TextInputFormat類對輸入文件進行處理,得到文本中每行偏移量及其內容。顯然,Map過程首先必須分析輸入的<key,value>對,得到倒排索引中需要的三個信息:單詞、文檔URL和詞頻,如圖6.2-1所示。

?

圖6.2-1 Map過程輸入/輸出

?

  這里存在兩個問題第一,<key,value>對只能有兩個值,在不使用Hadoop自定義數(shù)據(jù)類型的情況下,需要根據(jù)情況將其中兩個值合并成一個值,作為key或value值;第二,通過一個Reduce過程無法同時完成詞頻統(tǒng)計生成文檔列表,所以必須增加一個Combine過程完成詞頻統(tǒng)計

??? 這里講單詞和URL組成key值(如"MapReduce:file1.txt"),將詞頻作為value,這樣做的好處是可以利用MapReduce框架自帶的Map端排序,將同一文檔相同單詞詞頻組成列表,傳遞給Combine過程,實現(xiàn)類似于WordCount的功能。

????2)Combine過程

??? 經(jīng)過map方法處理后,Combine過程將key值相同的value值累加,得到一個單詞在文檔在文檔中的詞頻,如圖6.2-2所示。如果直接將圖6.2-2所示的輸出作為Reduce過程的輸入,在Shuffle過程時將面臨一個問題:所有具有相同單詞的記錄(由單詞、URL和詞頻組成)應該交由同一個Reducer處理,但當前的key值無法保證這一點,所以必須修改key值和value值。這次將單詞作為key值,URL和詞頻組value值(如"file1.txt:1")。這樣做的好處是可以利用MapReduce框架默認的HashPartitioner類完成Shuffle過程,將相同單詞所有記錄發(fā)送給同一個Reducer進行處理

?

?

圖6.2-2 Combine過程輸入/輸出

????3)Reduce過程

??? 經(jīng)過上述兩個過程后,Reduce過程只需將相同key值的value值組合成倒排索引文件所需的格式即可,剩下的事情就可以直接交給MapReduce框架進行處理了。如圖6.2-3所示。索引文件的內容除分隔符外與圖6.1-3解釋相同。

????4)需要解決的問題

??? 本實例設計的倒排索引在文件數(shù)目沒有限制,但是單詞文件不宜過大(具體值與默認HDFS塊大小及相關配置有關),要保證每個文件對應一個split。否則,由于Reduce過程沒有進一步統(tǒng)計詞頻,最終結果可能出現(xiàn)詞頻未統(tǒng)計完全單詞。可以通過重寫InputFormat類將每個文件為一個split,避免上述情況。或者執(zhí)行兩次MapReduce第一次MapReduce用于統(tǒng)計詞頻第二次MapReduce用于生成倒排索引。除此之外,還可以利用復合鍵值對等實現(xiàn)包含更多信息的倒排索引。

?

?

圖6.2-3 Reduce過程輸入/輸出

6.3 程序代碼

  程序代碼如下所示:

?

package?com.hebut.mr;

?

import?java.io.IOException;

import?java.util.StringTokenizer;

?

import?org.apache.hadoop.conf.Configuration;

import?org.apache.hadoop.fs.Path;

import?org.apache.hadoop.io.IntWritable;

import?org.apache.hadoop.io.Text;

import?org.apache.hadoop.mapreduce.Job;

import?org.apache.hadoop.mapreduce.Mapper;

import?org.apache.hadoop.mapreduce.Reducer;

import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import?org.apache.hadoop.mapreduce.lib.input.FileSplit;

import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import?org.apache.hadoop.util.GenericOptionsParser;

?

public?class?InvertedIndex {

?

????public?static?class?Map?extends?Mapper<Object, Text, Text, Text> {

?

????????private?Text?keyInfo?=?new?Text();?//?存儲單詞和URL組合

????????private?Text?valueInfo?=?new?Text();?//?存儲詞頻

????????private?FileSplit?split;?//?存儲Split對象

?

????????//?實現(xiàn)map函數(shù)

????????public?void?map(Object key, Text value, Context context)

????????????????throws?IOException, InterruptedException {

?

????????????//?獲得<key,value>對所屬的FileSplit對象

????????????split?= (FileSplit) context.getInputSplit();

?

??????????? StringTokenizer itr =?new?StringTokenizer(value.toString());

?

????????????while?(itr.hasMoreTokens()) {

????????????????// key值由單詞和URL組成,如"MapReduce:file1.txt"

????????????????//?獲取文件的完整路徑

????????????????//?keyInfo.set(itr.nextToken()+":"+split.getPath().toString());

????????????????//?這里為了好看,只獲取文件的名稱。

????????????????int?splitIndex =?split.getPath().toString().indexOf("file");

????????????????keyInfo.set(itr.nextToken() +?":"

??????????????????? +?split.getPath().toString().substring(splitIndex));

????????????????//?詞頻初始化為1

????????????????valueInfo.set("1");

?

??????????????? context.write(keyInfo,?valueInfo);

??????????? }

??????? }

??? }

?

????public?static?class?Combine?extends?Reducer<Text, Text, Text, Text> {

?

????????private?Text?info?=?new?Text();

?

????????//?實現(xiàn)reduce函數(shù)

????????public?void?reduce(Text key, Iterable<Text> values, Context context)

????????????????throws?IOException, InterruptedException {

?

????????????//?統(tǒng)計詞頻

????????????int?sum = 0;

????????????for?(Text value : values) {

??????????????? sum += Integer.parseInt(value.toString());

??????????? }

?

????????????int?splitIndex = key.toString().indexOf(":");

????????????//?重新設置value值由URL和詞頻組成

????????????info.set(key.toString().substring(splitIndex + 1) +?":"?+ sum);

????????????//?重新設置key值為單詞

??????????? key.set(key.toString().substring(0, splitIndex));

?

??????????? context.write(key,?info);

??????? }

??? }

?

????public?static?class?Reduce?extends?Reducer<Text, Text, Text, Text> {

?

????????private?Text?result?=?new?Text();

?

????????//?實現(xiàn)reduce函數(shù)

????????public?void?reduce(Text key, Iterable<Text> values, Context context)

????????????????throws?IOException, InterruptedException {

?

????????????//?生成文檔列表

??????????? String fileList =?new?String();

????????????for?(Text value : values) {

??????????????? fileList += value.toString() +?";";

??????????? }

?

????????????result.set(fileList);

?

??????????? context.write(key,?result);

??????? }

??? }

?

????public?static?void?main(String[] args)?throws?Exception {

??????? Configuration conf =?new?Configuration();

????????//?這句話很關鍵

??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");

?

??????? String[] ioArgs =?new?String[] {?"index_in",?"index_out"?};

??????? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs)

??????????????? .getRemainingArgs();

????????if?(otherArgs.length?!= 2) {

??????????? System.err.println("Usage: Inverted Index <in> <out>");

??????????? System.exit(2);

??????? }

?

??????? Job job =?new?Job(conf,?"Inverted Index");

??????? job.setJarByClass(InvertedIndex.class);

?

????????//?設置Map、Combine和Reduce處理類

??????? job.setMapperClass(Map.class);

??????? job.setCombinerClass(Combine.class);

??????? job.setReducerClass(Reduce.class);

?

????????//?設置Map輸出類型

??????? job.setMapOutputKeyClass(Text.class);

??????? job.setMapOutputValueClass(Text.class);

?

????????//?設置Reduce輸出類型

??????? job.setOutputKeyClass(Text.class);

??????? job.setOutputValueClass(Text.class);

?

????????//?設置輸入和輸出目錄

??????? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));

??????? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));

??????? System.exit(job.waitForCompletion(true) ? 0 : 1);

??? }

}

?

6.4 代碼結果

1)準備測試數(shù)據(jù)

??? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創(chuàng)建輸入文件"index_in"文件夾(備注:"index_out"不需要創(chuàng)建。)如圖6.4-1所示,已經(jīng)成功創(chuàng)建。

?

????????????????

圖6.4-1 創(chuàng)建"index_in"???????????????????????????????????????????? 圖6.4.2 上傳"file*.txt"

?

??? 然后在本地建立三個txt文件,通過Eclipse上傳到"/user/hadoop/index_in"文件夾中,三個txt文件的內容如"實例描述"那三個文件一樣。如圖6.4-2所示,成功上傳之后。

??? 從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的三個文件。

?

圖6.4.3 三個"file*.txt"的內容

2)查看運行結果

??? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發(fā)現(xiàn)多出一個"index_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖6.4-4所示。

?

圖6.4-4 運行結果

?

?

  文章下載地址:http://files.cnblogs.com/xia520pi/HadoopCluster_Vol.9.rar

總結

以上是生活随笔為你收集整理的hadoop 入门实例【转】的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。

久久最新网址 | 国产精品久久久久久模特 | 色婷婷天天干 | 国产在线精品区 | 四虎在线观看视频 | av福利在线看 | 成人影片在线播放 | 2020天天干天天操 | www.黄色小说.com | 欧美日韩成人一区 | 美女视频黄色免费 | 久久综合九色综合97婷婷女人 | 91人人在线| 日韩视频免费观看高清完整版在线 | 久久精品国产成人 | 欧美福利视频一区 | 久久亚洲福利视频 | 亚洲资源片 | 亚洲一区视频在线播放 | 在线视频观看91 | 成人在线免费视频观看 | 久久高清国产视频 | 人人草人人做 | 久草资源在线 | 丁香综合网 | 在线观看一区视频 | 国内精品久久久久久久久久久久 | 欧美最新另类人妖 | 欧美日韩一区二区视频在线观看 | 亚洲一级影院 | 国产日韩欧美自拍 | 国内精品久久久 | 夜夜躁狠狠躁 | 久艹视频免费观看 | 91超碰在线播放 | 99热手机在线观看 | 一区二区精品在线观看 | 亚洲成人家庭影院 | 亚洲少妇久久 | 黄色一级大片在线免费看国产一 | 免费国产视频 | 国产精品不卡一区 | 韩日精品在线 | 美女视频是黄的免费观看 | 亚洲国产三级在线观看 | av黄色在线播放 | 亚洲精品国产精品国自产观看 | 91视频午夜| 中文字幕国产 | 国产小视频你懂的 | 国产精品岛国久久久久久久久红粉 | 超碰在线观看97 | 亚洲少妇xxxx | 色激情五月 | 中文视频在线播放 | 亚洲综合激情 | 国产精品一区二区三区久久 | 日本久久电影网 | 狠狠色丁香婷婷综合久久片 | 午夜电影久久 | 日韩黄色av网站 | 欧美成人猛片 | 久草视频在线免费 | 久久精品久久99精品久久 | 久久香蕉一区 | 日韩欧美高清视频在线观看 | 国产无区一区二区三麻豆 | av不卡在线看 | 国产精品九色 | 精品二区久久 | 久久国产精品久久精品国产演员表 | 久av在线 | 亚洲深夜影院 | 波多野结衣久久资源 | 在线观看中文字幕一区 | 色天堂在线视频 | 久久精品国产成人精品 | 天天综合网久久 | 天天色棕合合合合合合 | 国产精品成人久久久 | 一区二区三区精品久久久 | 国产二区视频在线 | 国产精品毛片一区二区在线 | 国产精品视频免费看 | 国产精品久久久久久高潮 | 国产成人三级一区二区在线观看一 | 国产精品毛片一区视频播 | 日韩精品视频在线观看网址 | 国产精品久久久久永久免费看 | 日本精品中文字幕 | 日韩欧美有码在线 | 97视频在线观看成人 | 日韩电影在线视频 | 成年人国产精品 | 久久人人爽人人爽人人片av免费 | 免费91麻豆精品国产自产在线观看 | 伊人日日干 | 色综合 久久精品 | av中文字幕免费在线观看 | 亚洲最新av网站 | 色偷偷88欧美精品久久久 | caobi视频| 91在线小视频 | 极品美女被弄高潮视频网站 | 久久久久久久久久影院 | 中文字幕中文字幕 | 蜜臀aⅴ精品一区二区三区 久久视屏网 | 国产视频精品免费播放 | 999抗病毒口服液 | 麻豆视频免费版 | 亚洲国产中文字幕 | 久久精品久久久久久久 | 国产69精品久久久久99尤 | 色婷婷综合五月 | 国产精品一区二区免费视频 | 国产精品入口麻豆www | 国产亚洲精品成人av久久影院 | 国产高清视频 | 91av影视 | 麻豆视频在线 | 91精品国产99久久久久久久 | 91精品第一页 | 成人久久精品视频 | 日韩av手机在线看 | 日日夜夜综合 | 99免费在线观看视频 | 欧美激情精品久久久久久免费 | 亚洲乱码中文字幕综合 | 国产美腿白丝袜足在线av | 国产精品18p | 欧美国产三区 | 91视频首页 | 色婷婷福利视频 | 精品一区二区免费在线观看 | 一区二区三区影院 | 在线观看亚洲专区 | 深夜福利视频在线观看 | 国产资源免费在线观看 | 91精品国产一区 | 亚洲无人区小视频 | 国产网红在线观看 | 天天操夜夜操国产精品 | 中文字幕久久精品一区 | 亚洲四虎在线 | 亚洲精品久久久久久国 | 国产精品成人a免费观看 | 日韩精品最新在线观看 | 综合天堂av久久久久久久 | 亚洲h色精品 | 国产成人在线综合 | 日韩精品中文字幕在线观看 | 国产一线二线三线性视频 | 99久久爱 | 色噜噜日韩精品一区二区三区视频 | 亚洲在线黄色 | 欧美色精品天天在线观看视频 | 久久五月婷婷丁香 | 天天操天天干天天玩 | 成片免费观看视频 | 丁香六月婷婷开心 | 亚洲理论片 | 日韩欧美在线观看一区二区三区 | 国产r级在线观看 | 九九免费观看全部免费视频 | 成人精品久久久 | 欧美激情视频一二区 | 深夜福利视频一区二区 | 国产人免费人成免费视频 | 国产精品成人av在线 | 成人久久亚洲 | 91精品国产一区二区在线观看 | 日b视频在线观看网址 | 成人一级在线观看 | 免费观看www小视频的软件 | 麻豆一精品传二传媒短视频 | japanese黑人亚洲人4k | 香蕉视频在线免费 | 一区 在线观看 | 99麻豆视频 | 黄色一级大片在线免费看国产一 | 中文字幕一区二区三区四区久久 | 日本aa在线 | 久久免费视频播放 | 国产一区二区不卡视频 | 国产精品高清在线 | 欧美淫视频| 黄色免费网 | 欧美精彩视频 | 99久久99久久综合 | 欧美日韩性生活 | 99精品视频在线播放免费 | 伊人久久国产 | 国产成人精品av | 狠狠色丁香婷婷综合久久片 | 国产1级毛片 | 亚洲精品久久久蜜臀下载官网 | 欧美视频日韩 | 国产精品久久久一区二区 | 西西人体4444www高清视频 | 国产精彩在线视频 | 91免费观看视频网站 | 日韩视频免费观看高清完整版在线 | 日韩高清不卡一区二区三区 | 麻豆免费在线播放 | 国产黄色精品在线 | 日韩欧美视频在线播放 | 久久久久久99精品 | 国产h在线播放 | 夜色.com | 日日干干 | 欧美性做爰猛烈叫床潮 | 国产精品久久久久久一二三四五 | 亚洲天天干 | 精品视频免费久久久看 | 欧美日韩免费看 | 婷婷视频 | 99草视频| 国产精品剧情在线亚洲 | 美女视频网站久久 | 亚洲视频 中文字幕 | 国产黄色免费观看 | 日韩在线观看不卡 | 狠狠久久综合 | 天天综合网~永久入口 | 九9热这里真品2 | 欧美日韩在线视频一区 | 黄色在线网站噜噜噜 | www.久久91 | 精品视频久久久 | 久久免费视频在线 | 91在线视频观看免费 | 91视频免费网站 | 欧美午夜精品久久久久久孕妇 | 天堂av在线网 | 综合黄色网 | 国产免费不卡av | 98精品国产自产在线观看 | 日日躁夜夜躁aaaaxxxx | 国产视频 亚洲视频 | 国产精品va在线播放 | 日日夜夜网站 | 成人黄色国产 | 精品一区三区 | 特级毛片在线 | h文在线观看免费 | 99热精品在线 | 日韩三区在线观看 | 三级黄色在线观看 | 五月婷婷在线综合 | 日韩在线视频网 | 激情深爱 | 亚洲永久国产精品 | 91亚洲免费 | 天天综合成人 | 在线观看国产一区二区 | 精品av在线播放 | 超碰人人91| 亚洲一级片免费观看 | 亚洲在线日韩 | 亚洲精品乱码久久久久久按摩 | 四虎亚洲精品 | av在线进入| 在线看片91 | 亚洲人在线7777777精品 | 亚洲最大av| 免费久草视频 | 国产成人精品一区二区三区网站观看 | 成人中文字幕av | 中文字幕av电影下载 | 国产亚洲日 | 国产一区二区午夜 | 国色天香在线观看 | 国产一区电影在线观看 | 免费黄a | 综合婷婷久久 | 99热这里| 国产福利一区二区三区在线观看 | 成人一区二区在线 | 国产精品一区二区久久精品爱微奶 | 777奇米四色 | 久久爱992xxoo | 日韩免费高清 | av国产在线观看 | 亚洲成a人片综合在线 | 亚洲视频aaa | 天天爽天天做 | 欧美性生活一级片 | 色999视频| 欧美午夜精品久久久久久孕妇 | 日韩字幕在线 | 又黄又刺激又爽的视频 | 波多野结衣在线视频一区 | 综合天堂av久久久久久久 | 最新黄色av网址 | 欧洲精品二区 | 91看片淫黄大片一级在线观看 | 中文字幕av专区 | 久久人人爽人人爽人人片 | 久草视频一区 | 天天操夜夜做 | 欧美精品免费在线 | 久久成人综合 | 韩国av一区 | 欧美精品在线观看免费 | sm免费xx网站| 国产精品18久久久久vr手机版特色 | 国产精品自产拍在线观看蜜 | 国产小视频在线免费观看 | 国产无区一区二区三麻豆 | 97超碰人人澡人人爱 | 久久综合国产伦精品免费 | 在线黄色观看 | 91九色在线视频观看 | 国产精品久久99综合免费观看尤物 | 丁香电影小说免费视频观看 | 亚洲在线网址 | 日韩黄色一区 | 免费a视频 | 国产欧美综合在线观看 | 国产专区精品视频 | 九九九九热精品免费视频点播观看 | 国产码电影 | 亚洲最大激情中文字幕 | 亚洲视频久久 | 日韩欧美综合在线视频 | 一区二区三区福利 | 97精品免费视频 | 天堂入口网站 | 人人爱人人射 | 免费精品视频在线 | 久久久久99精品成人片三人毛片 | 97av在线| 99久久电影 | 亚洲精品乱码久久久久久写真 | 国产精品久久久久久久久久妇女 | 久久久蜜桃一区二区 | 国产在线精品二区 | 开心色婷婷 | 亚洲91中文字幕无线码三区 | 国产精品免费在线视频 | 99视频一区二区 | 欧美日韩一区三区 | a视频免费在线观看 | www日韩视频 | bbbbb女女女女女bbbbb国产 | 国产精品久久久久一区 | 精品xxx | 4p变态网欧美系列 | 色婷婷激情电影 | 色资源网免费观看视频 | 中文字幕在线看视频 | 国产高清视频在线 | 婷婷国产在线观看 | 亚洲综合情 | 国产一区二区三区 在线 | 国产第一二区 | 亚洲国产精品va在线看黑人动漫 | 亚洲 欧美 变态 国产 另类 | 亚洲精品国产精品国自产观看 | 日本在线观看黄色 | 午夜影院在线观看18 | 日韩激情片在线观看 | 国产精品99久久久久久武松影视 | 欧美日韩视频免费看 | 久久久久久久久久久高潮一区二区 | 天天操综合 | 中文字幕在线观看网址 | 精品国产1区二区 | 国产精品综合久久久久久 | 国内精品久久久久影院一蜜桃 | 日韩激情视频在线 | 亚洲jizzjizz日本少妇 | 久久综合久久久 | 欧美成年人在线观看 | 国产精品久久久视频 | 在线免费黄色片 | 婷婷伊人综合亚洲综合网 | 国产不卡免费 | 韩国三级一区 | 日本三级国产 | 91中文字幕一区 | 麻豆 videos| 91精品国产综合久久久久久久 | 波多野结衣理论片 | 最新国产在线 | 久久久久久久久久久久久国产精品 | www久久国产 | 国产高清免费视频 | 成人国产精品 | 日韩精品欧美视频 | 亚洲一区视频免费观看 | 丁五月婷婷 | 91精品久久久久久久91蜜桃 | 九色琪琪久久综合网天天 | 成人黄色短片 | 夜夜爽夜夜操 | 丁香婷婷射| 97夜夜澡人人爽人人免费 | 十八岁免进欧美 | 亚洲午夜久久久综合37日本 | 毛片视频网址 | 97理论电影 | 国产91粉嫩白浆在线观看 | 精品国产一区二区三区免费 | 国产一区二区精品久久 | 亚洲国产成人在线播放 | 91视频在线免费观看 | 免费三级影片 | 久在线观看 | 韩日电影在线观看 | 国产精品成人国产乱一区 | 成人午夜精品久久久久久久3d | 99爱国产精品| 久久成人在线视频 | 成人黄色国产 | 天天操网 | 91香蕉视频好色先生 | 久久人人爽视频 | 国产高清日韩欧美 | 国产精品久久久久久久久费观看 | 人人射av | 久久蜜臀av | www.成人精品 | 久热色超碰 | 久操综合| 五月天堂色 | 99精品视频网站 | 五月在线视频 | 国产在线永久 | 久久老司机精品视频 | 久99久中文字幕在线 | 一区二区三区免费 | 丝袜护士aⅴ在线白丝护士 天天综合精品 | 国产成人1区| 黄色免费大全 | 久久久精品二区 | 欧美成人xxxx | 精品在线播放视频 | 国产中文在线字幕 | 黄色91免费观看 | 亚洲综合视频在线播放 | av电影一区 | 在线免费黄色 | 国产原创在线 | 欧美一区二区在线免费观看 | 午夜影院一级片 | 亚洲精品在线观看视频 | 毛片黄色一级 | 国产精品中文字幕在线 | 2022久久国产露脸精品国产 | 久久精品小视频 | 美女视频黄是免费的 | 久久久久久久久艹 | 国产乱码精品一区二区蜜臀 | a亚洲视频 | 国产成人精品国内自产拍免费看 | 国产一级片播放 | 国产精品免费在线视频 | 日韩系列 | 亚洲日本中文字幕在线观看 | 一区二区视频在线观看免费 | 国产美女视频黄a视频免费 久久综合九色欧美综合狠狠 | 欧洲av不卡| 中文字幕一区二区三区在线播放 | 97色婷婷成人综合在线观看 | 午夜精品视频福利 | av青草| 欧美极品久久 | 久久综合天天 | 正在播放国产一区二区 | 手机看片福利 | 在线免费观看黄色 | 色999精品 | 日韩中文字幕亚洲一区二区va在线 | 成人午夜片av在线看 | a久久久久| 国产精品久久电影观看 | 在线播放视频一区 | 成年人免费在线 | 国产精品久久久久久久久软件 | 中文字幕日韩免费视频 | 国产又粗又猛又色 | 日韩精品不卡在线 | 99麻豆久久久国产精品免费 | 天天干天天草天天爽 | 国产精品久久久久久婷婷天堂 | 亚洲天天综合网 | 999久久a精品合区久久久 | 国产a视频免费观看 | 婷婷中文在线 | 国产精品破处视频 | 911香蕉 | 久久成人精品电影 | 天天天天色射综合 | 免费观看国产精品 | 最近中文字幕国语免费av | 日韩在线观看一区二区三区 | 经典三级一区 | 久久xxxx| 精品久久久久久久久久岛国gif | 97精品一区二区三区 | 夜夜操天天 | 国产色小视频 | 天天操天天射天天舔 | 四虎永久免费在线观看 | 91精品国产高清 | 久草在线视频国产 | 日韩欧美大片免费观看 | 99热官网| 久久av一区二区三区亚洲 | 婷婷六月综合网 | 亚洲视频axxx| 波多野结衣一区二区 | 首页中文字幕 | a色视频 | 久久久精品国产一区二区电影四季 | 亚洲视频999 | 五月天堂网 | 久久久久中文 | 91中文在线 | 免费在线播放黄色 | 国产精品一区在线观看 | 国产精品入口传媒 | 亚洲国产丝袜在线观看 | 97视频在线免费 | 日日干日日 | 在线中文字幕网站 | 国产一区二区三区高清播放 | 久久综合激情 | 亚洲精品视频网站在线观看 | av.com在线 | 天天操夜夜逼 | 久草在线免| 日韩色综合| 亚洲精品在线视频播放 | 免费在线观看av | 91精品综合| 欧美 亚洲 另类 激情 另类 | 婷婷在线免费 | www.97色.com | av中文在线| 在线观看韩日电影免费 | 久久久影视 | 久久国产成人午夜av影院潦草 | 精品一区在线 | 成人免费视频网站 | 久久久久欠精品国产毛片国产毛生 | 久久成人国产精品一区二区 | av网站手机在线观看 | 日本在线观看一区二区 | 91在线观看高清 | 91激情视频在线播放 | 中文字幕在线免费看 | 天天爱天天操 | 最近中文字幕大全 | 婷婷伊人网 | 91资源在线视频 | 999国产精品视频 | 日本性高潮视频 | 天天干天天操人体 | 精品亚洲免费视频 | 亚州av网站大全 | 91视频在线播放视频 | 国产成在线观看免费视频 | 五月丁香 | 欧美日韩中文在线视频 | 97热久久免费频精品99 | 在线视频18在线视频4k | 黄色大片入口 | 国产一级二级在线观看 | 成片人卡1卡2卡3手机免费看 | 永久免费毛片 | 五月婷婷综合久久 | 日韩精品中文字幕在线观看 | 91精品在线观看视频 | 91视频免费观看 | 在线观看视频福利 | 99精品视频一区 | 在线观看v片 | 黄色片网站av | 国产精品美女久久久久久免费 | 日韩资源在线观看 | 国产91学生粉嫩喷水 | 精品一区欧美 | 免费看成人av | 在线高清av| 成人动漫精品一区二区 | 欧美日韩国产色综合一二三四 | 亚洲国产成人精品电影在线观看 | 久草免费在线 | 日本三级国产 | 手机看片国产日韩 | 国产一级视屏 | 国内精品久久久精品电影院 | 97偷拍在线视频 | 国产精品2018| 色婷婷av在线 | 天天在线视频色 | 激情影音 | 亚洲人成网站精品片在线观看 | 国产精品ⅴa有声小说 | 国产一区二区在线视频观看 | 久久精品欧美一区二区三区麻豆 | 天天综合网在线观看 | 欧美怡红院视频 | 日本电影久久 | 日韩在线播放视频 | av大片免费在线观看 | 国产高清视频网 | 国产在线一线 | 一区二区三区精品在线 | 久久成人国产精品免费软件 | 波多野结衣电影一区二区三区 | 久久字幕网 | 久久黄色片子 | 久草电影在线观看 | 中文字幕在线观看视频一区二区三区 | 2019国产精品 | 在线视频区 | 一区 二区电影免费在线观看 | 日韩高清不卡在线 | 亚州精品视频 | 久久久久国产一区二区三区四区 | 欧美大香线蕉线伊人久久 | 免费在线成人av | 婷婷色综| 国产精品毛片一区二区在线看 | 欧美 日韩精品 | 夜夜骑天天操 | 日韩视频中文字幕在线观看 | 国产一级大片在线观看 | 深夜免费福利视频 | 亚洲欧美成aⅴ人在线观看 四虎在线观看 | 亚洲国产中文字幕在线 | 奇米7777狠狠狠琪琪视频 | 国产二区视频在线 | 成人黄色在线视频 | 91视频 - v11av | 日韩在线观看高清 | 4438全国亚洲精品在线观看视频 | 国产精品毛片一区二区三区 | 精品五月天 | 国产精品成久久久久 | 日韩成人免费在线电影 | 国产黄色av影视 | 精品国产乱码久久久久久久 | 欧美在线观看视频一区二区三区 | 亚洲国产精品va在线看黑人动漫 | 成年人在线看视频 | 精品亚洲男同gayvideo网站 | 午夜影视剧场 | 久久免费高清视频 | 91丝袜美腿 | 婷婷久久综合九色综合 | 亚洲最大激情中文字幕 | 一本一本久久a久久 | 久久免费视频4 | 国产精品欧美久久久久天天影视 | 国产免费成人av | 麻豆影视在线免费观看 | 中文字幕中文字幕在线中文字幕三区 | 黄色成人在线观看 | 免费99精品国产自在在线 | 91成人免费在线视频 | 在线天堂视频 | 毛片网在线观看 | 中文字幕在线播放一区 | 中文字幕第一页av | 久久69av| 天堂激情网 | 国内精品亚洲 | 免费在线观看av电影 | 欧美在线观看视频一区二区三区 | 91久久精| 成人午夜精品 | 永久免费精品视频 | 黄色av播放| 国产成人99av超碰超爽 | 欧美永久视频 | 在线视频 成人 | 久艹视频在线免费观看 | 五月天欧美精品 | 日韩在线观看中文字幕 | 国产高清在线a视频大全 | 西西人体4444www高清视频 | 亚洲精品在线免费播放 | 我要色综合天天 | 免费观看www7722午夜电影 | 天天干天天操av | 日韩欧美亚州 | 在线看欧美 | 五月天亚洲激情 | 欧美亚洲xxx | 黄色大片免费网站 | 亚洲三级av | 97碰视频| 久久激情视频 久久 | 国产黄在线免费观看 | 国产高清在线看 | 精品久久一区二区三区 | 在线视频观看91 | 中文在线免费一区三区 | 精品久久一区二区 | 国产青春久久久国产毛片 | av在线小说 | 久久综合九色综合久久久精品综合 | 日日色综合| 成 人 黄 色 片 在线播放 | 又污又黄网站 | 日韩欧美精品在线 | 中文字幕亚洲综合久久五月天色无吗'' | 久久久免费观看视频 | 亚洲人片在线观看 | 久久久久国产精品午夜一区 | 久久精选 | 国内精品在线一区 | 国产精品综合久久久久 | 91丨九色丨首页 | 最近日本中文字幕 | 久久久久亚洲精品男人的天堂 | av在线精品 | 国产不卡在线视频 | 国产亚洲精品久久网站 | 国产在线观看a | 在线观看免费一级片 | 久久在线观看视频 | 欧美三级高清 | 亚洲欧美视频在线播放 | 亚洲精品激情 | 日韩免费观看一区二区 | 美女视频黄是免费的 | 亚洲人成精品久久久久 | 亚洲欧洲国产日韩精品 | 日韩欧美电影在线 | 丁香五婷| 午夜av在线免费 | 国产一区二区在线播放 | 欧美日韩激情视频8区 | 成人免费观看完整版电影 | 日韩69av| 天堂av网址| 一区二区精品视频 | 国产日韩在线一区 | 成人综合婷婷国产精品久久免费 | 中文视频在线 | 国产在线国偷精品产拍 | 在线看中文字幕 | 亚洲aⅴ一区二区三区 | 日本一区二区三区视频在线播放 | av日韩中文| 日韩一二三区不卡 | 久久久免费电影 | 亚洲va在线va天堂va偷拍 | 探花视频在线观看免费版 | 中文字幕在线视频一区二区 | www.97视频 | 精品免费| 久草在线观看资源 | 久久人人爽人人爽 | av色网站| 日本黄色免费大片 | 欧美性色综合网站 | 欧美一级艳片视频免费观看 | 久久久久久精 | 亚洲乱码在线观看 | 玖玖玖在线 | av一级二级 | 国产一二三精品 | 国产色婷婷在线 | 色综合天天天天做夜夜夜夜做 | 婷婷丁香色 | 午夜手机看片 | 操操操人人人 | 国产 字幕 制服 中文 在线 | 久草视频在线免费看 | 黄色一级大片在线观看 | 蜜臀av性久久久久蜜臀aⅴ四虎 | 国产午夜精品理论片在线 | 久草av在线播放 | 久久99操| 亚洲免费av片 | 五月天久久精品 | 色婷婷久久一区二区 | 日韩高清不卡一区二区三区 | 91精品久久久久久久久久入口 | 久久99久久99久久 | 在线成人免费av | 日韩av免费在线看 | 国产高清视频在线播放 | 91在线免费播放 | 9在线观看免费高清完整版 玖玖爱免费视频 | 中日韩免费视频 | 激情www| 欧美a在线看 | 91网免费看 | av大片网站 | 亚洲午夜av久久乱码 | 国产成人在线观看 | av中文字幕日韩 | 精久久久久 | 91视频免费播放 | 欧美亚洲精品在线观看 | 午夜视频免费播放 | 日本动漫做毛片一区二区 | 婷婷色综 | 日韩大片在线 | 国产精品久久久久一区二区 | 91麻豆精品国产91久久久久久久久 | 99热在| 色网免费观看 | 黄色av电影一级片 | 日本xxxx.com | 国产第一页福利影院 | 五月天电影免费在线观看一区 | 国产欧美久久久精品影院 | 免费高清在线观看成人 | 国产亚洲精品久久久久久大师 | 久久综合色天天久久综合图片 | 天天射天天搞 | 日韩久久久久 | 人人射网站 | 亚洲综合色播 | 玖玖精品视频 | 免费av的网站 | 96精品高清视频在线观看软件特色 | 婷婷综合五月天 | 国产精品久久久久av免费 | 欧美色图另类 | 超碰97中文| 婷婷综合久久 | 在线免费观看国产精品 | 久草 | 免费视频黄色 | 99色国产 | 久久国产免费 | 97超级碰碰碰碰久久久久 | 成人天堂网 | 国产毛片久久久 | 日韩在线免费高清视频 | 日韩av中文字幕在线 | 91影视成人 | 欧美国产日韩一区 | 亚洲免费在线播放视频 | 免费情缘 | 在线精品国产 | 婷婷综合网| av天天草| 日日插日日干 | 午夜在线免费视频 | 国产日韩欧美在线看 | 丁香六月天 | 怡红院久久| 亚洲精品乱码久久久久久9色 | www.激情五月.com | 国产美腿白丝袜足在线av | 91av在线视频免费观看 | 一区二区国产精品 | 免费看片黄色 | 精品伊人久久久 | 免费观看一区二区 | 国产不卡网站 | 日日爽日日操 | 韩国精品在线 | 天天操天天色综合 | 久久精品99国产精品日本 | se视频网址| 五月天亚洲激情 | 伊香蕉大综综综合久久啪 | 在线天堂中文在线资源网 | 精品国产99国产精品 | 日批在线观看 | 欧美巨大 | a级成人毛片 | 国内成人精品2018免费看 | 国产视频日韩视频欧美视频 | 久久国产精品久久w女人spa | 国产一线在线 | 欧美精品一二 | 久久视频在线 | 天天激情综合网 | 韩日视频在线 | 国产九九热 | 97精品在线| 国产美女永久免费 | 91精品国产91久久久久 | 中文字幕制服丝袜av久久 | 黄色国产在线观看 | 国产xxxx做受性欧美88 | 日本少妇高清做爰视频 | 欧美a√大片 | 国精产品永久999 | 亚洲成人av一区 | 欧美巨大荫蒂茸毛毛人妖 | 夜色成人av | 国产在线观看xxx | 久久成人午夜 | 欧美人操人| 国产又粗又猛又爽又黄的视频先 | 香蕉视频亚洲 | 日韩欧美电影在线观看 | 五月开心色 | 久久麻豆精品 | 日韩在线观看视频一区二区三区 | 超碰在线94 | 狠狠狠色丁香婷婷综合久久88 | 国精产品999国精产品岳 | 精品国产1区二区 | 国产婷婷 | 91网在线 | 精品视频免费播放 | 国产一级黄大片 | 国产精品成人自产拍在线观看 | 亚洲欧洲日韩在线观看 | 久青草电影 | 国产丝袜制服在线 | 日本深夜福利视频 | 欧美一区二区三区在线观看 | 国产精品一区二区三区四区在线观看 | 国产一级精品绿帽视频 | 成人网页在线免费观看 | 丁香婷婷色综合亚洲电影 | 国产亚洲人成网站在线观看 | 97视频免费在线观看 | 天天色天天操综合网 | 午夜精品一区二区三区在线播放 | 在线中文字幕视频 | 国产精品一区二区三区四区在线观看 | 91九色丨porny丨丰满6 | 最近2019好看的中文字幕免费 | 99久热在线精品视频观看 | 91精品1区 | 亚洲精品国久久99热 | 精品欧美一区二区三区久久久 | 久久精品欧美 | 国产一卡久久电影永久 | 国语对白少妇爽91 | 久久免费国产视频 | 婷婷久久五月天 | 国产在线播放一区二区 | 亚洲国产中文字幕 | 欧美日韩另类视频 | 亚色视频在线观看 | 国产九九热 | 亚洲欧美成人综合 | 久久99久国产精品黄毛片入口 | 日韩精品一区在线播放 | 超级碰碰碰视频 | 免费视频xnxx com | 国产精品久久麻豆 | 亚洲精品中文字幕视频 | 国产精品久久99综合免费观看尤物 | 天天色天天射天天综合网 | 91视频91蝌蚪| 国产精品美女久久久久久2018 | 日韩色在线 | 亚洲婷婷伊人 | 国产一二三四在线观看视频 | 亚洲精品久久久久久久不卡四虎 | 黄色av高清 | www.888.av| 久久久久久久国产精品视频 | 色精品视频 | 国产精品99蜜臀久久不卡二区 | 日韩黄色大片在线观看 | 国产精品18久久久久久久网站 | 久久99久久精品国产 | 91精品视频一区 | 色偷偷男人的天堂av | 91成人在线观看喷潮 | 亚洲码国产日韩欧美高潮在线播放 | 天堂av高清 | 成人黄色电影在线观看 | 91久久精| 天天草天天插 | 亚洲综合成人婷婷小说 | 日本午夜在线亚洲.国产 | 久久精品久久99精品久久 | 成年人免费电影在线观看 | 成人一级片在线观看 | 在线精品视频免费播放 | 久久国产精品一二三区 | 日韩精品久久久久久中文字幕8 | 中文字幕在线免费看 | 四虎天堂 | 福利视频午夜 | 97精品国产aⅴ | 国产91精品久久久久久 | 成人免费 在线播放 | 亚洲首页 | 亚洲精品国产精品乱码不99热 | 亚洲 中文 欧美 日韩vr 在线 | 国产在线97 | 亚洲成a人片在线观看中文 中文字幕在线视频第一页 狠狠色丁香婷婷综合 | 射射射av| 2019免费中文字幕 | 日日操操| 中文字幕影片免费在线观看 | 在线免费观看国产精品 |