mapreduce的shuffle机制(来自学习笔记)
3. MAPREDUCE原理篇(2)
3.1 mapreduce的shuffle機制
3.1.1 概述:
MapReduce中,mapper階段處理的數據如何傳遞給reduce階段,是MapReduce框架中最關鍵的一個流程,這個流程就叫shuffle;
Shuffle:數據混洗---------(核心機制:數據分區,排序,局部聚合,緩存,拉取,再合并排序)
具體來說,就是將MapTask輸出的處理數據結果,按照Partitioner組件制定的規則分發ReduceTask,并在分發的過程中,對數據按key進行分區和排序.
?
3.1.2 主要流程:
Shuffle緩存流程:
Shuffle是MapReduce處理流程中的一個核心,它的每一個處理步驟是分散在各個Maptask和reducetask節點上完成的,整體來看,分為3個操作:
1、分區partition(如果reduceTask只有一個或者沒有,那么partition將不起作用。設置沒設置相當于沒有)
2、Sort根據key排序(MapReduce編程中sort是一定會做的,并且只能按照key排序,當然如果沒有reduce階段,那么就不會對key排序)
3、Combiner進行局部value的合并(Combiner是可選的組件,作用是為了提高任務的執行效率)
?
3.1.3 詳細流程
1、mapTask收集我們map()方法輸出的kv對,放在內存緩沖區kvbuffer(環形緩沖區:內存中的一種首尾相連的數據結構,kvbuffer包含數據區和索引區)中,在存數據的時候,會調用partitioner進行分區編號的計算,并存入元數據中
2、當內存緩沖區的數據達到100*0.8時,就會開始溢寫到本地磁盤文件file.out,可能會溢出多次,則會有多個文件,相應的緩沖區中的索引區數據溢出為磁盤索引文件file.out.index
3、在溢寫前,會先根據分區編號排序,相同的分區的數據,排在一起,再根據map的key排序(快排)
4、多個溢寫文件會被合并成大的溢出文件(歸并排序)
5、在數據量大的時.候,可以對maptask結果啟用壓縮,將mapreduce.map.ouput.compress設為true,并使用
mapreduce.map.output.compress.codec設置使用的壓縮算法,可以提高數據傳輸到reduce端的效率
6、reduceTask根據自己的分區號,去各個mapTask機器上取相應的結果分區數據
7、reduceTask會取到同一個分區的來自不同mapTask的結果文件,reduceTask會將這些文件再進行合并(歸并排序)
8、合并成r大文件后,shuffle的過程也就結束了,后面進入reduceTask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)
?
Shuffle中的緩沖區大小會影響到mapreduce程序的執行效率,原則上說,緩沖區越大,磁盤io的次數越少,執行速度就越快
緩沖區的大小可以通過參數調整,? 參數:io.sort.mb? 默認100M
?
3.1.4 詳細流程示意圖
4、自定義Shuffle過程中的組件
1、自定義輸入
? ? 默認輸入類:TextInputFormat
自定義:
模仿? ?org.apache.hadoop.mapreduce.lib.input.LineRecordReader? 和org.apache.hadoop.mapreduce.lib.input.TextInputFormat
1、自定義類繼承FileInputFormat public class MyFileInputFormat extends FileInputFormat<Text, LongWritable>{@Override public RecordReader<Text, LongWritable> createRecordReader(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {//實例化一個MyAllFileRecodReader reader = new MyAllFileRecodReader();//split參數和context都是框架自動傳入的,把這兩個參數傳給reader進行處理,以便獲取相關信息reader.initialize(split, context);return reader;}/*** 給定的文件名可拆分嗎?返回false確保單個輸入文件不會被分割。以便Mapper處理整個文件。*/@Overrideprotected boolean isSplitable(JobContext context, Path filename) {return false;} }2、自定義類實現RecordReader public class MyFileRecodReader extends RecordReader<Text, LongWritable>{//用于存儲文件系統輸入流private FSDataInputStream open = null;//保存文件長度private int fileSplitLength = 0;/*** 當前的MyAllFileRecodReader讀取到的一個key-value*/private Text key = new Text();private LongWritable value = new LongWritable();@Overridepublic void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {//通過InputSplit對象獲取文件路徑FileSplit fileSplit = (FileSplit)split;Path path = fileSplit.getPath();//獲取文件長度fileSplitLength = (int)fileSplit.getLength();//通過context對象獲取到配置文件信息,通過配置文件獲取到一個當前文件系統Configuration configuration = context.getConfiguration();FileSystem fs = FileSystem.get(configuration);//獲取文件系統的一個輸入流open = fs.open(path);}/*** 已讀標記* 如果為false,表示還沒有進行讀取* 在需求中一個mapTask只處理一個小文件,一個mapTask最終只需要讀取一次就完畢* 如果一個文件讀取完畢了,那么就把isRead這個變量標記為true*/private boolean isRead = false;/*** 實現讀取規則:逐文件讀取*/@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {//如果沒有讀取過文件就進入if(!isRead){//準備一個字節數組長度為文件的長度byte[] buffer = new byte[fileSplitLength];//一次性把真個文件讀入字節數組中IOUtils.readFully(open, buffer);//把讀取到的文件傳給keykey.set(buffer, 0, fileSplitLength);//設置已讀標記為trueisRead = true;//返回讀取一個文件成功標記return true;}else{return false;}}//獲取key的方法@Overridepublic Text getCurrentKey() throws IOException, InterruptedException {return key;}//獲取當前value值@Overridepublic LongWritable getCurrentValue() throws IOException, InterruptedException {return value;}/*** 獲取數據的處理進度的*/@Overridepublic float getProgress() throws IOException, InterruptedException {//已讀為真返回1.0,沒有讀返回0return isRead ? 1.0F : 0F;}@Overridepublic void close() throws IOException {//關閉輸入流IOUtils.closeQuietly(open);}2、自定義分區
需要:? 1、繼承 partitioner
? ? ? ? ? ? ?2、重寫getpartition()方法
? ? ? ? ? ? ?3、在main方法中指定分區類? job.setPartitionclass()
package homework; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class Mypartition extends Partitioner<Student, Text> {@Overridepublic int getPartition(Student key, Text arg1, int arg2) {if(key.getType().equals("math")){return 0;}if(key.getType().equals("english")){return 1;}if(key.getType().equals("computer")){return 2;}else{return 3;}} }3、自定義排序
? 需要? :? ? 1、實現writableComparable
? ? ? ? ? ? ? ? ? ?2、重新write()、readFields()、compareTo()方法
package homework; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class Student implements WritableComparable<Student> {private String type;private String name;private Double avg;public Student() {super();}public Student(String type, String name, Double avg) {super();this.type = type;this.name = name;this.avg = avg;}public String getType() {return type;}public void setType(String type) {this.type = type;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Double getAvg() {return avg;}public void setAvg(Double avg) {this.avg = avg;}@Overridepublic String toString() {return type + "\t" + name + "\t" + avg ;}@Overridepublic void readFields(DataInput in) throws IOException {this.type=in.readUTF();this.name=in.readUTF();this.avg=in.readDouble(); }@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(type);out.writeUTF(name);out.writeDouble(avg); }@Overridepublic int compareTo(Student o) {int temp=o.getType().compareTo(this.getType());if(temp==0){if(o.getAvg()>this.getAvg()){return 1;}else if(o.getAvg()<this.getAvg()){return -1;}else{return 0;}}return temp;} }4、自定義分組
需要??:?????1、繼承writableComparable
??????????????????2、重寫compare()方法
??????????????????3、指定分組類??job.setGroupingComparatorClass(MyGroup.class);
??????????????????4、既有分區又有排序的時候,分組字段一定在排序字段中
package homework;import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator;public class MyGroup extends WritableComparator {public MyGroup() {super(Student.class,true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {Student aa=(Student)a;Student bb=(Student)b;return aa.getType().compareTo(bb.getType());} }5、自定義輸出
1)模仿 org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
public class MyMultipePathOutputFormat extends FileOutputFormat<Text, NullWritable>{@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {//獲得當前的文件系統傳給自定義的RecordWriter組件Configuration configuration = job.getConfiguration();FileSystem fs = FileSystem.get(configuration);try {//返回一個RecordWriter正在處理輸出數據的組件return new MyMutiplePathRecordWriter(fs);} catch (Exception e) {e.printStackTrace();}return null;} }2)繼承RecordWriter 并實現write()方法
public class MyMutiplePathRecordWriter extends RecordWriter<Text, NullWritable>{//聲明要輸出的兩個路徑private DataOutputStream out_jige;private DataOutputStream out_bujige;public MyMutiplePathRecordWriter(FileSystem fs) throws Exception {//創建系統輸出流out_jige = fs.create(new Path("E:\\bigdata\\cs\\jige\\my_output_jige.txt"));out_bujige = fs.create(new Path("E:\\bigdata\\cs\\bujige\\my_output_bujige.txt"));}/*** 實現寫出方法,根據需要寫出的格式自定義*/@Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException {//接受到的key格式為:course + "\t" + name + "\t" + avgScoreString keyStr = key.toString();String[] split = keyStr.split("\t");//獲取到平均分字段double score = Double.parseDouble(split[2]);//沒一行數據加入個換行符byte[] bytes = (keyStr + "\n").getBytes();//如果平均分大于60就用DataOutputStream寫出到jige目錄if(score >= 60){out_jige.write(bytes, 0, bytes.length);}else{//小于60分的寫道bujige目錄out_bujige.write(bytes, 0, bytes.length);}}/*** 在close方法中關閉輸出流。*/@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {IOUtils.closeQuietly(out_jige);IOUtils.closeQuietly(out_bujige);} }?
總結
以上是生活随笔為你收集整理的mapreduce的shuffle机制(来自学习笔记)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 系统怎么看不到u盘启动项 U盘启动项未识
- 下一篇: MapTask并行度决定机制、FileI