日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop IO

發布時間:2025/6/15 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop IO 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

總結:

?????本章包含了以下內容

???? 第一,數據完整性,hadoop采用CRC來檢測數據是否是完整的,在寫入文件時,hdfs為每個數據塊都生成一個crc文件。客戶端讀取數據時生成一個crc與數據節點存儲的crc做比對,如果不匹配則說明數據已經損壞了。數據節點在后臺運行一個程序定期檢測數據,防止物理存儲介質中位衰減而造成的數據損壞。

???? 第二,壓縮和解壓,在mapreduce中使用壓縮可以減少存儲空間和加速在數據在網絡上的傳輸。

???? 第三,序列化,hadoop使用自己的序列化機制,實現Writable接口

???? 第四,基本文件類型,SequenceFile和MapFile的讀寫。

?

?

1 數據完整性

??? 由于每個磁盤或者網絡上的I/O操作可能會對正在讀寫的數據不慎引入錯誤,如果通過的數據流量非常大,數據發生損壞的幾率很高。

??? 檢查損壞數據的常用方法是在第一次進入系統時計算數據的校驗和,然后只要數據不是在一個可靠的通道上傳輸,就可能會發生損壞。如果新生成的校驗和不完全匹配原始的校驗和,那么數據就會被認為是損壞的。

??? 一個常用的錯誤檢測代碼是CRC-32(cyclic redundancy check,循環冗余檢查),計算一個32位的任何大小輸入的整數校驗和。

1.1 HDFS的數據完整性

????HDFS以透明方式校驗所有寫入它的數據,并在默認設置下,會在讀取數據時驗證校驗和。針對數據的每個io.bytes.per.checksum(默認512字節)字節,都會創建一個單獨的校驗和。

?

???數據節點負責在存儲數據及其校驗和之前驗證它們收到的數據。 從客戶端和其它數據節點復制過來的數據。客戶端寫入數據并且將它發送到一個數據節點管線中,在管線的最后一個數據節點驗證校驗和。

?? 客戶端讀取數據節點上的數據時,會驗證校驗和,將其與數據節點上存儲的校驗和進行對比。每個數據節點維護一個連續的校驗和驗證日志,因此它知道每個數據塊最后驗證的時間。

?

??? 每個數據節點還會在后臺線程運行一個DataBlockScanner(數據塊檢測程序),定期驗證存儲在數據節點上的所有塊,為了防止物理存儲介質中位衰減鎖造成的數據損壞。

??

??? HDFS通過復制完整的副本來產生一個新的,無錯的副本來“治愈”哪些出錯的數據塊。工作方式:如果客戶端讀取數據塊時檢測到錯誤,拋出Checksum Exception前報告該壞塊以及它試圖從名稱節點中藥讀取的數據節點。名稱節點將這個塊標記為損壞的,不會直接復制給客戶端或復制該副本到另一個數據節點。它會從其他副本復制一個新的副本。

?

??? 使用Open方法來讀取文件前,通過setVerifyChecksum()方法來禁用校驗和驗證。

1.2 本地文件系統

??? Hadoop的本地文件系統執行客戶端校驗。意味著,在寫一個名為filename的文件時,文件系統的客戶端以透明的方式創建一個隱藏的文件.filename.crc。在同一個文件夾下,包含每個文件塊的校驗和。

??? 數據塊大小由io.bytes.per.checksum屬性控制,塊的大小作為元數據存儲在.crc文件中。

???

??? 也可能禁用校驗和:底層文件系統原生支持校驗和。這里通過RawLocalFileSystem來替代LocalFileSystem完成。要在一個應用中全局使用,只需要設置fs.file.impl值為org.apache.hadoop.fs.RawLocalFileSystem來重新map執行文件的URL。或者只想對某些讀取禁用校驗和校驗。

??? Configuration conf = ...?
??? FileSystem fs = new RawLocalFileSystem();
??? fs.initialize(null, conf);

???

1.2.3 ChecksumFileSystem

