日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

[转]自定义hadoop map/reduce输入文件切割InputFormat

發布時間:2023/11/27 生活经验 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 [转]自定义hadoop map/reduce输入文件切割InputFormat 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文轉載自:http://hi.baidu.com/lzpsky/blog/item/99d58738b08a68e7b311c70d.html  

  

  hadoop會對原始輸入文件進行文件切割,然后把每個split傳入mapper程序中進行處理,FileInputFormat是所有以文件作 為數據源的InputFormat實現的基類,FileInputFormat保存作為job輸入的所有文件,并實現了對輸入文件計算splits的方 法。至于獲得記錄的方法是有不同的子類進行實現的。

??????? 那么,FileInputFormat是怎樣將他們劃分成splits的呢?FileInputFormat只劃分比HDFS block大的文件,所以如果一個文件的大小比block小,將不會被劃分,這也是Hadoop處理大文件的效率要比處理很多小文件的效率高的原因。?

???????hadoop默認的InputFormat是TextInputFormat,重寫了FileInputFormat中的createRecordReader和isSplitable方法。該類使用的reader是LineRecordReader,即以回車鍵(CR = 13)或換行符(LF = 10)為行分隔符。

????? 但大多數情況下,回車鍵或換行符作為輸入文件的行分隔符并不能滿足我們的需求,通常用戶很有可能會輸入回車鍵、換行符,所以通常我們會定義不可見字符(即用戶無法輸入的字符)為行分隔符,這種情況下,就需要新寫一個InputFormat。

????? 又或者,一條記錄的分隔符不是字符,而是字符串,這種情況相對麻煩;還有一種情況,輸入文件的主鍵key已經是排好序的了,需要hadoop做的只是把相 同的key作為一個數據塊進行邏輯處理,這種情況更麻煩,相當于免去了mapper的過程,直接進去reduce,那么InputFormat的邏輯就相 對較為復雜了,但并不是不能實現。

1、改變一條記錄的分隔符,不用默認的回車或換行符作為記錄分隔符,甚至可以采用字符串作為記錄分隔符。
???? 1)自定義一個InputFormat,繼承FileInputFormat,重寫createRecordReader方法,如果不需要分片或者需要改變分片的方式,則重寫isSplitable方法,具體代碼如下:

public class FileInputFormatB extends FileInputFormat<LongWritable, Text> {

?? @Override

?? public RecordReader<LongWritable, Text> createRecordReader(?InputSplit split, TaskAttemptContext context) {
??????? return?new SearchRecordReader("\b");

??? }

??? @Override
??? protected boolean isSplitable(FileSystem fs, Path filename) {
???????? // 輸入文件不分片
??????? return false;
???? }
}

?? 2)關鍵在于定義一個新的SearchRecordReader繼承RecordReader,支持自定義的行分隔符,即一條記錄的分隔符。標紅的地方為與hadoop默認的LineRecordReader不同的地方。

public class IsearchRecordReader extends RecordReader<LongWritable, Text> {
?private static final Log LOG = LogFactory.getLog(IsearchRecordReader.class);
?
?private CompressionCodecFactory compressionCodecs = null;
?private long start;
?private long pos;
?private long end;
?private LineReader in;
?private int maxLineLength;
?private LongWritable key = null;
?private Text value = null;
?//行分隔符,即一條記錄的分隔符
?private byte[] separator = {'\b'};
?private int sepLength = 1;

??public IsearchRecordReader(){
?}
?public IsearchRecordReader(String seps){
??this.separator = seps.getBytes();?
??sepLength = separator.length;
?}

?public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
??FileSplit split = (FileSplit) genericSplit;
??Configuration job = context.getConfiguration();
??this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);

??this.start = split.getStart();
??this.end = (this.start + split.getLength());
??Path file = split.getPath();
??this.compressionCodecs = new CompressionCodecFactory(job);
??CompressionCodec codec = this.compressionCodecs.getCodec(file);

??// open the file and seek to the start of the split
??FileSystem fs = file.getFileSystem(job);
??FSDataInputStream fileIn = fs.open(split.getPath());
??boolean skipFirstLine = false;
??if (codec != null) {
???this.in = new LineReader(codec.createInputStream(fileIn), job);
???this.end = Long.MAX_VALUE;
??} else {
???if (this.start != 0L) {
????skipFirstLine = true;
????this.start -= sepLength;
????fileIn.seek(this.start);
???}
???this.in = new LineReader(fileIn, job);
??}
??if (skipFirstLine) { // skip first line and re-establish "start".
???int newSize = in.readLine(new Text(), 0, (int) Math.min(?(long) Integer.MAX_VALUE, end - start));
???
???if(newSize > 0){
????start += newSize;
???}
??}

