HDFS CheckSum
Entry Point
- Hadoop branch: Hadoop-2.6.0
- Example usage: hadoop dfs -checksum /tmp/README.txt
- Result:
/tmp/README.txt
MD5-of-0MD5-of-512CRC32C
00000200000000000000000017970719be16d1071635fa381b95f957
Algorithm name:
"MD5-of-" + crcPerBlock + "MD5-of-" + bytesPerCRC + getCrcType().name()
crcPerBlock: I have not fully worked this one out; it only has a value when the file is split into more than one block, otherwise it is 0
bytesPerCRC: a CRC checksum is computed for every 512 bytes
CRC32C: a variant of the CRC32 checksum algorithm
Result breakdown:
00000200 00000000 00000000 17970719 be16d107 1635fa38 1b95f957
The result is 28 bytes:
- bytesPerCRC is an int, stored in the first 4 bytes; here 512
- crcPerBlock is a long, stored in the middle 8 bytes; here 0
- the MD5 digest occupies the last 16 bytes
// TODO: the CRC32 checksum algorithm itself
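The 28-byte layout can be verified by decoding the hex string printed above. The following is a small illustrative sketch (not Hadoop code; the class and method names are mine):

```java
import java.nio.ByteBuffer;

public class ChecksumDecode {
    static byte[] hexToBytes(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    // Returns {bytesPerCRC, crcPerBlock}; the remaining 16 bytes are the MD5.
    static long[] decodeHeader(String hex) {
        ByteBuffer buf = ByteBuffer.wrap(hexToBytes(hex)); // big-endian by default
        return new long[] { buf.getInt(), buf.getLong() };
    }

    public static void main(String[] args) {
        long[] h = decodeHeader(
            "00000200000000000000000017970719be16d1071635fa381b95f957");
        // prints bytesPerCRC=512, crcPerBlock=0
        System.out.println("bytesPerCRC=" + h[0] + ", crcPerBlock=" + h[1]);
    }
}
```

The first 4 bytes (0x00000200) decode to 512 and the next 8 bytes to 0, matching the field description above.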
HDFS Physical Storage Format
-rw-r--r--  1 wankun  wheel  1366  8  3 11:52 blk_1073742101
-rw-r--r--  1 wankun  wheel    19  8  3 11:52 blk_1073742101_1277.meta
The meta file's header occupies 7 bytes; the remaining 12 bytes hold 3 chunk checksums, 4 bytes each, one for every 512 bytes of the block file.
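The arithmetic behind the 19-byte meta file can be sketched as follows (an illustrative helper, not Hadoop code; constants follow the description above):

```java
public class MetaSize {
    static final int HEADER = 7;             // short version + byte type + int bytesPerChecksum
    static final int CHECKSUM_SIZE = 4;      // CRC32/CRC32C checksums are 4 bytes each
    static final int BYTES_PER_CHECKSUM = 512;

    // Expected .meta file length for a block of blockLen bytes:
    // header plus one 4-byte checksum per (partial) 512-byte chunk.
    static long metaLen(long blockLen) {
        long chunks = (blockLen + BYTES_PER_CHECKSUM - 1) / BYTES_PER_CHECKSUM;
        return HEADER + chunks * CHECKSUM_SIZE;
    }

    public static void main(String[] args) {
        // the 1366-byte block above needs ceil(1366/512) = 3 chunks -> 19 bytes
        System.out.println(metaLen(1366)); // prints 19
    }
}
```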
- Program entry point: org.apache.hadoop.fs.FsShell
FsShell.main() calls run(); cmd is the command to execute. The matching command instance is looked up in commandFactory by cmd, and instance.run() is invoked. commandFactory is populated in FsShell.init(), which registers the commands declared in FsCommand via reflection.
FsCommand
public static void registerCommands(CommandFactory factory) {
  factory.registerCommands(AclCommands.class);
  factory.registerCommands(CopyCommands.class);
  factory.registerCommands(Count.class);
  factory.registerCommands(Delete.class);
  factory.registerCommands(Display.class);
  factory.registerCommands(Find.class);
  factory.registerCommands(FsShellPermissions.class);
  factory.registerCommands(FsUsage.class);
  factory.registerCommands(Ls.class);
  factory.registerCommands(Mkdir.class);
  factory.registerCommands(MoveCommands.class);
  factory.registerCommands(SetReplication.class);
  factory.registerCommands(Stat.class);
  factory.registerCommands(Tail.class);
  factory.registerCommands(Test.class);
  factory.registerCommands(Touch.class);
  factory.registerCommands(SnapshotCommands.class);
  factory.registerCommands(XAttrCommands.class);
}
class Display extends FsCommand {
  public static void registerCommands(CommandFactory factory) {
    factory.addClass(Cat.class, "-cat");
    factory.addClass(Text.class, "-text");
    factory.addClass(Checksum.class, "-checksum");
  }
}
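The register-then-dispatch pattern used by FsShell can be sketched with a minimal, hypothetical shell (lambdas stand in for Hadoop's reflection-based command registry; MiniShell and its methods are illustrative names, not Hadoop APIs):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class MiniShell {
    // command name -> handler taking the remaining args and returning an exit code
    private final Map<String, Function<String[], Integer>> commands = new HashMap<>();

    void register(String name, Function<String[], Integer> cmd) {
        commands.put(name, cmd);
    }

    // Look up the command by name and invoke it, like FsShell.run() does
    // against commandFactory.
    int run(String name, String... args) {
        Function<String[], Integer> cmd = commands.get(name);
        if (cmd == null) {
            System.err.println("unknown command " + name);
            return 1;
        }
        return cmd.apply(args);
    }

    public static void main(String[] args) {
        MiniShell shell = new MiniShell();
        shell.register("-checksum", a -> {
            System.out.println("checksum " + a[0]);
            return 0;
        });
        System.out.println(shell.run("-checksum", "/tmp/README.txt")); // prints 0
    }
}
```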
Checksum -> DistributedFileSystem.getFileChecksum() -> DFSClient.getFileChecksum()
Checksum main logic
Fetch all block locations for the file, connect to a DataNode for each block to compute that block's MD5 checksum, run a second MD5 over the concatenated bytes of all the block MD5s, and finally return the result wrapped in an MD5MD5CRC32FileChecksum.
Walking the file's locatedblocks list, each block's MD5 is computed in turn:
public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
    throws IOException {
  checkOpen();
  Preconditions.checkArgument(length >= 0);
  //get block locations for the file range
  LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, length);
  if (null == blockLocations) {
    throw new FileNotFoundException("File does not exist: " + src);
  }
  List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks();
  final DataOutputBuffer md5out = new DataOutputBuffer();
  int bytesPerCRC = -1;
  DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
  long crcPerBlock = 0;
  boolean refetchBlocks = false;
  int lastRetriedIndex = -1;

  // get block checksum for each block
  long remaining = length;
  if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) {
    remaining = Math.min(length, blockLocations.getFileLength());
  }
  // iterate over the file's located blocks, computing each block's MD5 in turn
  for (int i = 0; i < locatedblocks.size() && remaining > 0; i++) {
    if (refetchBlocks) { // refetch to get fresh tokens
      blockLocations = callGetBlockLocations(namenode, src, 0, length);
      if (null == blockLocations) {
        throw new FileNotFoundException("File does not exist: " + src);
      }
      locatedblocks = blockLocations.getLocatedBlocks();
      refetchBlocks = false;
    }
    LocatedBlock lb = locatedblocks.get(i);
    final ExtendedBlock block = lb.getBlock();
    if (remaining < block.getNumBytes()) {
      block.setNumBytes(remaining);
    }
    remaining -= block.getNumBytes();
    final DatanodeInfo[] datanodes = lb.getLocations();

    //try each datanode location of the block
    final int timeout = 3000 * datanodes.length + dfsClientConf.socketTimeout;
    boolean done = false;
    // connect to a DN and fetch the MD5 it has computed
    for (int j = 0; !done && j < datanodes.length; j++) {
      DataOutputStream out = null;
      DataInputStream in = null;
      try {
        //connect to a datanode
        IOStreamPair pair = connectToDN(datanodes[j], timeout, lb);
        out = new DataOutputStream(new BufferedOutputStream(pair.out,
            HdfsConstants.SMALL_BUFFER_SIZE));
        in = new DataInputStream(pair.in);

        if (LOG.isDebugEnabled()) {
          LOG.debug("write to " + datanodes[j] + ": "
              + Op.BLOCK_CHECKSUM + ", block=" + block);
        }
        // get block MD5
        // Sender writes the op type and proto arguments (Op.BLOCK_CHECKSUM, proto)
        // to the out stream; Receiver reads the op and dispatches it to
        // DataXceiver for processing
        new Sender(out).blockChecksum(block, lb.getBlockToken());

        final BlockOpResponseProto reply =
            BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));

        if (reply.getStatus() != Status.SUCCESS) {
          if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN) {
            throw new InvalidBlockTokenException();
          } else {
            throw new IOException("Bad response " + reply + " for block "
                + block + " from datanode " + datanodes[j]);
          }
        }

        OpBlockChecksumResponseProto checksumData =
            reply.getChecksumResponse();

        //read byte-per-checksum
        final int bpc = checksumData.getBytesPerCrc();
        if (i == 0) { //first block
          bytesPerCRC = bpc;
        } else if (bpc != bytesPerCRC) {
          throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
              + " but bytesPerCRC=" + bytesPerCRC);
        }

        //read crc-per-block
        final long cpb = checksumData.getCrcPerBlock();
        if (locatedblocks.size() > 1 && i == 0) {
          crcPerBlock = cpb;
        }

        //read md5
        final MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
        md5.write(md5out);

        // read crc-type
        final DataChecksum.Type ct;
        if (checksumData.hasCrcType()) {
          ct = PBHelper.convert(checksumData.getCrcType());
        } else {
          LOG.debug("Retrieving checksum from an earlier-version DataNode: "
              + "inferring checksum by reading first byte");
          ct = inferChecksumTypeByReading(lb, datanodes[j]);
        }

        if (i == 0) { // first block
          crcType = ct;
        } else if (crcType != DataChecksum.Type.MIXED && crcType != ct) {
          // if crc types are mixed in a file
          crcType = DataChecksum.Type.MIXED;
        }

        done = true;

        if (LOG.isDebugEnabled()) {
          if (i == 0) {
            LOG.debug("set bytesPerCRC=" + bytesPerCRC
                + ", crcPerBlock=" + crcPerBlock);
          }
          LOG.debug("got reply from " + datanodes[j] + ": md5=" + md5);
        }
      } catch (InvalidBlockTokenException ibte) {
        if (i > lastRetriedIndex) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
                + "for file " + src + " for block " + block
                + " from datanode " + datanodes[j]
                + ". Will retry the block once.");
          }
          lastRetriedIndex = i;
          done = true; // actually it's not done; but we'll retry
          i--; // repeat at i-th block
          refetchBlocks = true;
          break;
        }
      } catch (IOException ie) {
        LOG.warn("src=" + src + ", datanodes[" + j + "]=" + datanodes[j], ie);
      } finally {
        IOUtils.closeStream(in);
        IOUtils.closeStream(out);
      }
    }

    if (!done) {
      throw new IOException("Fail to get block MD5 for " + block);
    }
  }

  //compute file MD5
  // second-level MD5: update the digest with the concatenated bytes of all
  // per-block MD5s and produce the file-level MD5
  final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData());
  switch (crcType) {
    case CRC32:
      return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC, crcPerBlock, fileMD5);
    case CRC32C:
      // CRC32C: a CRC32 variant whose computation can be accelerated by an
      // Intel hardware instruction
      // https://issues.apache.org/jira/browse/HADOOP-7443
      return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC, crcPerBlock, fileMD5);
    default:
      // If there is no block allocated for the file,
      // return one with the magic entry that matches what previous
      // hdfs versions return.
      if (locatedblocks.size() == 0) {
        return new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5);
      }
      // we should never get here since the validity was checked
      // when getCrcType() was called above.
      return null;
  }
}
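The two-level digest at the end of the method can be demonstrated in isolation. This is an illustrative sketch (class and method names are mine, standing in for MD5Hash.digest(md5out.getData())): the per-block MD5s are concatenated in block order and MD5'd again to form the file checksum.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Md5OfMd5 {
    static byte[] md5(byte[] data) {
        try {
            return MessageDigest.getInstance("MD5").digest(data);
        } catch (NoSuchAlgorithmException e) {
            throw new AssertionError(e); // MD5 is guaranteed by the JDK
        }
    }

    // File checksum = MD5 over the concatenation of all per-block MD5s,
    // mirroring the md5out buffer in getFileChecksum().
    static byte[] fileChecksum(byte[][] blockMd5s) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            for (byte[] b : blockMd5s) {
                md.update(b); // append this block's 16-byte MD5
            }
            return md.digest();
        } catch (NoSuchAlgorithmException e) {
            throw new AssertionError(e);
        }
    }

    public static void main(String[] args) {
        byte[] b1 = md5(new byte[512]); // stand-in for block 1's meta-based MD5
        byte[] b2 = md5(new byte[100]); // stand-in for block 2's meta-based MD5
        System.out.println(fileChecksum(new byte[][] { b1, b2 }).length); // prints 16
    }
}
```

Note that block order matters: swapping two block MD5s changes the file checksum.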
DataXceiver Computes the Block Checksum
Read the block's meta file contents and compute their MD5:
- For a whole-block checksum, the MD5 is computed directly over the entire meta file contents.
- For a partial-block checksum, the MD5 covers only the chunk checksums for the requested range (via calcPartialBlockChecksum).
@Override
public void blockChecksum(final ExtendedBlock block,
    final Token<BlockTokenIdentifier> blockToken) throws IOException {
  final DataOutputStream out = new DataOutputStream(getOutputStream());
  checkAccess(out, true, block, blockToken,
      Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
  // client side now can specify a range of the block for checksum
  long requestLength = block.getNumBytes();
  Preconditions.checkArgument(requestLength >= 0);
  long visibleLength = datanode.data.getReplicaVisibleLength(block);
  boolean partialBlk = requestLength < visibleLength;

  updateCurrentThreadName("Reading metadata for block " + block);
  // every block has a corresponding meta file; metadataIn reads its contents
  // e.g. blk_1073742101 and blk_1073742101_1277.meta
  final LengthInputStream metadataIn =
      datanode.data.getMetaDataInputStream(block);
  final DataInputStream checksumIn = new DataInputStream(
      new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
  updateCurrentThreadName("Getting checksum for block " + block);
  try {
    //read metadata file
    // header content:
    //   short version;
    //   byte type;  // CRC32 or CRC32C; both have a 4-byte checksum size
    //   int bpc;    // bytesPerCRC, 512 by default: every 512 bytes are
    //               // checksummed, each checksum occupying 4 bytes
    final BlockMetadataHeader header =
        BlockMetadataHeader.readHeader(checksumIn);
    final DataChecksum checksum = header.getChecksum();
    final int csize = checksum.getChecksumSize();
    final int bytesPerCRC = checksum.getBytesPerChecksum();
    // (meta file length minus the 2+1+4-byte header) divided by the chunk
    // checksum size (4) gives the number of chunk checksums
    final long crcPerBlock = csize <= 0 ? 0 :
        (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize;

    // partial-block checksum: compute the MD5 over part of the block's contents
    // whole-block checksum: MD5 the entire meta contents directly
    final MD5Hash md5 = partialBlk && crcPerBlock > 0
        ? calcPartialBlockChecksum(block, requestLength, checksum, checksumIn)
        : MD5Hash.digest(checksumIn);
    if (LOG.isDebugEnabled()) {
      LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
          + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
    }

    //write reply
    BlockOpResponseProto.newBuilder()
        .setStatus(SUCCESS)
        .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
            .setBytesPerCrc(bytesPerCRC)
            .setCrcPerBlock(crcPerBlock)
            .setMd5(ByteString.copyFrom(md5.getDigest()))
            .setCrcType(PBHelper.convert(checksum.getChecksumType())))
        .build()
        .writeDelimitedTo(out);
    out.flush();
  } catch (IOException ioe) {
    LOG.info("blockChecksum " + block + " received exception " + ioe);
    incrDatanodeNetworkErrors();
    throw ioe;
  } finally {
    IOUtils.closeStream(out);
    IOUtils.closeStream(checksumIn);
    IOUtils.closeStream(metadataIn);
  }

  //update metrics
  datanode.metrics.addBlockChecksumOp(elapsed());
}
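The meta layout the DataNode reads here can be sketched from the writer's side. This is an illustrative sketch, not Hadoop's actual writer code; the type code 2 for CRC32C is an assumption based on Hadoop's DataChecksum constants, and java.util.zip.CRC32C requires Java 9+.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

public class MetaSketch {
    static final int BYTES_PER_CHECKSUM = 512;

    // Build a meta byte stream for a block: a 7-byte header (short version,
    // byte type, int bytesPerChecksum) followed by one 4-byte CRC32C per
    // 512-byte chunk of the block.
    static byte[] buildMeta(byte[] block) {
        int chunks = (block.length + BYTES_PER_CHECKSUM - 1) / BYTES_PER_CHECKSUM;
        ByteBuffer buf = ByteBuffer.allocate(7 + 4 * chunks);
        buf.putShort((short) 1);        // header: version
        buf.put((byte) 2);              // header: checksum type (2 = CRC32C, assumed)
        buf.putInt(BYTES_PER_CHECKSUM); // header: bytes per checksum
        CRC32C crc = new CRC32C();
        for (int off = 0; off < block.length; off += BYTES_PER_CHECKSUM) {
            crc.reset();
            crc.update(block, off, Math.min(BYTES_PER_CHECKSUM, block.length - off));
            buf.putInt((int) crc.getValue());
        }
        return buf.array();
    }

    public static void main(String[] args) {
        // a 1366-byte block, like blk_1073742101 above: 3 chunks -> 19 bytes
        System.out.println(buildMeta(new byte[1366]).length); // prints 19
    }
}
```

Digesting the bytes this sketch produces is exactly the whole-block path above: MD5Hash.digest over header plus chunk checksums.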
Writing the Block Meta File
Reference: http://shiyanjun.cn/archives/942.html