????LocalFileSystem使用ChecksumFileSystem(校驗和文件系統)為自己工作,這個類可以很容易添加校驗和功能到其他文件系統中。因為ChecksumFileSystem也包含于文件系統中。

?? FileSystem rawFs = ...?
???FileSystem checksummedFs = new ChecksumFileSystem(rawFs);

?

?

?

2 壓縮

??? 文件壓縮兩大好處:減少存儲文件所需要的空間且加快了數據在網絡上或從磁盤上或到磁盤上的傳輸速度。???

??????????????????????????????????????????????????????????????????????????????????? 壓縮格式

2.1 編碼和解碼

??? 編碼和解碼器用以執行壓縮解壓算法。在Hadoop中,編碼和解碼是通過一個壓縮解碼器接口實現的。

?

???

?

????CompressionCodec對流進行壓縮和解壓縮

?

??? CompressionCodec有兩個方法輕松地壓縮和解壓數據。使用use the? createOutputStream(OutputStream out)創建一個CompressionOutputStream,將其以壓縮格式寫入底層的流。使用createInputStream(InputStream in) 獲取一個CompressionInputStream,從底層的流讀取未壓縮的數據。

?

???

01?package com.laos.hadoop;?
02?
03?import org.apache.hadoop.conf.Configuration;?
04?import org.apache.hadoop.io.IOUtils;?
05?import org.apache.hadoop.io.compress.CompressionCodec;?
06?import org.apache.hadoop.io.compress.CompressionOutputStream;?
07?import org.apache.hadoop.util.ReflectionUtils;?
08?
09?public class StreamCompressor {?
10?????public static void main(String[] args) throws Exception {?
11?????????String codecClassname = "org.apache.hadoop.io.compress.GzipCodec";?
12?????????Class<?>?codecClass = Class.forName(codecClassname);?
13?????????Configuration conf = new Configuration();?
14?????????CompressionCodec codec = (CompressionCodec) ReflectionUtils?
15?????????????????.newInstance(codecClass, conf);?
16????????? //將讀入數據壓縮至System.out
17?????????CompressionOutputStream out = codec.createOutputStream(System.out);?
18?????????IOUtils.copyBytes(System.in, out, 4096, false);?
19?????????out.finish();?
20?????}?
21?
22?}

?

$ echo "Test"|hadoop jar hadoop-itest.jar com.laos.hadoop.StreamCompressor|gunzip

?

????使用CompressionCodecFactory方法來推斷CompressionCodec

?

????在閱讀一個壓縮文件時,我們可以從擴展名來推斷出它的編碼/解碼器。以.gz結尾的文件可以用GzipCodec來閱讀。CompressionCodecFactory提供了getCodec()方法,從而將文件擴展名映射到相應的CompressionCodec。

?

?????

01?package com.laos.hadoop;?
02?
03?import java.io.InputStream;?
04?import java.io.OutputStream;?
05?import java.net.URI;?
06?
07?import org.apache.hadoop.conf.Configuration;?
08?import org.apache.hadoop.fs.FileSystem;?
09?import org.apache.hadoop.fs.Path;?
10?import org.apache.hadoop.io.IOUtils;?
11?import org.apache.hadoop.io.compress.CompressionCodec;?
12?import org.apache.hadoop.io.compress.CompressionCodecFactory;?
13?
14?public class FileDecompressor {?
15?????public static void main(String[] args) throws Exception {?
16?????????String uri = args[0];?
17?????????Configuration conf = new Configuration();?
18?????????FileSystem fs = FileSystem.get(URI.create(uri), conf);?
19?
20?????????Path inputPath = new Path(uri);?
21?????????CompressionCodecFactory factory = new CompressionCodecFactory(conf);?
22?????????CompressionCodec codec = factory.getCodec(inputPath);?
23?????????if (codec == null) {?
24?????????????System.err.println("No codec found for " + uri);?
25?????????????System.exit(1);?
26?????????}?
27?????????String outputUri = CompressionCodecFactory.removeSuffix(uri, codec?
28?????????????????.getDefaultExtension());?
29?????????InputStream in = null;?
30?????????OutputStream out = null;?
31?????????try {?
32?????????????in = codec.createInputStream(fs.open(inputPath));?
33?????????????out = fs.create(new Path(outputUri));?
34?????????????IOUtils.copyBytes(in, out, conf);?
35?????????} finally {?
36?????????????IOUtils.closeStream(in);?
37?????????????IOUtils.closeStream(out);?
38?????????}?
39?????}?
40?}

