
MapReduce Data Join


Data in different files is sometimes related and needs to be joined to produce a new file for analysis. For example, suppose there are two input files, a.txt and b.txt, with the following contents:

a.txt:
1 a
2 b
3 c
4 d

b.txt:
1 good
2 bad
3 ok
4 hello
They need to be joined into a new file that looks like this:

a good
b bad
c ok
d hello
The processing can be split into two steps:

1. Map stage: scatter the records of both input files into (key, value) pairs keyed on the shared id, as follows:

1 a
1 good
2 b
2 bad
3 c
3 ok
4 d
4 hello
2. Reduce stage: perform the join. The data here is simple, so it is enough to check whether the length of a map output value is 1 to decide whether that value becomes the new key or the new value.
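To make the pairing rule concrete, here is a minimal sketch of the reduce-side decision in plain Java (no Hadoop types; the array stands in for the two values delivered for one key, whose order is not guaranteed). It uses the same length-1 heuristic as the full job below:

// For key "1" the reducer receives the values {"a", "good"} in some order.
String[] grouped = {"good", "a"};
String newKey, newValue;
if (grouped[0].length() == 1) {      // single character => came from a.txt, becomes the key
    newKey = grouped[0];
    newValue = grouped[1];
} else {                             // otherwise it came from b.txt, becomes the value
    newKey = grouped[1];
    newValue = grouped[0];
}
System.out.println(newKey + "\t" + newValue);   // prints: a	good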

package cn.zhf.hadoop;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SingleJoin extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Tool tool = new SingleJoin();
        ToolRunner.run(tool, args);
        print(tool);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf);
        job.setJarByClass(getClass());

        // Remove any previous output so the job does not fail on an existing directory.
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path("out"), true);

        FileInputFormat.addInputPath(job, new Path("a.txt"));
        FileInputFormat.addInputPath(job, new Path("b.txt"));
        FileOutputFormat.setOutputPath(job, new Path("out"));

        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);
        return 0;
    }

    // Map stage: emit (join key, field) for every line of both input files.
    public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] str = value.toString().split(" ");
            context.write(new Text(str[0]), new Text(str[1]));
        }
    }

    // Reduce stage: the two values sharing a key are joined; the length-1 value
    // (the letter from a.txt) becomes the new key, the other one becomes the value.
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Iterator<Text> iterator = values.iterator();
            Text keyy = new Text();
            Text valuee = new Text();
            while (iterator.hasNext()) {
                Text temp = iterator.next();
                if (temp.toString().length() == 1) {
                    keyy.set(temp);
                    valuee.set(iterator.next());
                } else {
                    valuee.set(temp);
                    keyy.set(iterator.next());
                }
            }
            context.write(keyy, valuee);
        }
    }

    // Print the result file to stdout after the job finishes.
    public static void print(Tool tool) throws IOException {
        FileSystem fs = FileSystem.get(tool.getConf());
        Path path = new Path("out/part-r-00000");
        FSDataInputStream fsin = fs.open(path);
        int length = 0;
        byte[] buff = new byte[128];
        while ((length = fsin.read(buff, 0, 128)) != -1)
            System.out.println(new String(buff, 0, length));
    }
}
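The job reads a.txt and b.txt from the working directory of the default filesystem and writes its result to out/. One way to try it, assuming a running Hadoop installation and the class packaged into a jar (the jar name below is only illustrative):

hadoop fs -put a.txt a.txt
hadoop fs -put b.txt b.txt
hadoop jar singlejoin.jar cn.zhf.hadoop.SingleJoin
hadoop fs -cat out/part-r-00000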
Reference: 《MapReduce2.0源代碼分析及編程實(shí)踐》