??this.pos = this.start;
?}

?public boolean nextKeyValue() throws IOException {
??if (this.key == null) {
???this.key = new LongWritable();
??}
??this.key.set(this.pos);
??if (this.value == null) {
???this.value = new Text();
??}
??int newSize = 0;
??while (this.pos < this.end) {
???newSize = this.in.readLine(this.value, this.maxLineLength, Math.max(
?(int) Math.min(Integer.MAX_VALUE, this.end - this.pos), this.maxLineLength));

???if (newSize == 0) {
????break;
???}
???this.pos += newSize;
???if (newSize < this.maxLineLength) {
????break;
???}

???LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - newSize));
??}

??if (newSize == 0) {
???//讀下一個buffer
???this.key = null;
???this.value = null;
???return false;
??}
??//讀同一個buffer的下一個記錄
??return true;
?}

?public LongWritable getCurrentKey() {
??return this.key;
?}

?public Text getCurrentValue() {
??return this.value;
?}

?public float getProgress() {
??if (this.start == this.end) {
???return 0.0F;
??}
??return Math.min(1.0F, (float) (this.pos - this.start) / (float) (this.end - this.start));
?}

?public synchronized void close() throws IOException {
??if (this.in != null)
???this.in.close();
?}

}

?? 3)重寫SearchRecordReader需要的LineReader,可作為SearchRecordReader內部類。特別需要注意的地方就 是,讀取文件的方式是按指定大小的buffer來讀,必定就會遇到一條完整的記錄被切成兩半,甚至如果分隔符大于1個字符時分隔符也會被切成兩半的情況, 這種情況一定要加以拼接處理。

public class LineReader {
??//回車鍵(hadoop默認)
??//private static final byte CR = 13;
??//換行符(hadoop默認)
??//private static final byte LF = 10;
????
??//按buffer進行文件讀取
??private static final int DEFAULT_BUFFER_SIZE = 32 * 1024 * 1024;
??private int bufferSize = DEFAULT_BUFFER_SIZE;
??private InputStream in;
??private byte[] buffer;
??private int bufferLength = 0;
??private int bufferPosn = 0;
??
??LineReader(InputStream in, int bufferSize) {
???this.bufferLength = 0;
??? this.bufferPosn = 0;
?????
???this.in = in;
???this.bufferSize = bufferSize;
???this.buffer = new byte[this.bufferSize];
??}

??public LineReader(InputStream in, Configuration conf) throws IOException {
???this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
??}

??public void close() throws IOException {
???in.close();
??}

?public int readLine(Text str, int maxLineLength) throws IOException {
???return readLine(str, maxLineLength, Integer.MAX_VALUE);
??}

??public int readLine(Text str) throws IOException {
???return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
??}

? //以下是需要改寫的部分_start,核心代碼

??public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{
???str.clear();
???Text record = new Text();
???int txtLength = 0;
???long bytesConsumed = 0L;
???boolean newline = false;
???int sepPosn = 0;
???
???do {
????//已經讀到buffer的末尾了,讀下一個buffer
????if (this.bufferPosn >= this.bufferLength) {
?????bufferPosn = 0;
?????bufferLength = in.read(buffer);
?????
?????//讀到文件末尾了,則跳出,進行下一個文件的讀取
?????if (bufferLength <= 0) {
??????break;
?????}
????}
????
????int startPosn = this.bufferPosn;
????for (; bufferPosn < bufferLength; bufferPosn ++) {
?????//處理上一個buffer的尾巴被切成了兩半的分隔符(如果分隔符中重復字符過多在這里會有問題)
?????if(sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]){
??????sepPosn = 0;
?????}
?????
?????//遇到行分隔符的第一個字符
?????if (buffer[bufferPosn] == separator[sepPosn]) {
??????bufferPosn ++;
??????int i = 0;
??????
??????//判斷接下來的字符是否也是行分隔符中的字符
??????for(++ sepPosn; sepPosn < sepLength; i ++, sepPosn ++){
???????
???????//buffer的最后剛好是分隔符,且分隔符被不幸地切成了兩半
???????if(bufferPosn + i >= bufferLength){
????????bufferPosn += i - 1;
????????break;
???????}
???????
???????//一旦其中有一個字符不相同,就判定為不是分隔符
???????if(this.buffer[this.bufferPosn + i] != separator[sepPosn]){
????????sepPosn = 0;
????????break;
???????}
??????}
??????
??????//的確遇到了行分隔符
??????if(sepPosn == sepLength){
???????bufferPosn += i;
???????newline = true;
???????sepPosn = 0;
???????break;
??????}
?????}
????}

????
????int readLength = this.bufferPosn - startPosn;

????bytesConsumed += readLength;
????//行分隔符不放入塊中
????//int appendLength = readLength - newlineLength;
????if (readLength > maxLineLength - txtLength) {
?????readLength = maxLineLength - txtLength;
????}
????if (readLength > 0) {
?????record.append(this.buffer, startPosn, readLength);
?????txtLength += readLength;
?????
?????//去掉記錄的分隔符
?????if(newline){
??????str.set(record.getBytes(), 0, record.getLength() - sepLength);
?????}
????}

???} while (!newline && (bytesConsumed < maxBytesToConsume));

???if (bytesConsumed > (long)Integer.MAX_VALUE) {
????throw new IOException("Too many bytes before newline: " + bytesConsumed);
???}
???
???return (int) bytesConsumed;
??}