??? CompressionCodecFactory從io.compression.codecs配置屬性定義的列表中找到編碼和解碼器,默認情況下,Hadoop給出所有的編碼和解碼器。每個編碼/加碼器都知道默認文件擴展名。

?

???

?????????????????

2.2 壓縮和輸入分隔

??? 考慮如何壓縮哪些將由MapReduce處理的數據時,考慮壓縮格式是否支持分隔很重要。

??? 例如,gzip格式使用default來存儲壓縮過的數據,default將數據作為一系列壓縮過的塊存儲,但是每塊的開始沒有指定用戶在數據流中的任意點定位到下一個塊的起始位置,而是自身與數據同步,所以gzip不支持分隔機制。

?

2.3 在MapReduce中使用壓縮

????如果要壓縮MapReduce作業的輸出,設置mapred.output.compress為true,mapred.output.compression.codec屬性指定編碼解碼器。

???如果輸入的文件時壓縮過的,MapReduce讀取時,它們會自動解壓,根據文件擴展名來決定使用那一個壓縮解碼器。

?

??

01?package com.laos.hadoop;?
02?
03?import java.io.IOException;?
04?
05?import org.apache.hadoop.fs.Path;?
06?import org.apache.hadoop.io.IntWritable;?
07?import org.apache.hadoop.io.Text;?
08?import org.apache.hadoop.io.compress.CompressionCodec;?
09?import org.apache.hadoop.io.compress.GzipCodec;?
10?import org.apache.hadoop.mapred.FileInputFormat;?
11?import org.apache.hadoop.mapred.FileOutputFormat;?
12?import org.apache.hadoop.mapred.JobClient;?
13?import org.apache.hadoop.mapred.JobConf;?
14?
15?
16?public class MaxTemperatureWithCompression {?
17????? public static void main(String[] args) throws IOException {?
18????????? if (args.length != 2) {?
19????????? System.err.println("Usage: MaxTemperatureWithCompression?<input?path>?" +?
20????????? "<output?path>");?
21????????? System.exit(-1);?
22????????? }?
23??????????
24????????? JobConf conf = new JobConf(MaxTemperatureWithCompression.class); conf.setJobName("Max temperature with output compression");?
25????????? FileInputFormat.addInputPath(conf, new Path(args[0]));?
26????????? FileOutputFormat.setOutputPath(conf, new Path(args[1]));?
27??????????
28????????? conf.setOutputKeyClass(Text.class);?
29????????? conf.setOutputValueClass(IntWritable.class);?
30??????????
31????????? conf.setBoolean("mapred.output.compress", true);?
32????????? conf.setClass("mapred.output.compression.codec", GzipCodec.class,?
33????????? CompressionCodec.class);?????????
34????????? JobClient.runJob(conf);?
35????????? }?
36?}

?

?

?

?

3 序列化

??? 序列化:將結構化對象轉換為字節流以便于通過網絡進行傳輸或寫入存儲的過程。

??? 反序列化:將字節流轉為一系列結構化對象的過程。

?

??? 序列化用在兩個地方:進程間通信和持久存儲。

?

??? 在Hadoop中,節點之間的進程間通信是用遠程過程調用(RPC)。RPC協議將使用序列化將消息編碼為二進制流(發送到遠程節點),此后在接收端二進制流被反序列化為消息。

?

??? Hadoop使用自己的序列化格式Writables。

?

3.1 Writable接口

???

package org.apache.hadoop.io;?

import java.io.DataOutput;?
import java.io.DataInput;?
import java.io.IOException;?
public interface Writable {?
????void write(DataOutput out) throws IOException;?//將狀態寫入二進制格式的流
????void readFields(DataInput in) throws IOException; //從二進制格式的流讀出其狀態
}

