Hadoop中的压缩Codec
生活随笔
收集整理的這篇文章主要介紹了
Hadoop中的压缩Codec
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
作為輸入
????????????當壓縮文件作為MapReduce的輸入時,MapReduce將自動通過擴展名找到相應的Codec對其解壓。
作為輸出
????????????當MapReduce的輸出文件需要壓縮時,可以更改mapred.output.compress為true,mapred.output.compression.codec為想要使用的codec的類名稱,當然你可以可以在代碼中指定,通過調用FileOutputFormt的靜態方法去設置這兩個屬性:
?| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | package?com.hadoop.codecs; import?org.apache.hadoop.fs.Path; import?org.apache.hadoop.io.IntWritable; import?org.apache.hadoop.io.Text; import?org.apache.hadoop.io.compress.GzipCodec; import?org.apache.hadoop.mapreduce.Job; import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import?java.io.IOException; public?class?CodecDemo?{ ????public?static?void?main(String[]?args)?throws?Exception?{ ????????if?(args.length!=2){ ????????????System.exit(-1); ????????} ????????Job?job=new?Job(); ????????job.setJarByClass(CodecDemo.class); ????????job.setJobName("CodecDemo"); ????????FileInputFormat.addInputPath(job,?new?Path(args[0])); ????????FileOutputFormat.setOutputPath(job,?new?Path(args[1])); ????????job.setMapperClass(MyMapper.class); ????????job.setCombinerClass(MyReducer.class); ????????job.setReducerClass(MyReducer.class); ????????job.setOutputKeyClass(Text.class); ????????job.setOutputValueClass(LongWritable.class); ????//設置輸出壓縮開啟 ????????FileOutputFormat.setCompressOutput(job,?true); ????//設置壓縮類:GzipCodec ????????FileOutputFormat.setOutputCompressorClass(job,?GzipCodec.class); ????????System.exit(job.waitForCompletion(true)?0:1); ????} } |
使用CompressionCodes解壓縮
?| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | /* 使用CompressionCodes解壓縮CompressionCodec有兩個方法可以方便的壓縮和解壓。 壓縮:通過createOutputStream(OutputStream?out)方法獲得CompressionOutputStream對象 解壓:通過createInputStream(InputStream?in)方法獲得CompressionInputStream對象 從命令行接受一個CompressionCodec實現類的參數,然后通過ReflectionUtils把實例化這個類,調用CompressionCodec的接口方法對標準輸出流進行封裝,封裝成一個壓縮流,通過IOUtils類的copyBytes方法把標準輸入流拷貝到壓縮流中,最后調用CompressionCodec的finish方法,完成壓縮。 */ package?com.hadoop.codecs; import?org.apache.hadoop.conf.Configuration; import?org.apache.hadoop.io.IOUtils; import?org.apache.hadoop.io.compress.CompressionCodec; import?org.apache.hadoop.io.compress.CompressionOutputStream; import?org.apache.hadoop.util.ReflectionUtils; public?class?Compressors?{ ????public?static?void?main(String[]?args)?throws?Exception?{ ????????String?codecClassName?=?args[0]; ????????Class<?>?codecClass?=?Class.forName(codecClassName); ????????Configuration?conf?=?new?Configuration(); ????????CompressionCodec?codec?=?(CompressionCodec)?ReflectionUtils.newInstance(codecClass,?conf); ????????CompressionOutputStream?out?=?codec.createOutputStream(System.out); ????????IOUtils.copyBytes(System.in,?out,?4096,?false); ????????out.finish(); ????} } |
使用CompressionCodecFactory解壓縮
?| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 | /* ????如果你想讀取一個被壓縮的文件的話,首先你得先通過擴展名判斷該用哪種codec,當然有更簡便得辦法,CompressionCodecFactory已經幫你把這件事做了,通過傳入一個Path調用它得getCodec方法,即可獲得相應得codec。 ????注意看下removeSuffix方法,這是一個靜態方法,它可以將文件的后綴去掉,然后我們將這個路徑做為解壓的輸出路徑。CompressionCodecFactory能找到的codec也是有限的,默認只有三種org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DefaultCodec,如果想添加其他的codec你需要更改io.compression.codecs屬性,并注冊codec。 */ package?com.hadoop.codecs;?? import?org.apache.hadoop.conf.Configuration;?? import?org.apache.hadoop.fs.FileSystem;?? import?org.apache.hadoop.fs.Path;?? import?org.apache.hadoop.io.IOUtils;?? import?org.apache.hadoop.io.compress.CompressionCodec;?? import?org.apache.hadoop.io.compress.CompressionCodecFactory;?? ??? import?java.io.IOException;?? import?java.io.InputStream;?? import?java.io.OutputStream;?? import?java.net.URI;?? ??? public?class?FileDecompressor?{?? ????public?static?void?main(String[]?args)?throws?Exception?{?? ????????String?uri?=?args[0];?? ????????Configuration?conf?=?new?Configuration();?? ????????FileSystem?fs?=?FileSystem.get(URI.create(uri),?conf);?? ??? ????????Path?inputPath?=?new?Path(uri);?? ????????CompressionCodecFactory?factory?=?new?CompressionCodecFactory(conf);?? ????????CompressionCodec?codec?=?factory.getCodec(inputPath);?? ????????if?(codec?==?null)?{?? ????????????System.out.println("No?codec?found:"?+?uri);?? ????????????System.exit(1);?? ????????}?? ????????String?outputUri?=?CompressionCodecFactory.removeSuffix(uri,?codec.getDefaultExtension());?? ??? ????????InputStream?in?=?null;?? ????????OutputStream?out?=?null;?? ??? ????????try?{?? ????????????in?=?codec.createInputStream(fs.open(inputPath));?? ????????????out?=?fs.create(new?Path(outputUri));?? ????????????IOUtils.copyBytes(in,out,conf);?? ????????}?finally?{?? ????????????IOUtils.closeStream(in);?? ????????????IOUtils.closeStream(out);?? ????????}?? ????}?? } |
總結
以上是生活随笔為你收集整理的Hadoop中的压缩Codec的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Hadoop 2.x的Distribut
- 下一篇: hadoop 之DefaultStrin