? //以下是需要改寫的部分_end

//以下是hadoop-core中LineReader的源碼_start

public int readLine(Text str, int maxLineLength, int maxBytesToConsume)?throws IOException{
??? str.clear();
??? int txtLength = 0;
??? int newlineLength = 0;
??? boolean prevCharCR = false;
??? long bytesConsumed = 0L;
????do {
????? int startPosn = this.bufferPosn;
????? if (this.bufferPosn >= this.bufferLength) {
??????? startPosn = this.bufferPosn = 0;
??????? if (prevCharCR)? bytesConsumed ++;
??????? this.bufferLength = this.in.read(this.buffer);
??????? if (this.bufferLength <= 0)? break;
????? }
????? for (; this.bufferPosn < this.bufferLength; this.bufferPosn ++) {
??????? if (this.buffer[this.bufferPosn] == LF) {
????????? newlineLength = (prevCharCR) ? 2 : 1;
????????? this.bufferPosn ++;
????????? break;
??????? }
??????? if (prevCharCR) {
????????? newlineLength = 1;
????????? break;
??????? }
??????? prevCharCR = this.buffer[this.bufferPosn] == CR;
????? }
????? int readLength = this.bufferPosn - startPosn;
????? if ((prevCharCR) && (newlineLength == 0))
??????? --readLength;
????? bytesConsumed += readLength;
????? int appendLength = readLength - newlineLength;
????? if (appendLength > maxLineLength - txtLength) {
??????? appendLength = maxLineLength - txtLength;
????? }
????? if (appendLength > 0) {
??????? str.append(this.buffer, startPosn, appendLength);
??????? txtLength += appendLength; }
??? }
??? while ((newlineLength == 0) && (bytesConsumed < maxBytesToConsume));

??? if (bytesConsumed > (long)Integer.MAX_VALUE) throw new IOException("Too many bytes before newline: " + bytesConsumed);
??? return (int)bytesConsumed;
? }

//以下是hadoop-core中LineReader的源碼_end

}

2、已經按主鍵key排好序了,并保證相同主鍵key一定是在一起的,假設每條記錄的第一個字段為主鍵,那么如 果沿用上面的LineReader,需要在核心方法readLine中對前后兩條記錄的id進行equals判斷,如果不同才進行split,如果相同繼 續下一條記錄的判斷。代碼就不再貼了,但需要注意的地方,依舊是前后兩個buffer進行交接的時候,非常有可能一條記錄被切成了兩半,一半在前一個buffer中,一半在后一個buffer中。

???? 這種方式的好處在于少去了reduce操作,會大大地提高效率,其實mapper的過程相當的快,費時的通常是reduce。

轉載于:https://www.cnblogs.com/Dreama/archive/2011/09/19/2181523.html

總結

以上是生活随笔為你收集整理的[转]自定义hadoop map/reduce输入文件切割InputFormat的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。