?

WritableComparable和Comparator

??? IntWritable實現了WritableComparable接口。而WritableComparable繼承了Writable和Comparable。

???

??? 類型的比較對MapReduce而言至關重要,鍵和鍵之間的比較是在排序階段完成的。Hadoop提供餓了優化方法:

???

package org.apache.hadoop.io;?

import java.util.Comparator;?
public interface?RawComparator<T>?extends Comparator<T>?{?

??? public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);?
}

該接口允許執行者比較從流中讀取的未被反序列化為對象的記錄,省去了創建對象的所有開銷。例如,IntWritable使用原始的compare()方法從每個字節數組的指定開始位置(S1和S2)和長度(L1和L2)讀取整數直接比較。

?

????WritableComparator是RawComparator對WritableComparable類的一個通用實現。第一,它提供一個默認的對原始compare()函數調用,對要比較的對象進行反序列化,然后調用對象的compare()方法。?第二,充當RawComparator實例的一個工廠方法。

??? RawComparator<IntWritable> comparator = WritableComparator.get(IntWritable.class)

?

3.2 Writable類

???

?

?

?

3.2.1 Writable的java基本類封裝

????Java基本類型????????????????????????? Writable使用????????????????????? 序列化大小(字節)

?? 布爾類型????????????????????????????????BooleanWritable??????????????? 1

???字節型?????????????????????????????????? ByteWritable????????????????????1

?? 整型?????????????????????????????????????IntWritable?????????????????????? 4

???????????????????????????????????????????? VIntWritable?????????????????????1-5

?? 浮點行?????????????????????????????????? FloatWritable??????????????????? 4

???長整型?????????????????????????????????? LongWritable??????????????????? 8

???????????????????????????????????????????? VLongWritable?????????????????? 1-9

?? 雙精度浮點型?????????????????????????? DoubleWritable????????????????? 8

?

?

4 基于文件的數據結構

?

4.1 SequenceFile類

????SequenceFile為二進制鍵值對對提供一個持久化的數據結構。

?

4.1.1 寫SequenceFile類

?? 創建SequenceFile類:SequenceFile.createWriter(....)

??

package com.laos.hadoop;?

import java.io.IOException;?
import java.net.URI;?

import org.apache.hadoop.conf.Configuration;?
import org.apache.hadoop.fs.FileSystem;?
import org.apache.hadoop.fs.Path;?
import org.apache.hadoop.io.IOUtils;?
import org.apache.hadoop.io.IntWritable;?
import org.apache.hadoop.io.SequenceFile;?
import org.apache.hadoop.io.Text;?

public class SequenceFileWriteDemo {?
????private static final String[] DATA = { "One, two, buckle my shoe",?
????????????"Three, four, shut the door", "Five, six, pick up sticks",?
????????????"Seven, eight, lay them straight", "Nine, ten, a big fat hen" };?

????public static void main(String[] args) throws IOException {?
????????String uri = args[0];?
????????Configuration conf = new Configuration();?
????????FileSystem fs = FileSystem.get(URI.create(uri), conf);?
????????Path path = new Path(uri);?
????????IntWritable key = new IntWritable();?
????????Text value = new Text();?
????????SequenceFile.Writer writer = null;?
????????try {?
????????????writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),?
????????????????????value.getClass());?//參數:文件系統,configuration,路徑,鍵的類型和值的類型
????????????for (int i = 0; i < 100; i++) {?
????????????????key.set(100 - i);?
????????????????value.set(DATA[i % DATA.length]);?
????????????????System.out.printf("[%s]/t%s/t%s/n", writer.getLength(), key,?
????????????????????????value);?
????????????????writer.append(key, value);?
????????????}?
????????} finally {?
????????????IOUtils.closeStream(writer);?
????????}?
????}?
}

?

4.1.2 讀取SequenceFile類

?

?

01?package com.laos.hadoop;?
02?
03?import java.io.IOException;?
04?import java.net.URI;?
05?
06?import org.apache.hadoop.conf.Configuration;?
07?import org.apache.hadoop.fs.FileSystem;?
08?import org.apache.hadoop.fs.Path;?
09?import org.apache.hadoop.io.IOUtils;?
10?import org.apache.hadoop.io.SequenceFile;?
11?import org.apache.hadoop.io.Writable;?
12?import org.apache.hadoop.util.ReflectionUtils;?
13?
14?public class SequenceFileReadDemo {?
15?????public static void main(String[] args) throws IOException {?
16?????????String uri = args[0];?
17?????????Configuration conf = new Configuration();?
18?????????FileSystem fs = FileSystem.get(URI.create(uri), conf);?
19?????????Path path = new Path(uri);?
20?????????SequenceFile.Reader reader = null;?
21?????????try {?
22?????????????reader = new SequenceFile.Reader(fs, path, conf);//創建reader?
23?????????????Writable key = (Writable) ReflectionUtils.newInstance(reader?
24?????????????????????.getKeyClass(), conf); //獲取key的類型
25?????????????Writable value = (Writable) ReflectionUtils.newInstance(reader?
26?????????????????????.getValueClass(), conf);//獲取value的類型?
27?????????????long position = reader.getPosition();//獲取位置?
28?????????????while (reader.next(key, value)) {//遍歷?
29?????????????????String syncSeen = reader.syncSeen() ? "*" : "";?
30?????????????????System.out.printf("[%s%s]/t%s/t%s/n", position, syncSeen, key,?
31?????????????????????????value);?
32?????????????????position = reader.getPosition(); // beginning of next record?
33?????????????}?
34?????????} finally {?
35?????????????IOUtils.closeStream(reader);?
36?????????}?
37?????}?
38?}

???

??? 兩種方法查找文件中指定的位置,第一種是seak()方法。如果文件中指定位置不是記錄邊界,reader會在調用next方法是失敗。

??? 第二種是SequenceFile.Reader.sync(long pposition)把reader定位到下一個同步點

?

4.1.3 用命令行接口顯示序列文件

?????使用-text選項顯示文本格式的序列文件。

??? % hadoop fs -text number.seq

4.1.4 序列文件的格式

????

??? SequeceFile是Hadoop API提供的一種二進制文件支持。這種二進制文件直接將<key, value>對序列化到文件中。一般對小文件可以使用這種文件合并,即將文件名作為key,文件內容作為value序列化到大文件中。這種文件格式有以下好處:
??? 1)支持壓縮,且可定制為基于Record或Block壓縮(Block級壓縮性能較優)?
??? 2)本地化任務支持:因為文件可以被切分,因此MapReduce任務時數據的本地化情況應該是非常好的。?
??? 3)難度低:因為是Hadoop框架提供的API,業務邏輯側的修改比較簡單。
???

??? SequenceFile 是一個由二進制序列化過的key/value的字節流組成的文本存儲文件,它可以在map/reduce過程中的input/output 的format時被使用。在map/reduce過程中,map處理文件的臨時輸出就是使用SequenceFile處理過的。
??? SequenceFile分別提供了讀、寫、排序的操作類。
??? SequenceFile的操作中有三種處理方式:
??? 1)?不壓縮數據直接存儲。 //enum.NONE
??? 2) 壓縮value值不壓縮key值存儲的存儲方式。//enum.RECORD
??? 3)key/value值都壓縮的方式存儲。//enum.BLOCK

?

???

沒有壓縮和記錄壓縮的序列文件的內部結構:未壓縮和記錄壓縮的結構是一樣的,record由記錄長度、鍵長度、鍵和值(或壓縮過的值)構成。

?

?

塊壓縮的序列文件的內部結構:一個同步點內記錄筆數、壓縮鍵的長度、壓縮過的鍵值、壓縮過值的長度和壓縮值。壓縮塊一次壓縮多個記錄。塊的最小大小由屬性:io.seqfile.compress.blocksize定義。

?

?

?

?

4.2 MapFile

??? MapFile是經過排序的帶索引的SequenceFile,可以根據鍵值進行查找。

4.2.1 寫MapFile

????

01?package com.laos.hadoop;?
02?
03?import java.io.IOException;?
04?import java.net.URI;?
05?
06?import org.apache.hadoop.conf.Configuration;?
07?import org.apache.hadoop.fs.FileSystem;?
08?import org.apache.hadoop.io.IOUtils;?
09?import org.apache.hadoop.io.IntWritable;?
10?import org.apache.hadoop.io.MapFile;?
11?import org.apache.hadoop.io.Text;?
12?
13?public class MapFileWriteDemo {?
14?????private static final String[] DATA = { "One, two, buckle my shoe",?
15?????????????"Three, four, shut the door", "Five, six, pick up sticks",?
16?????????????"Seven, eight, lay them straight", "Nine, ten, a big fat hen" };?
17?
18?????public static void main(String[] args) throws IOException {?
19?????????String uri = args[0];?
20?????????Configuration conf = new Configuration();?
21?????????FileSystem fs = FileSystem.get(URI.create(uri), conf);?
22?????????IntWritable key = new IntWritable();?
23?????????Text value = new Text();?
24?????????MapFile.Writer writer = null;?
25?????????try {?
26?????????????writer = new MapFile.Writer(conf, fs, uri, key.getClass(), value?
27?????????????????????.getClass()); //創建MapFile?? Writer的實例
28?
29?????????????for (int i = 0; i < 1024; i++) {?
30?????????????????key.set(i + 1);?
31?????????????????value.set(DATA[i % DATA.length]);?
32?????????????????writer.append(key, value);?
33?????????????}?
34?????????} finally {?
35?????????????IOUtils.closeStream(writer);?
36?????????}?
37?????}?
38?}

??? % hadoop MapFileWriteDemo numbers.map

??? numbers.map確實是一個目錄,包含data和index兩個文件。數據文件包括所有的輸入,index文件包含一小部分鍵和鍵到data文件中偏移量的映射。索引中鍵的個數由io.map.index.interval屬性設置。

?

4.2.2 讀MapFile

??? 順序遍歷MapFile過程和讀取SequenceFile過程相似:創建一個MapFile Reader,調用next函數直到返回false。

??? public boolean next(WritableComparable key, Writable val) throws IOException

???

??? 隨機訪問:

??? public Writable get(WritableComparable key, Writable val) throws IOException

?

4.2.3 將SequenceFile轉換成MapFile

?

?????關鍵是給SequenceFile重建索引:使用MapFile的靜態方法fix()。

???

01?package com.laos.hadoop;?
02?
03?import java.net.URI;?
04?
05?import org.apache.hadoop.conf.Configuration;?
06?import org.apache.hadoop.fs.FileSystem;?
07?import org.apache.hadoop.fs.Path;?
08?import org.apache.hadoop.io.MapFile;?
09?import org.apache.hadoop.io.SequenceFile;?
10?
11?public class MapFileFixer {?
12?????public static void main(String[] args) throws Exception {?
13????? String mapUri = args[0];?
14??????
15????? Configuration conf = new Configuration();?
16??????
17????? FileSystem fs = FileSystem.get(URI.create(mapUri), conf);?
18????? Path map = new Path(mapUri);?
19????? Path mapData = new Path(map, MapFile.DATA_FILE_NAME);?
20??????
21????? // Get key and value types from data sequence file?
22????? SequenceFile.Reader reader = new SequenceFile.Reader(fs, mapData, conf);?
23????? Class keyClass = reader.getKeyClass();?
24????? Class valueClass = reader.getValueClass();?
25????? reader.close();?
26??????
27????? // Create the map file index file?
28????? long entries = MapFile.fix(fs, map, keyClass, valueClass, false, conf);?
29????? System.out.printf("Created MapFile %s with %d entries/n", map, entries);?
30????? }?
31?}

?

???

???

???

?

?

總結

以上是生活随笔為你收集整理的Hadoop IO的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。