Hadoop File System and I/O Streams

Published: 2024/1/17 windows 23 豆豆

Original article: http://www.cnblogs.com/archimedes/p/hadoop-filesystem-io.html. Please credit the source when reposting.

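Hadoop borrows the concept of the Linux virtual file system: it introduces an abstract Hadoop file system and, on top of it, provides many concrete file system implementations that meet the varied data-access needs of applications built on Hadoop.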

hadoop文件系統API

Hadoop provides an abstract file system, of which HDFS is only one concrete implementation. The abstract class for Hadoop file systems is org.apache.hadoop.fs.FileSystem.
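To make the "one abstraction, many implementations" idea concrete, here is a minimal sketch of how client code can be handed a concrete implementation based on a URI. It is a simplified model under stated assumptions, not Hadoop's API: the URI scheme selects the implementation (playing the role of the per-scheme `fs.<scheme>.impl` configuration property), and instances are cached per scheme and authority, similar in spirit to the `CACHE` lookup in `FileSystem.get(URI, Configuration)`. The names `SketchFileSystem`, `SketchLocalFs`, `SketchDistributedFs` and `FsDispatchSketch` are hypothetical.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for an abstract file system (NOT Hadoop's API).
abstract class SketchFileSystem {
    protected final String scheme;
    protected final String authority;

    protected SketchFileSystem(String scheme, String authority) {
        this.scheme = scheme;
        this.authority = authority;
    }

    public abstract String kind();

    public String describe() {
        return kind() + ":" + scheme + "://" + authority;
    }
}

class SketchLocalFs extends SketchFileSystem {
    SketchLocalFs(String scheme, String authority) { super(scheme, authority); }
    @Override public String kind() { return "local"; }
}

class SketchDistributedFs extends SketchFileSystem {
    SketchDistributedFs(String scheme, String authority) { super(scheme, authority); }
    @Override public String kind() { return "distributed"; }
}

public class FsDispatchSketch {
    // Plays the role of a per-scheme "fs.<scheme>.impl"-style setting (assumption).
    private static final Map<String, BiFunction<String, String, SketchFileSystem>> IMPLS = new HashMap<>();
    // One instance per (scheme, authority), similar in spirit to FileSystem's CACHE.
    private static final Map<String, SketchFileSystem> CACHE = new HashMap<>();

    static {
        IMPLS.put("file", SketchLocalFs::new);
        IMPLS.put("hdfs", SketchDistributedFs::new);
    }

    public static synchronized SketchFileSystem get(URI uri) {
        if (uri.getScheme() == null) {
            throw new IllegalArgumentException("No scheme in " + uri);
        }
        // Scheme and authority are compared case-insensitively.
        String scheme = uri.getScheme().toLowerCase(Locale.ROOT);
        String authority = uri.getAuthority() == null ? "" : uri.getAuthority().toLowerCase(Locale.ROOT);
        return CACHE.computeIfAbsent(scheme + "://" + authority, key -> {
            BiFunction<String, String, SketchFileSystem> factory = IMPLS.get(scheme);
            if (factory == null) {
                throw new IllegalArgumentException("No FileSystem for scheme: " + scheme);
            }
            return factory.apply(scheme, authority);
        });
    }

    public static void main(String[] args) {
        SketchFileSystem a = get(URI.create("hdfs://namenode:9000/user/a.txt"));
        SketchFileSystem b = get(URI.create("HDFS://NameNode:9000/tmp/b.txt"));
        System.out.println(a.describe()); // distributed:hdfs://namenode:9000
        System.out.println(a == b);       // true: same scheme and authority
        System.out.println(get(URI.create("file:///tmp/x")).describe()); // local:file://
    }
}
```

Client code only ever sees `SketchFileSystem`; which subclass it gets is decided by the URI, which is the same separation the abstract `FileSystem` class provides.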

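The methods of the Hadoop abstract file system fall into two groups:

1. Methods that manage files and directories

2. Methods that read and write file data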

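Operations of the Hadoop abstract file system and their Java and Linux counterparts:

| Hadoop FileSystem | Java operation | Linux operation | Description |
|---|---|---|---|
| `URL.openStream`, `FileSystem.open`, `FileSystem.create`, `FileSystem.append` | `URL.openStream` | `open` | Open a file |
| `FSDataInputStream.read` | `InputStream.read` | `read` | Read data from a file |
| `FSDataOutputStream.write` | `OutputStream.write` | `write` | Write data to a file |
| `FSDataInputStream.close`, `FSDataOutputStream.close` | `InputStream.close`, `OutputStream.close` | `close` | Close a file |
| `FSDataInputStream.seek` | `RandomAccessFile.seek` | `lseek` | Change the read/write position in a file |
| `FileSystem.getFileStatus`, `FileSystem.get*` | `File.get*` | `stat` | Get the attributes of a file/directory |
| `FileSystem.set*` | `File.set*` | `chmod`, etc. | Change the attributes of a file |
| `FileSystem.createNewFile` | `File.createNewFile` | `create` | Create a file |
| `FileSystem.delete` | `File.delete` | `remove` | Delete a file from the file system |
| `FileSystem.rename` | `File.renameTo` | `rename` | Rename a file/directory |
| `FileSystem.mkdirs` | `File.mkdir` | `mkdir` | Create a subdirectory under a given directory |
| `FileSystem.delete` | `File.delete` | `rmdir` | Delete an empty subdirectory from a directory |
| `FileSystem.listStatus` | `File.list` | `readdir` | Read the entries of a directory |
| `FileSystem.getWorkingDirectory` | ? | `getcwd`/`getwd` | Return the current working directory |
| `FileSystem.setWorkingDirectory` | ? | `chdir` | Change the current working directory |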
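The "Java operation" column can be exercised with nothing but the JDK. The sketch below, a hypothetical `JavaFileOpsDemo` class working inside a temporary directory, walks through the rows roughly in table order. One assumption to note: it uses a single `RandomAccessFile` for open/write/seek/read/close so that the `lseek` row is covered, whereas the table lists separate `InputStream`/`OutputStream` classes for plain reads and writes.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;

public class JavaFileOpsDemo {
    // Runs the java.io operations from the table inside dir and returns
    // the 6 bytes read back after seeking to offset 6.
    public static String run(File dir) throws IOException {
        File sub = new File(dir, "sub");
        if (!sub.mkdir()) throw new IOException("mkdir failed: " + sub);        // mkdir
        File f = new File(sub, "a.txt");
        if (!f.createNewFile()) throw new IOException("already exists: " + f); // create
        String tail;
        try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {            // open
            raf.write("hello hadoop".getBytes(StandardCharsets.UTF_8));          // write
            raf.seek(6);                                                         // lseek
            byte[] buf = new byte[6];
            raf.readFully(buf);                                                  // read
            tail = new String(buf, StandardCharsets.UTF_8);
        }                                                                        // close (try-with-resources)
        System.out.println("length=" + f.length());                             // stat-style getter
        File renamed = new File(sub, "b.txt");
        if (!f.renameTo(renamed)) throw new IOException("rename failed");       // rename
        System.out.println(Arrays.toString(sub.list()));                        // readdir
        if (!renamed.delete()) throw new IOException("delete failed");          // remove
        if (!sub.delete()) throw new IOException("rmdir failed");               // rmdir: directory must be empty
        return tail;
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("fsdemo").toFile();
        System.out.println(run(dir)); // hadoop
        dir.delete();
    }
}
```

As in the table, `File.delete` serves both the `remove` and `rmdir` rows, just as `FileSystem.delete` does; for a directory it only succeeds when the directory is empty.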

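With FileSystem.getFileStatus(), the Hadoop abstract file system obtains all the attributes of a file or directory in a single call; the attributes are stored in the FileStatus class: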

```java
public class FileStatus implements Writable, Comparable {
  private Path path;                // file path
  private long length;              // file length
  private boolean isdir;            // whether this is a directory
  private short block_replication;  // replication factor (an HDFS-specific parameter)
  private long blocksize;           // block size (an HDFS-specific parameter)
  private long modification_time;   // last modification time
  private long access_time;         // last access time
  private FsPermission permission;  // permission information
  private String owner;             // file owner
  private String group;             // user group
  // ……
}
```
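For comparison, the JDK's own `java.nio.file` API follows the same bulk-read idea on a local disk: `Files.readAttributes(path, BasicFileAttributes.class)` returns one object holding the type, size and timestamps. The hypothetical `BulkAttributesDemo` class below is only an analogy to `getFileStatus()`, not part of Hadoop.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;

public class BulkAttributesDemo {
    // Reads several attributes in one call, analogous in spirit to
    // FileSystem.getFileStatus() filling a single FileStatus object.
    public static String summarize(Path p) throws IOException {
        BasicFileAttributes attrs = Files.readAttributes(p, BasicFileAttributes.class);
        return (attrs.isDirectory() ? "dir" : "file")
                + " size=" + attrs.size()
                + " mtime=" + attrs.lastModifiedTime().toMillis()
                + " atime=" + attrs.lastAccessTime().toMillis();
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("status", ".txt");
        Files.write(tmp, new byte[]{1, 2, 3});
        System.out.println(summarize(tmp)); // starts with "file size=3"
        Files.delete(tmp);
    }
}
```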

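FileStatus implements the Writable interface, which means a FileStatus object can be serialized and sent over the network. Reading all of a file's attributes at once and returning them to the client in a single object reduces the number of network round trips in a distributed system.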
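A minimal sketch of the `Writable` idea, assuming a trimmed-down, hypothetical `StatusRecord` class: `write` emits the fields in the same order as `FileStatus.write()`, and `readFields` must read them back in exactly that order. To stay JDK-only it substitutes `DataOutput.writeUTF` for Hadoop's `Text.writeString` and a plain `short` for `FsPermission`, so its byte format is not compatible with real `FileStatus` objects.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for FileStatus: same field order as FileStatus.write(),
// but writeUTF replaces Text.writeString and a short replaces FsPermission.
public class StatusRecord {
    String path; long length; boolean isDir; short replication; long blockSize;
    long modificationTime; long accessTime; short permission; String owner; String group;

    public void write(DataOutput out) throws IOException {
        out.writeUTF(path);
        out.writeLong(length);
        out.writeBoolean(isDir);
        out.writeShort(replication);
        out.writeLong(blockSize);
        out.writeLong(modificationTime);
        out.writeLong(accessTime);
        out.writeShort(permission);
        out.writeUTF(owner);
        out.writeUTF(group);
    }

    // Fields must be read back in exactly the order they were written.
    public void readFields(DataInput in) throws IOException {
        path = in.readUTF();
        length = in.readLong();
        isDir = in.readBoolean();
        replication = in.readShort();
        blockSize = in.readLong();
        modificationTime = in.readLong();
        accessTime = in.readLong();
        permission = in.readShort();
        owner = in.readUTF();
        group = in.readUTF();
    }

    // Serializes to bytes and deserializes into a fresh object,
    // standing in for "send over the network, read on the client".
    public static StatusRecord roundTrip(StatusRecord s) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            s.write(out);
        }
        StatusRecord copy = new StatusRecord();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        return copy;
    }

    public static void main(String[] args) throws IOException {
        StatusRecord s = new StatusRecord();
        s.path = "/user/demo/a.txt"; s.length = 1024; s.replication = 3;
        s.blockSize = 64L * 1024 * 1024; s.permission = 0644; s.owner = "demo"; s.group = "users";
        StatusRecord c = roundTrip(s);
        System.out.println(c.path + " " + c.length + " " + c.replication + " " + c.owner); // /user/demo/a.txt 1024 3 demo
    }
}
```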

The complete source of the FileStatus class is as follows:

```java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

/** Interface that represents the client side information for a file.
 */
public class FileStatus implements Writable, Comparable {

  private Path path;
  private long length;
  private boolean isdir;
  private short block_replication;
  private long blocksize;
  private long modification_time;
  private long access_time;
  private FsPermission permission;
  private String owner;
  private String group;

  public FileStatus() { this(0, false, 0, 0, 0, 0, null, null, null, null); }

  //We should deprecate this soon?
  public FileStatus(long length, boolean isdir, int block_replication,
                    long blocksize, long modification_time, Path path) {

    this(length, isdir, block_replication, blocksize, modification_time,
         0, null, null, null, path);
  }

  public FileStatus(long length, boolean isdir, int block_replication,
                    long blocksize, long modification_time, long access_time,
                    FsPermission permission, String owner, String group,
                    Path path) {
    this.length = length;
    this.isdir = isdir;
    this.block_replication = (short)block_replication;
    this.blocksize = blocksize;
    this.modification_time = modification_time;
    this.access_time = access_time;
    this.permission = (permission == null) ?
                      FsPermission.getDefault() : permission;
    this.owner = (owner == null) ? "" : owner;
    this.group = (group == null) ? "" : group;
    this.path = path;
  }

  /*
   * @return the length of this file, in blocks
   */
  public long getLen() {
    return length;
  }

  /**
   * Is this a directory?
   * @return true if this is a directory
   */
  public boolean isDir() {
    return isdir;
  }

  /**
   * Get the block size of the file.
   * @return the number of bytes
   */
  public long getBlockSize() {
    return blocksize;
  }

  /**
   * Get the replication factor of a file.
   * @return the replication factor of a file.
   */
  public short getReplication() {
    return block_replication;
  }

  /**
   * Get the modification time of the file.
   * @return the modification time of file in milliseconds since January 1, 1970 UTC.
   */
  public long getModificationTime() {
    return modification_time;
  }

  /**
   * Get the access time of the file.
   * @return the access time of file in milliseconds since January 1, 1970 UTC.
   */
  public long getAccessTime() {
    return access_time;
  }

  /**
   * Get FsPermission associated with the file.
   * @return permssion. If a filesystem does not have a notion of permissions
   *         or if permissions could not be determined, then default
   *         permissions equivalent of "rwxrwxrwx" is returned.
   */
  public FsPermission getPermission() {
    return permission;
  }

  /**
   * Get the owner of the file.
   * @return owner of the file. The string could be empty if there is no
   *         notion of owner of a file in a filesystem or if it could not
   *         be determined (rare).
   */
  public String getOwner() {
    return owner;
  }

  /**
   * Get the group associated with the file.
   * @return group for the file. The string could be empty if there is no
   *         notion of group of a file in a filesystem or if it could not
   *         be determined (rare).
   */
  public String getGroup() {
    return group;
  }

  public Path getPath() {
    return path;
  }

  /* These are provided so that these values could be loaded lazily
   * by a filesystem (e.g. local file system).
   */

  /**
   * Sets permission.
   * @param permission if permission is null, default value is set
   */
  protected void setPermission(FsPermission permission) {
    this.permission = (permission == null) ?
                      FsPermission.getDefault() : permission;
  }

  /**
   * Sets owner.
   * @param owner if it is null, default value is set
   */
  protected void setOwner(String owner) {
    this.owner = (owner == null) ? "" : owner;
  }

  /**
   * Sets group.
   * @param group if it is null, default value is set
   */
  protected void setGroup(String group) {
    this.group = (group == null) ? "" : group;
  }

  //
  // Writable
  //
  public void write(DataOutput out) throws IOException {
    Text.writeString(out, getPath().toString());
    out.writeLong(length);
    out.writeBoolean(isdir);
    out.writeShort(block_replication);
    out.writeLong(blocksize);
    out.writeLong(modification_time);
    out.writeLong(access_time);
    permission.write(out);
    Text.writeString(out, owner);
    Text.writeString(out, group);
  }

  public void readFields(DataInput in) throws IOException {
    String strPath = Text.readString(in);
    this.path = new Path(strPath);
    this.length = in.readLong();
    this.isdir = in.readBoolean();
    this.block_replication = in.readShort();
    blocksize = in.readLong();
    modification_time = in.readLong();
    access_time = in.readLong();
    permission.readFields(in);
    owner = Text.readString(in);
    group = Text.readString(in);
  }

  /**
   * Compare this object to another object
   *
   * @param   o the object to be compared.
   * @return  a negative integer, zero, or a positive integer as this object
   *   is less than, equal to, or greater than the specified object.
   *
   * @throws ClassCastException if the specified object's is not of
   *         type FileStatus
   */
  public int compareTo(Object o) {
    FileStatus other = (FileStatus)o;
    return this.getPath().compareTo(other.getPath());
  }

  /** Compare if this object is equal to another object
   * @param   o the object to be compared.
   * @return  true if two file status has the same path name; false if not.
   */
  public boolean equals(Object o) {
    if (o == null) {
      return false;
    }
    if (this == o) {
      return true;
    }
    if (!(o instanceof FileStatus)) {
      return false;
    }
    FileStatus other = (FileStatus)o;
    return this.getPath().equals(other.getPath());
  }

  /**
   * Returns a hash code value for the object, which is defined as
   * the hash code of the path name.
   *
   * @return  a hash code value for the path name.
   */
  public int hashCode() {
    return getPath().hashCode();
  }
}
```
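One consequence of the source above is that `equals()`, `hashCode()` and `compareTo()` look only at the path: two `FileStatus` snapshots of the same path compare equal even if the length or timestamps changed between them. The sketch below mirrors that rule in a hypothetical `PathKeyedStatus` class; it uses `String` where the real class uses `Path`, so its ordering is plain string ordering rather than `Path.compareTo`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical record mirroring FileStatus's identity rules:
// equals(), hashCode() and compareTo() consider only the path.
public final class PathKeyedStatus implements Comparable<PathKeyedStatus> {
    final String path;
    final long length;

    PathKeyedStatus(String path, long length) {
        this.path = Objects.requireNonNull(path);
        this.length = length;
    }

    @Override public int compareTo(PathKeyedStatus other) {
        return path.compareTo(other.path);
    }

    @Override public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PathKeyedStatus)) return false; // also covers o == null
        return path.equals(((PathKeyedStatus) o).path);
    }

    @Override public int hashCode() {
        return path.hashCode();
    }

    public static void main(String[] args) {
        PathKeyedStatus older = new PathKeyedStatus("/data/b", 10);
        PathKeyedStatus newer = new PathKeyedStatus("/data/b", 99);
        System.out.println(older.equals(newer)); // true: lengths differ, paths match

        Set<PathKeyedStatus> set = new HashSet<>(List.of(older, newer));
        System.out.println(set.size()); // 1

        List<PathKeyedStatus> list = new ArrayList<>(List.of(newer, new PathKeyedStatus("/data/a", 5)));
        Collections.sort(list);
        System.out.println(list.get(0).path); // /data/a
    }
}
```

In practice this means a set or map keyed by `FileStatus` deduplicates by path, and code that needs to detect changes must compare fields such as the length or modification time explicitly.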

Methods that appear in FileSystem but have no counterpart in the Java file API are setReplication(), getReplication(), and getContentSummary(). Their declarations are:

```java
public boolean setReplication(Path src, short replication)
    throws IOException {
  return true;
}

public short getReplication(Path src) throws IOException {
  return getFileStatus(src).getReplication();
}

public ContentSummary getContentSummary(Path f) throws IOException {
  FileStatus status = getFileStatus(f);
  if (!status.isDir()) {
    // f is a file
    return new ContentSummary(status.getLen(), 1, 0);
  }
  // f is a directory
  long[] summary = {0, 0, 1};
  for(FileStatus s : listStatus(f)) {
    ContentSummary c = s.isDir() ? getContentSummary(s.getPath()) :
                                   new ContentSummary(s.getLen(), 1, 0);
    summary[0] += c.getLength();
    summary[1] += c.getFileCount();
    summary[2] += c.getDirectoryCount();
  }
  return new ContentSummary(summary[0], summary[1], summary[2]);
}
```
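`getContentSummary()` is a plain recursive walk. A minimal JDK-only sketch of the same accumulation over `java.io.File` follows; the class name `ContentSummarySketch` is hypothetical, and it returns a `{length, fileCount, directoryCount}` array in place of a `ContentSummary` object. As in the Hadoop version, the starting directory itself counts as one directory.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;

public class ContentSummarySketch {
    // Returns {totalLength, fileCount, directoryCount} for a file or directory,
    // accumulated the same way as FileSystem.getContentSummary().
    public static long[] summarize(File f) throws IOException {
        if (!f.exists()) throw new FileNotFoundException(f.toString());
        if (!f.isDirectory()) {
            return new long[]{f.length(), 1, 0};   // f is a file
        }
        long[] summary = {0, 0, 1};                 // f is a directory: count itself
        File[] children = f.listFiles();
        if (children == null) throw new IOException("Cannot list " + f);
        for (File child : children) {
            long[] c = summarize(child);            // recurse into each entry
            summary[0] += c[0];
            summary[1] += c[1];
            summary[2] += c[2];
        }
        return summary;
    }

    public static void main(String[] args) throws IOException {
        File root = Files.createTempDirectory("cs").toFile();
        File sub = new File(root, "sub");
        if (!sub.mkdir()) throw new IOException("mkdir failed");
        Files.write(new File(root, "a.bin").toPath(), new byte[3]);
        Files.write(new File(sub, "b.bin").toPath(), new byte[5]);
        long[] s = summarize(root);
        System.out.println(s[0] + " bytes, " + s[1] + " files, " + s[2] + " dirs"); // 8 bytes, 2 files, 2 dirs
    }
}
```

On HDFS each `listStatus()` in this recursion is a separate request to the file system, which is one reason a concrete implementation may choose to override the default `getContentSummary()`.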

What does a concrete Hadoop file system need to implement? The abstract methods declared in org.apache.hadoop.fs.FileSystem are listed below:

```java
// Return the URI of this file system
public abstract URI getUri();

// Open a file for reading and return an input stream
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException;

// Create a file and return an output stream
public abstract FSDataOutputStream create(Path f,
    FsPermission permission,
    boolean overwrite,
    int bufferSize,
    short replication,
    long blockSize,
    Progressable progress) throws IOException;

// Append data to an existing file
public abstract FSDataOutputStream append(Path f, int bufferSize,
    Progressable progress) throws IOException;

// Rename a file or directory
public abstract boolean rename(Path src, Path dst) throws IOException;

// Delete a file
public abstract boolean delete(Path f) throws IOException;
public abstract boolean delete(Path f, boolean recursive) throws IOException;

// If Path is a directory, list all its entries and their attributes;
// if Path is a file, return that file's attributes
public abstract FileStatus[] listStatus(Path f) throws IOException;

// Set the current working directory
public abstract void setWorkingDirectory(Path new_dir);

// Get the current working directory
public abstract Path getWorkingDirectory();

// Create a directory (and any missing parents) with the given permission
public abstract boolean mkdirs(Path f, FsPermission permission) throws IOException;

// Get the attributes of a file or directory
public abstract FileStatus getFileStatus(Path f) throws IOException;
```

A concrete file system must implement at least these abstract methods.
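To get a feel for what implementing these abstract methods involves, here is a toy sketch under loudly stated assumptions. `MiniFileSystem` is a hypothetical abstract class mirroring only a subset of the methods above (open, create, rename, delete, listStatus, mkdirs), with `String` paths and plain `InputStream`/`OutputStream` standing in for `Path` and `FSDataInputStream`/`FSDataOutputStream`. `InMemoryFileSystem` implements it with in-memory collections. This illustrates the pattern; it is not how `LocalFileSystem` or HDFS are built.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical abstract base mirroring a SUBSET of FileSystem's abstract methods.
abstract class MiniFileSystem {
    public abstract InputStream open(String path) throws IOException;
    public abstract OutputStream create(String path, boolean overwrite) throws IOException;
    public abstract boolean rename(String src, String dst) throws IOException;
    public abstract boolean delete(String path, boolean recursive) throws IOException;
    public abstract List<String> listStatus(String path) throws IOException;
    public abstract boolean mkdirs(String dir) throws IOException;
}

// Toy implementation; paths are assumed absolute and normalized ("/a/b").
class InMemoryFileSystem extends MiniFileSystem {
    private final Map<String, byte[]> files = new TreeMap<>();
    private final Set<String> dirs = new TreeSet<>(Set.of("/"));

    private static String parent(String p) {
        int i = p.lastIndexOf('/');
        return i <= 0 ? "/" : p.substring(0, i);
    }

    @Override public InputStream open(String path) throws IOException {
        byte[] data = files.get(path);
        if (data == null) throw new FileNotFoundException(path);
        return new ByteArrayInputStream(data);
    }

    @Override public OutputStream create(String path, boolean overwrite) throws IOException {
        if (dirs.contains(path)) throw new IOException(path + " is a directory");
        if (!overwrite && files.containsKey(path)) throw new IOException(path + " already exists");
        mkdirs(parent(path));
        files.put(path, new byte[0]);
        // The written bytes are stored when the stream is closed.
        return new ByteArrayOutputStream() {
            @Override public void close() { files.put(path, toByteArray()); }
        };
    }

    @Override public boolean rename(String src, String dst) {
        // Only plain files are renamed in this sketch.
        if (!files.containsKey(src) || files.containsKey(dst) || dirs.contains(dst)) return false;
        if (!dirs.contains(parent(dst))) return false;
        files.put(dst, files.remove(src));
        return true;
    }

    @Override public boolean delete(String path, boolean recursive) throws IOException {
        if (files.remove(path) != null) return true;
        if (path.equals("/") || !dirs.contains(path)) return false;
        if (!recursive && !listStatus(path).isEmpty()) throw new IOException(path + " is not empty");
        String prefix = path + "/";
        files.keySet().removeIf(k -> k.startsWith(prefix));
        dirs.removeIf(d -> d.startsWith(prefix));
        dirs.remove(path);
        return true;
    }

    @Override public List<String> listStatus(String path) throws IOException {
        if (files.containsKey(path)) return List.of(path); // a file lists itself
        if (!dirs.contains(path)) throw new FileNotFoundException(path);
        List<String> out = new ArrayList<>();
        for (String d : dirs) if (!d.equals("/") && parent(d).equals(path)) out.add(d);
        for (String f : files.keySet()) if (parent(f).equals(path)) out.add(f);
        return out;
    }

    @Override public boolean mkdirs(String dir) throws IOException {
        // Like "mkdir -p": fail if any ancestor is a file, else create all missing levels.
        for (String d = dir; !d.equals("/"); d = parent(d)) {
            if (files.containsKey(d)) throw new IOException(d + " is a file");
        }
        for (String d = dir; !d.equals("/"); d = parent(d)) dirs.add(d);
        return true;
    }
}

public class InMemoryFsDemo {
    public static void main(String[] args) throws IOException {
        MiniFileSystem fs = new InMemoryFileSystem();
        try (OutputStream out = fs.create("/user/demo/a.txt", false)) {
            out.write("hi".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println(fs.listStatus("/user/demo")); // [/user/demo/a.txt]
        System.out.println(fs.rename("/user/demo/a.txt", "/user/demo/b.txt")); // true
        try (InputStream in = fs.open("/user/demo/b.txt")) {
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8)); // hi
        }
        System.out.println(fs.delete("/user", true)); // true
    }
}
```

Client code written against `MiniFileSystem` would not change if `InMemoryFileSystem` were swapped for another subclass, which is the point of putting the abstract methods in one base class.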

The complete source of Hadoop's FileSystem class is as follows:

```java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;

import javax.security.auth.login.LoginException;

import org.apache.commons.logging.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.security.UserGroupInformation;

/****************************************************************
 * An abstract base class for a fairly generic filesystem. It
 * may be implemented as a distributed filesystem, or as a "local"
 * one that reflects the locally-connected disk. The local version
 * exists for small Hadoop instances and for testing.
 *
 * <p>
 *
 * All user code that may potentially use the Hadoop Distributed
 * File System should be written to use a FileSystem object. The
 * Hadoop DFS is a multi-machine system that appears as a single
 * disk. It's useful because of its fault tolerance and potentially
 * very large capacity.
 *
 * <p>
 * The local implementation is {@link LocalFileSystem} and distributed
 * implementation is DistributedFileSystem.
 *****************************************************************/
public abstract class FileSystem extends Configured implements Closeable {
  private static final String FS_DEFAULT_NAME_KEY = "fs.default.name";

  public static final Log LOG = LogFactory.getLog(FileSystem.class);

  /** FileSystem cache */
  private static final Cache CACHE = new Cache();

  /** The key this instance is stored under in the cache. */
  private Cache.Key key;

  /** Recording statistics per a FileSystem class */
  private static final Map<Class<? extends FileSystem>, Statistics>
    statisticsTable =
      new IdentityHashMap<Class<? extends FileSystem>, Statistics>();

  /**
   * The statistics for this file system.
   */
  protected Statistics statistics;

  /**
   * A cache of files that should be deleted when filsystem is closed
   * or the JVM is exited.
   */
  private Set<Path> deleteOnExit = new TreeSet<Path>();

  /** Returns the configured filesystem implementation.*/
  public static FileSystem get(Configuration conf) throws IOException {
    return get(getDefaultUri(conf), conf);
  }

  /** Get the default filesystem URI from a configuration.
   * @param conf the configuration to access
   * @return the uri of the default filesystem
   */
  public static URI getDefaultUri(Configuration conf) {
    return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, "file:///")));
  }

  /** Set the default filesystem URI in a configuration.
   * @param conf the configuration to alter
   * @param uri the new default filesystem uri
   */
  public static void setDefaultUri(Configuration conf, URI uri) {
    conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
  }

  /** Set the default filesystem URI in a configuration.
   * @param conf the configuration to alter
   * @param uri the new default filesystem uri
   */
  public static void setDefaultUri(Configuration conf, String uri) {
    setDefaultUri(conf, URI.create(fixName(uri)));
  }

  /** Called after a new FileSystem instance is constructed.
   * @param name a uri whose authority section names the host, port, etc.
   *   for this FileSystem
   * @param conf the configuration
   */
  public void initialize(URI name, Configuration conf) throws IOException {
    statistics = getStatistics(name.getScheme(), getClass());
  }

  /** Returns a URI whose scheme and authority identify this FileSystem.*/
  public abstract URI getUri();

  /** @deprecated call #getUri() instead.*/
  public String getName() { return getUri().toString(); }

  /** @deprecated call #get(URI,Configuration) instead. */
  public static FileSystem getNamed(String name, Configuration conf)
    throws IOException {
    return get(URI.create(fixName(name)), conf);
  }

  /** Update old-format filesystem names, for back-compatibility. This should
   * eventually be replaced with a checkName() method that throws an exception
   * for old-format names. */
  private static String fixName(String name) {
    // convert old-format name to new-format name
    if (name.equals("local")) {         // "local" is now "file:///".
      LOG.warn("\"local\" is a deprecated filesystem name."
               +" Use \"file:///\" instead.");
      name = "file:///";
    } else if (name.indexOf('/')==-1) {   // unqualified is "hdfs://"
      LOG.warn("\""+name+"\" is a deprecated filesystem name."
               +" Use \"hdfs://"+name+"/\" instead.");
      name = "hdfs://"+name;
    }
    return name;
  }

  /**
   * Get the local file syste
   * @param conf the configuration to configure the file system with
   * @return a LocalFileSystem
   */
  public static LocalFileSystem getLocal(Configuration conf)
    throws IOException {
    return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
  }

  /** Returns the FileSystem for this URI's scheme and authority. The scheme
   * of the URI determines a configuration property name,
   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
   * The entire URI is passed to the FileSystem instance's initialize method.
   */
  public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();

    if (scheme == null) {                       // no scheme: use default FS
      return get(conf);
    }

    if (authority == null) {                       // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return get(defaultUri, conf);              // return default
      }
    }

    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
      return createFileSystem(uri, conf);
    }

    return CACHE.get(uri, conf);
  }

  private static class ClientFinalizer extends Thread {
    public synchronized void run() {
      try {
        FileSystem.closeAll();
      } catch (IOException e) {
        LOG.info("FileSystem.closeAll() threw an exception:\n" + e);
      }
    }
  }
  private static final ClientFinalizer clientFinalizer = new ClientFinalizer();

  /**
   * Close all cached filesystems. Be sure those filesystems are not
   * used anymore.
   *
   * @throws IOException
   */
  public static void closeAll() throws IOException {
    CACHE.closeAll();
  }

  /** Make sure that a path specifies a FileSystem. */
  public Path makeQualified(Path path) {
    checkPath(path);
    return path.makeQualified(this);
  }

  /** create a file with the provided permission
   * The permission of the file is set to be the provided permission as in
   * setPermission, not permission&~umask
   *
   * It is implemented using two RPCs. It is understood that it is inefficient,
   * but the implementation is thread-safe. The other option is to change the
   * value of umask in configuration to be 0, but it is not thread-safe.
   *
   * @param fs file system handle
   * @param file the name of the file to be created
   * @param permission the permission of the file
   * @return an output stream
   * @throws IOException
   */
  public static FSDataOutputStream create(FileSystem fs,
      Path file, FsPermission permission) throws IOException {
    // create the file with default permission
    FSDataOutputStream out = fs.create(file);
    // set its permission to the supplied one
    fs.setPermission(file, permission);
    return out;
  }

  /** create a directory with the provided permission
   * The permission of the directory is set to be the provided permission as in
   * setPermission, not permission&~umask
   *
   * @see #create(FileSystem, Path, FsPermission)
   *
   * @param fs file system handle
   * @param dir the name of the directory to be created
   * @param permission the permission of the directory
   * @return true if the directory creation succeeds; false otherwise
   * @throws IOException
   */
  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
  throws IOException {
    // create the directory using the default permission
    boolean result = fs.mkdirs(dir);
    // set its permission to be the supplied one
    fs.setPermission(dir, permission);
    return result;
  }

  ///
  // FileSystem
  ///

  protected FileSystem() {
    super(null);
  }

  /** Check that a Path belongs to this FileSystem. */
  protected void checkPath(Path path) {
    URI uri = path.toUri();
    if (uri.getScheme() == null)                // fs is relative
      return;
    String thisScheme = this.getUri().getScheme();
    String thatScheme = uri.getScheme();
    String thisAuthority = this.getUri().getAuthority();
    String thatAuthority = uri.getAuthority();
    //authority and scheme are not case sensitive
    if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
      if (thisAuthority == thatAuthority ||       // & authorities match
          (thisAuthority != null &&
           thisAuthority.equalsIgnoreCase(thatAuthority)))
        return;

      if (thatAuthority == null &&                // path's authority is null
          thisAuthority != null) {                // fs has an authority
        URI defaultUri = getDefaultUri(getConf()); // & is the conf default
        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme()) &&
            thisAuthority.equalsIgnoreCase(defaultUri.getAuthority()))
          return;
        try {                                     // or the default fs's uri
          defaultUri = get(getConf()).getUri();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme()) &&
            thisAuthority.equalsIgnoreCase(defaultUri.getAuthority()))
          return;
      }
    }
    throw new IllegalArgumentException("Wrong FS: "+path+
                                       ", expected: "+this.getUri());
  }

  /**
   * Return an array containing hostnames, offset and size of
   * portions of the given file. For a nonexistent
   * file or regions, null will be returned.
   *
   * This call is most helpful with DFS, where it returns
   * hostnames of machines that contain the given file.
   *
   * The FileSystem will simply return an elt containing 'localhost'.
   */
  public BlockLocation[] getFileBlockLocations(FileStatus file,
      long start, long len) throws IOException {
    if (file == null) {
      return null;
    }

    if ( (start<0) || (len < 0) ) {
      throw new IllegalArgumentException("Invalid start or len parameter");
    }

    if (file.getLen() < start) {
      return new BlockLocation[0];
    }
    String[] name = { "localhost:50010" };
    String[] host = { "localhost" };
    return new BlockLocation[] { new BlockLocation(name, host, 0, file.getLen()) };
  }

  /**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   */
  public abstract FSDataInputStream open(Path f, int bufferSize)
    throws IOException;

  /**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file to open
   */
  public FSDataInputStream open(Path f) throws IOException {
    return open(f, getConf().getInt("io.file.buffer.size", 4096));
  }

  /**
   * Opens an FSDataOutputStream at the indicated Path.
   * Files are overwritten by default.
   */
  public FSDataOutputStream create(Path f) throws IOException {
    return create(f, true);
  }

  /**
   * Opens an FSDataOutputStream at the indicated Path.
   */
  public FSDataOutputStream create(Path f, boolean overwrite)
    throws IOException {
    return create(f, overwrite,
                  getConf().getInt("io.file.buffer.size", 4096),
                  getDefaultReplication(),
                  getDefaultBlockSize());
  }

  /**
   * Create an FSDataOutputStream at the indicated Path with write-progress
   * reporting.
   * Files are overwritten by default.
   */
  public FSDataOutputStream create(Path f, Progressable progress) throws IOException {
    return create(f, true,
                  getConf().getInt("io.file.buffer.size", 4096),
                  getDefaultReplication(),
                  getDefaultBlockSize(), progress);
  }

  /**
   * Opens an FSDataOutputStream at the indicated Path.
   * Files are overwritten by default.
   */
  public FSDataOutputStream create(Path f, short replication)
    throws IOException {
    return create(f, true,
                  getConf().getInt("io.file.buffer.size", 4096),
                  replication,
                  getDefaultBlockSize());
  }

  /**
   * Opens an FSDataOutputStream at the indicated Path with write-progress
   * reporting.
   * Files are overwritten by default.
   */
  public FSDataOutputStream create(Path f, short replication, Progressable progress)
    throws IOException {
    return create(f, true,
                  getConf().getInt("io.file.buffer.size", 4096),
                  replication,
                  getDefaultBlockSize(), progress);
  }

  /**
   * Opens an FSDataOutputStream at the indicated Path.
   * @param f the file name to open
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an error will be thrown.
   * @param bufferSize the size of the buffer to be used.
   */
  public FSDataOutputStream create(Path f,
                                   boolean overwrite,
                                   int bufferSize
                                   ) throws IOException {
    return create(f, overwrite, bufferSize,
                  getDefaultReplication(),
                  getDefaultBlockSize());
  }

  /**
   * Opens an FSDataOutputStream at the indicated Path with write-progress
   * reporting.
   * @param f the file name to open
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an error will be thrown.
   * @param bufferSize the size of the buffer to be used.
   */
  public FSDataOutputStream create(Path f,
                                   boolean overwrite,
                                   int bufferSize,
                                   Progressable progress
                                   ) throws IOException {
    return create(f, overwrite, bufferSize,
                  getDefaultReplication(),
                  getDefaultBlockSize(), progress);
  }

  /**
   * Opens an FSDataOutputStream at the indicated Path.
   * @param f the file name to open
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an error will be thrown.
   * @param bufferSize the size of the buffer to be used.
   * @param replication required block replication for the file.
   */
  public FSDataOutputStream create(Path f,
                                   boolean overwrite,
                                   int bufferSize,
                                   short replication,
                                   long blockSize
                                   ) throws IOException {
    return create(f, overwrite, bufferSize, replication, blockSize, null);
  }

  /**
   * Opens an FSDataOutputStream at the indicated Path with write-progress
   * reporting.
   * @param f the file name to open
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an error will be thrown.
   * @param bufferSize the size of the buffer to be used.
   * @param replication required block replication for the file.
   */
  public FSDataOutputStream create(Path f,
                                   boolean overwrite,
                                   int bufferSize,
                                   short replication,
                                   long blockSize,
                                   Progressable progress
                                   ) throws IOException {
    return this.create(f, FsPermission.getDefault(),
        overwrite, bufferSize, replication, blockSize, progress);
  }

  /**
   * Opens an FSDataOutputStream at the indicated Path with write-progress
   * reporting.
   * @param f the file name to open
   * @param permission
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an error will be thrown.
   * @param bufferSize the size of the buffer to be used.
   * @param replication required block replication for the file.
   * @param blockSize
   * @param progress
   * @throws IOException
   * @see #setPermission(Path, FsPermission)
   */
  public abstract FSDataOutputStream create(Path f,
      FsPermission permission,
      boolean overwrite,
      int bufferSize,
      short replication,
      long blockSize,
      Progressable progress) throws IOException;

  /**
   * Creates the given Path as a brand-new zero-length file. If
   * create fails, or if it already existed, return false.
   */
  public boolean createNewFile(Path f) throws IOException {
    if (exists(f)) {
      return false;
    } else {
      create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();
      return true;
    }
  }

  /**
   * Append to an existing file (optional operation).
   * Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null)
   * @param f the existing file to be appended.
   * @throws IOException
   */
  public FSDataOutputStream append(Path f) throws IOException {
    return append(f, getConf().getInt("io.file.buffer.size", 4096), null);
  }

  /**
   * Append to an existing file (optional operation).
   * Same as append(f, bufferSize, null).
   * @param f the existing file to be appended.
   * @param bufferSize the size of the buffer to be used.
   * @throws IOException
   */
  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
    return append(f, bufferSize, null);
  }

  /**
   * Append to an existing file (optional operation).
   * @param f the existing file to be appended.
   * @param bufferSize the size of the buffer to be used.
   * @param progress for reporting progress if it is not null.
   * @throws IOException
   */
  public abstract FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException;

  /**
   * Get replication.
   *
   * @deprecated Use getFileStatus() instead
   * @param src file name
   * @return file replication
   * @throws IOException
   */
  @Deprecated
  public short getReplication(Path src) throws IOException {
    return getFileStatus(src).getReplication();
  }

  /**
   * Set replication for an existing file.
   *
   * @param src file name
   * @param replication new replication
   * @throws IOException
   * @return true if successful;
   *         false if file does not exist or is a directory
   */
  public boolean setReplication(Path src, short replication)
    throws IOException {
    return true;
  }

  /**
   * Renames Path src to Path dst. Can take place on local fs
   * or remote DFS.
   */
  public abstract boolean rename(Path src, Path dst) throws IOException;

  /** Delete a file. */
  /** @deprecated Use delete(Path, boolean) instead */ @Deprecated
  public abstract boolean delete(Path f) throws IOException;

  /** Delete a file.
   *
   * @param f the path to delete.
   * @param recursive if path is a directory and set to
   * true, the directory is deleted else throws an exception. In
   * case of a file the recursive can be set to either true or false.
   * @return  true if delete is successful else false.
   * @throws IOException
   */
  public abstract boolean delete(Path f, boolean recursive) throws IOException;

  /**
   * Mark a path to be deleted when FileSystem is closed.
   * When the JVM shuts down,
   * all FileSystem objects will be closed automatically.
   * Then,
   * the marked path will be deleted as a result of closing the FileSystem.
   *
   * The path has to exist in the file system.
   *
   * @param f the path to delete.
   * @return  true if deleteOnExit is successful, otherwise false.
   * @throws IOException
   */
  public boolean deleteOnExit(Path f) throws IOException {
    if (!exists(f)) {
      return false;
    }
    synchronized (deleteOnExit) {
      deleteOnExit.add(f);
    }
    return true;
  }

  /**
   * Delete all files that were marked as delete-on-exit. This recursively
   * deletes all files in the specified paths.
   */
  protected void processDeleteOnExit() {
    synchronized (deleteOnExit) {
      for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
        Path path = iter.next();
        try {
          delete(path, true);
        }
        catch (IOException e) {
          LOG.info("Ignoring failure to deleteOnExit for path " + path);
        }
        iter.remove();
      }
    }
  }

  /** Check if exists.
   * @param f source file
   */
  public boolean exists(Path f) throws IOException {
    try {
      return getFileStatus(f) != null;
    } catch (FileNotFoundException e) {
      return false;
    }
  }

  /** True iff the named path is a directory. */
  /** @deprecated Use getFileStatus() instead */ @Deprecated
  public boolean isDirectory(Path f) throws IOException {
    try {
      return getFileStatus(f).isDir();
    } catch (FileNotFoundException e) {
      return false;               // f does not exist
    }
  }

  /** True iff the named path is a regular file. */
  public boolean isFile(Path f) throws IOException {
    try {
      return !getFileStatus(f).isDir();
    } catch (FileNotFoundException e) {
      return false;               // f does not exist
    }
  }

  /** The number of bytes in a file. */
  /** @deprecated Use getFileStatus() instead */ @Deprecated
  public long getLength(Path f) throws IOException {
    return getFileStatus(f).getLen();
  }

  /** Return the {@link ContentSummary} of a given {@link Path}. */
  public ContentSummary getContentSummary(Path f) throws IOException {
    FileStatus status = getFileStatus(f);
    if (!status.isDir()) {
      // f is a file
      return new ContentSummary(status.getLen(), 1, 0);
    }
    // f is a directory
    long[] summary = {0, 0, 1};
    for(FileStatus s : listStatus(f)) {
      ContentSummary c = s.isDir() ? getContentSummary(s.getPath()) :
                                     new ContentSummary(s.getLen(), 1, 0);
      summary[0] += c.getLength();
      summary[1] += c.getFileCount();
      summary[2] += c.getDirectoryCount();
    }
    return new ContentSummary(summary[0], summary[1], summary[2]);
  }

  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
      public boolean accept(Path file) {
        return true;
      }
    };

  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory.
   *
   * @param f
   *          given path
   * @return the statuses of the files/directories in the given patch
   * @throws IOException
   */
  public abstract FileStatus[] listStatus(Path f) throws IOException;

  /*
   * Filter files/directories in the given path using the user-supplied path
   * filter. Results are added to the given array <code>results</code>.
   */
  private void listStatus(ArrayList<FileStatus> results, Path f,
      PathFilter filter) throws IOException {
    FileStatus listing[] = listStatus(f);
    if (listing != null) {
      for (int i = 0; i < listing.length; i++) {
        if (filter.accept(listing[i].getPath())) {
          results.add(listing[i]);
        }
      }
    }
  }

  /**
   * Filter files/directories in the given path using the user-supplied path
   * filter.
   *
   * @param f
   *          a path name
   * @param filter
   *          the user-supplied path filter
   * @return an array of FileStatus objects for the files under the given path
   *         after applying the filter
   * @throws IOException
   *           if encounter any problem while fetching the status
   */
  public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException {
    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
    listStatus(results, f, filter);
    return results.toArray(new FileStatus[results.size()]);
  }

  /**
   * Filter files/directories in the given list of paths using default
   * path filter.
   *
   * @param files
   *          a list of paths
   * @return a list of statuses for the files under the given paths after
   *         applying the filter default Path filter
   * @exception IOException
   */
  public FileStatus[] listStatus(Path[] files)
      throws IOException {
    return listStatus(files, DEFAULT_FILTER);
  }

  /**
   * Filter files/directories in the given list of paths using user-supplied
   * path filter.
   *
   * @param files
   *          a list of paths
   * @param filter
   *          the user-supplied path filter
   * @return a list of statuses for the files under the given paths after
   *         applying the filter
   * @exception IOException
   */
  public FileStatus[] listStatus(Path[] files, PathFilter filter)
      throws IOException {
    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
    for (int i = 0; i < files.length; i++) {
      listStatus(results, files[i], filter);
    }
    return results.toArray(new FileStatus[results.size()]);
  }

  /**
   * <p>Return all the files that match filePattern and are not checksum
   * files. Results are sorted by their names.
   *
   * <p>
   * A filename pattern is composed of <i>regular</i> characters and
   * <i>special pattern matching</i> characters, which are:
   *
   * <dl>
   *  <dd>
   *   <dl>
   *    <p>
   *    <dt> <tt> ? </tt>
   *    <dd> Matches any single character.
   *
   *    <p>
   *    <dt> <tt> * </tt>
   *    <dd> Matches zero or more characters.
   *
   *    <p>
   *    <dt> <tt> [<i>abc</i>] </tt>
   *    <dd> Matches a single character from character set
   *     <tt>{<i>a,b,c</i>}</tt>.
   *
   *    <p>
   *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
   *    <dd> Matches a single character from the character range
   *     <tt>{<i>a...b</i>}</tt>. Note that character <tt><i>a</i></tt> must be
   *     lexicographically less than or equal to character <tt><i>b</i></tt>.
   *
   *    <p>
   *    <dt> <tt> [^<i>a</i>] </tt>
   *    <dd> Matches a single character that is not from character set or range
   *     <tt>{<i>a</i>}</tt>. Note that the <tt>^</tt> character must occur
   *     immediately to the right of the opening bracket.
   *
   *    <p>
   *    <dt> <tt> \<i>c</i> </tt>
   *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
   *
   *    <p>
   *    <dt> <tt> {ab,cd} </tt>
   *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
   *
   *    <p>
   *    <dt> <tt> {ab,c{de,fh}} </tt>
   *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
   *
   *   </dl>
   *  </dd>
   * </dl>
   *
   * @param pathPattern a regular expression specifying a pth pattern
   * @return an array of paths that match the path pattern
   * @throws IOException
   */
  public FileStatus[] globStatus(Path pathPattern) throws IOException {
    return globStatus(pathPattern, DEFAULT_FILTER);
  }

  /**
   * Return an array of FileStatus objects whose path names match pathPattern
   * and is accepted by the user-supplied path filter. Results are sorted by
   * their path names.
   * Return null if pathPattern has no glob and the path does not exist.
   * Return an empty array if pathPattern has a glob and no path matches it.
   *
   * @param pathPattern
   *          a regular expression specifying the path pattern
   * @param filter
   *          a user-supplied path filter
   * @return an array of FileStatus objects
   * @throws IOException if any I/O error occurs when fetching file status
   */
  public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
      throws IOException {
    String filename = pathPattern.toUri().getPath();
    List<String> filePatterns = GlobExpander.expand(filename);
    if (filePatterns.size() == 1) {
      return globStatusInternal(pathPattern, filter);
    } else {
      List<FileStatus> results = new ArrayList<FileStatus>();
      for (String filePattern : filePatterns) {
        FileStatus[] files = globStatusInternal(new Path(filePattern), filter);
        for (FileStatus file : files) {
          results.add(file);
        }
      }
      return results.toArray(new FileStatus[results.size()]);
    }
  }

  private FileStatus[] globStatusInternal(Path pathPattern, PathFilter filter)
      throws IOException {
    Path[] parents = new Path[1];
    int level = 0;
    String filename = pathPattern.toUri().getPath();

    // path has only zero component
    if ("".equals(filename) || Path.SEPARATOR.equals(filename)) {
      return getFileStatus(new Path[]{pathPattern});
    }

    // path has at least one component
    String[] components = filename.split(Path.SEPARATOR);
    // get the first component
    if (pathPattern.isAbsolute()) {
      parents[0] = new Path(Path.SEPARATOR);
      level = 1;
    } else {
      parents[0] = new Path(Path.CUR_DIR);
    }

    // glob the paths that match the parent path, i.e., [0, components.length-1]
    boolean[] hasGlob = new boolean[]{false};
    Path[] parentPaths = globPathsLevel(parents, components, level, hasGlob);
    FileStatus[] results;
    if (parentPaths == null || parentPaths.length == 0) {
      results = null;
    } else {
      // Now work on the last component of the path
      GlobFilter fp = new GlobFilter(components[components.length - 1], filter);
      if (fp.hasPattern()) { // last component has a pattern
        // list parent directories and then glob the results
        results = listStatus(parentPaths, fp);
        hasGlob[0] = true;
      } else { // last component does not have a pattern
        // get all the path
```
namesArrayList<Path> filteredPaths = new ArrayList<Path>(parentPaths.length);for (int i = 0; i < parentPaths.length; i++) {parentPaths[i] = new Path(parentPaths[i],components[components.length - 1]);if (fp.accept(parentPaths[i])) {filteredPaths.add(parentPaths[i]);}}// get all their statusesresults = getFileStatus(filteredPaths.toArray(new Path[filteredPaths.size()]));}}// Decide if the pathPattern contains a glob or notif (results == null) {if (hasGlob[0]) {results = new FileStatus[0];}} else {if (results.length == 0 ) {if (!hasGlob[0]) {results = null;}} else {Arrays.sort(results);}}return results;}/** For a path of N components, return a list of paths that match the* components [<code>level</code>, <code>N-1</code>].*/private Path[] globPathsLevel(Path[] parents, String[] filePattern,int level, boolean[] hasGlob) throws IOException {if (level == filePattern.length - 1)return parents;if (parents == null || parents.length == 0) {return null;}GlobFilter fp = new GlobFilter(filePattern[level]);if (fp.hasPattern()) {parents = FileUtil.stat2Paths(listStatus(parents, fp));hasGlob[0] = true;} else {for (int i = 0; i < parents.length; i++) {parents[i] = new Path(parents[i], filePattern[level]);}}return globPathsLevel(parents, filePattern, level + 1, hasGlob);}/* A class that could decide if a string matches the glob or not */private static class GlobFilter implements PathFilter {private PathFilter userFilter = DEFAULT_FILTER;private Pattern regex;private boolean hasPattern = false;/** Default pattern character: Escape any special meaning. */private static final char PAT_ESCAPE = '\\';/** Default pattern character: Any single character. */private static final char PAT_ANY = '.';/** Default pattern character: Character set close. 
*/private static final char PAT_SET_CLOSE = ']';GlobFilter() {}GlobFilter(String filePattern) throws IOException {setRegex(filePattern);}GlobFilter(String filePattern, PathFilter filter) throws IOException {userFilter = filter;setRegex(filePattern);}private boolean isJavaRegexSpecialChar(char pChar) {return pChar == '.' || pChar == '$' || pChar == '(' || pChar == ')' ||pChar == '|' || pChar == '+';}void setRegex(String filePattern) throws IOException {int len;int setOpen;int curlyOpen;boolean setRange;StringBuilder fileRegex = new StringBuilder();// Validate the patternlen = filePattern.length();if (len == 0)return;setOpen = 0;setRange = false;curlyOpen = 0;for (int i = 0; i < len; i++) {char pCh;// Examine a single pattern characterpCh = filePattern.charAt(i);if (pCh == PAT_ESCAPE) {fileRegex.append(pCh);i++;if (i >= len)error("An escaped character does not present", filePattern, i);pCh = filePattern.charAt(i);} else if (isJavaRegexSpecialChar(pCh)) {fileRegex.append(PAT_ESCAPE);} else if (pCh == '*') {fileRegex.append(PAT_ANY);hasPattern = true;} else if (pCh == '?') {pCh = PAT_ANY;hasPattern = true;} else if (pCh == '{') {fileRegex.append('(');pCh = '(';curlyOpen++;hasPattern = true;} else if (pCh == ',' && curlyOpen > 0) {fileRegex.append(")|");pCh = '(';} else if (pCh == '}' && curlyOpen > 0) {// End of a groupcurlyOpen--;fileRegex.append(")");pCh = ')';} else if (pCh == '[' && setOpen == 0) {setOpen++;hasPattern = true;} else if (pCh == '^' && setOpen > 0) {} else if (pCh == '-' && setOpen > 0) {// Character set rangesetRange = true;} else if (pCh == PAT_SET_CLOSE && setRange) {// Incomplete character set rangeerror("Incomplete character set range", filePattern, i);} else if (pCh == PAT_SET_CLOSE && setOpen > 0) {// End of a character setif (setOpen < 2)error("Unexpected end of set", filePattern, i);setOpen = 0;} else if (setOpen > 0) {// Normal character, or the end of a character set rangesetOpen++;setRange = false;}fileRegex.append(pCh);}// Check for a 
well-formed patternif (setOpen > 0 || setRange || curlyOpen > 0) {// Incomplete character set or character rangeerror("Expecting set closure character or end of range, or }", filePattern, len);}regex = Pattern.compile(fileRegex.toString());}boolean hasPattern() {return hasPattern;}public boolean accept(Path path) {return regex.matcher(path.getName()).matches() && userFilter.accept(path);}private void error(String s, String pattern, int pos) throws IOException {throw new IOException("Illegal file pattern: "+s+ " for glob "+ pattern + " at " + pos);}}/** Return the current user's home directory in this filesystem.* The default implementation returns "/user/$USER/".*/public Path getHomeDirectory() {return new Path("/user/"+System.getProperty("user.name")).makeQualified(this);}/*** Set the current working directory for the given file system. All relative* paths will be resolved relative to it.* * @param new_dir*/public abstract void setWorkingDirectory(Path new_dir);/*** Get the current working directory for the given file system* @return the directory pathname*/public abstract Path getWorkingDirectory();/*** Call {@link #mkdirs(Path, FsPermission)} with default permission.*/public boolean mkdirs(Path f) throws IOException {return mkdirs(f, FsPermission.getDefault());}/*** Make the given file and all non-existent parents into* directories. Has the semantics of Unix 'mkdir -p'.* Existence of the directory hierarchy is not an error.*/public abstract boolean mkdirs(Path f, FsPermission permission) throws IOException;/*** The src file is on the local disk. Add it to FS at* the given dst name and the source is kept intact afterwards*/public void copyFromLocalFile(Path src, Path dst)throws IOException {copyFromLocalFile(false, src, dst);}/*** The src files is on the local disk. 
Add it to FS at* the given dst name, removing the source afterwards.*/public void moveFromLocalFile(Path[] srcs, Path dst)throws IOException {copyFromLocalFile(true, true, srcs, dst);}/*** The src file is on the local disk. Add it to FS at* the given dst name, removing the source afterwards.*/public void moveFromLocalFile(Path src, Path dst)throws IOException {copyFromLocalFile(true, src, dst);}/*** The src file is on the local disk. Add it to FS at* the given dst name.* delSrc indicates if the source should be removed*/public void copyFromLocalFile(boolean delSrc, Path src, Path dst)throws IOException {copyFromLocalFile(delSrc, true, src, dst);}/*** The src files are on the local disk. Add it to FS at* the given dst name.* delSrc indicates if the source should be removed*/public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path[] srcs, Path dst)throws IOException {Configuration conf = getConf();FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);}/*** The src file is on the local disk. 
Add it to FS at* the given dst name.* delSrc indicates if the source should be removed*/public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)throws IOException {Configuration conf = getConf();FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);}/*** The src file is under FS, and the dst is on the local disk.* Copy it from FS control to the local dst name.*/public void copyToLocalFile(Path src, Path dst) throws IOException {copyToLocalFile(false, src, dst);}/*** The src file is under FS, and the dst is on the local disk.* Copy it from FS control to the local dst name.* Remove the source afterwards*/public void moveToLocalFile(Path src, Path dst) throws IOException {copyToLocalFile(true, src, dst);}/*** The src file is under FS, and the dst is on the local disk.* Copy it from FS control to the local dst name.* delSrc indicates if the src will be removed or not.*/ public void copyToLocalFile(boolean delSrc, Path src, Path dst)throws IOException {FileUtil.copy(this, src, getLocal(getConf()), dst, delSrc, getConf());}/*** Returns a local File that the user can write output to. The caller* provides both the eventual FS target name and the local working* file. If the FS is local, we write directly into the target. If* the FS is remote, we write into the tmp local area.*/public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)throws IOException {return tmpLocalFile;}/*** Called when we're all done writing to the target. A local FS will* do nothing, because we've written to exactly the right place. A remote* FS will copy the contents of tmpLocalFile to the correct target at* fsOutputFile.*/public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)throws IOException {moveFromLocalFile(tmpLocalFile, fsOutputFile);}/*** No more filesystem operations are needed. Will* release any held locks.*/public void close() throws IOException {// delete all files that were marked as delete-on-exit. 
processDeleteOnExit();CACHE.remove(this.key, this);}/** Return the total size of all files in the filesystem.*/public long getUsed() throws IOException{long used = 0;FileStatus[] files = listStatus(new Path("/"));for(FileStatus file:files){used += file.getLen();}return used;}/*** Get the block size for a particular file.* @param f the filename* @return the number of bytes in a block*//** @deprecated Use getFileStatus() instead */ @Deprecatedpublic long getBlockSize(Path f) throws IOException {return getFileStatus(f).getBlockSize();}/** Return the number of bytes that large input files should be optimally* be split into to minimize i/o time. */public long getDefaultBlockSize() {// default to 32MB: large enough to minimize the impact of seeksreturn getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);}/*** Get the default replication.*/public short getDefaultReplication() { return 1; }/*** Return a file status object that represents the path.* @param f The path we want information from* @return a FileStatus object* @throws FileNotFoundException when the path does not exist;* IOException see specific implementation*/public abstract FileStatus getFileStatus(Path f) throws IOException;/*** Get the checksum of a file.** @param f The file path* @return The file checksum. The default return value is null,* which indicates that no checksum algorithm is implemented* in the corresponding FileSystem.*/public FileChecksum getFileChecksum(Path f) throws IOException {return null;}/*** Set the verify checksum flag. This is only applicable if the * corresponding FileSystem supports checksum. 
By default doesn't do anything.* @param verifyChecksum*/public void setVerifyChecksum(boolean verifyChecksum) {//doesn't do anything }/*** Return a list of file status objects that corresponds to the list of paths* excluding those non-existent paths.* * @param paths* the list of paths we want information from* @return a list of FileStatus objects* @throws IOException* see specific implementation*/private FileStatus[] getFileStatus(Path[] paths) throws IOException {if (paths == null) {return null;}ArrayList<FileStatus> results = new ArrayList<FileStatus>(paths.length);for (int i = 0; i < paths.length; i++) {try {results.add(getFileStatus(paths[i]));} catch (FileNotFoundException e) { // do nothing }}return results.toArray(new FileStatus[results.size()]);}/*** Set permission of a path.* @param p* @param permission*/public void setPermission(Path p, FsPermission permission) throws IOException {}/*** Set owner of a path (i.e. a file or a directory).* The parameters username and groupname cannot both be null.* @param p The path* @param username If it is null, the original username remains unchanged.* @param groupname If it is null, the original groupname remains unchanged.*/public void setOwner(Path p, String username, String groupname) throws IOException {}/*** Set access time of a file* @param p The path* @param mtime Set the modification time of this file.* The number of milliseconds since Jan 1, 1970. * A value of -1 means that this call should not set modification time.* @param atime Set the access time of this file.* The number of milliseconds since Jan 1, 1970. * A value of -1 means that this call should not set access time.*/public void setTimes(Path p, long mtime, long atime) throws IOException {}private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {Class<?> clazz = conf.getClass("fs." 
+ uri.getScheme() + ".impl", null);if (clazz == null) {throw new IOException("No FileSystem for scheme: " + uri.getScheme());}FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);fs.initialize(uri, conf);return fs;}/** Caching FileSystem objects */static class Cache {private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();synchronized FileSystem get(URI uri, Configuration conf) throws IOException{Key key = new Key(uri, conf);FileSystem fs = map.get(key);if (fs == null) {fs = createFileSystem(uri, conf);if (map.isEmpty() && !clientFinalizer.isAlive()) {Runtime.getRuntime().addShutdownHook(clientFinalizer);}fs.key = key;map.put(key, fs);}return fs;}synchronized void remove(Key key, FileSystem fs) {if (map.containsKey(key) && fs == map.get(key)) {map.remove(key);if (map.isEmpty() && !clientFinalizer.isAlive()) {if (!Runtime.getRuntime().removeShutdownHook(clientFinalizer)) {LOG.info("Could not cancel cleanup thread, though no " +"FileSystems are open");}}}}synchronized void closeAll() throws IOException {List<IOException> exceptions = new ArrayList<IOException>();for(; !map.isEmpty(); ) {Map.Entry<Key, FileSystem> e = map.entrySet().iterator().next();final Key key = e.getKey();final FileSystem fs = e.getValue();//remove from cache remove(key, fs);if (fs != null) {try {fs.close();}catch(IOException ioe) {exceptions.add(ioe);}}}if (!exceptions.isEmpty()) {throw MultipleIOException.createIOException(exceptions);}}/** FileSystem.Cache.Key */static class Key {final String scheme;final String authority;final String username;Key(URI uri, Configuration conf) throws IOException {scheme = uri.getScheme()==null?"":uri.getScheme().toLowerCase();authority = uri.getAuthority()==null?"":uri.getAuthority().toLowerCase();UserGroupInformation ugi = UserGroupInformation.readFrom(conf);if (ugi == null) {try {ugi = UserGroupInformation.login(conf);} catch(LoginException e) {LOG.warn("uri=" + uri, e);}}username = ugi == null? 
null: ugi.getUserName();}/** {@inheritDoc} */public int hashCode() {return (scheme + authority + username).hashCode();}static boolean isEqual(Object a, Object b) {return a == b || (a != null && a.equals(b)); }/** {@inheritDoc} */public boolean equals(Object obj) {if (obj == this) {return true;}if (obj != null && obj instanceof Key) {Key that = (Key)obj;return isEqual(this.scheme, that.scheme)&& isEqual(this.authority, that.authority)&& isEqual(this.username, that.username);}return false; }/** {@inheritDoc} */public String toString() {return username + "@" + scheme + "://" + authority; }}}public static final class Statistics {private final String scheme;private AtomicLong bytesRead = new AtomicLong();private AtomicLong bytesWritten = new AtomicLong();public Statistics(String scheme) {this.scheme = scheme;}/*** Increment the bytes read in the statistics* @param newBytes the additional bytes read*/public void incrementBytesRead(long newBytes) {bytesRead.getAndAdd(newBytes);}/*** Increment the bytes written in the statistics* @param newBytes the additional bytes written*/public void incrementBytesWritten(long newBytes) {bytesWritten.getAndAdd(newBytes);}/*** Get the total number of bytes read* @return the number of bytes*/public long getBytesRead() {return bytesRead.get();}/*** Get the total number of bytes written* @return the number of bytes*/public long getBytesWritten() {return bytesWritten.get();}public String toString() {return bytesRead + " bytes read and " + bytesWritten + " bytes written";}/*** Reset the counts of bytes to 0.*/public void reset() {bytesWritten.set(0);bytesRead.set(0);}/*** Get the uri scheme associated with this statistics object.* @return the schema associated with this set of statistics*/public String getScheme() {return scheme;}}/*** Get the Map of Statistics object indexed by URI Scheme.* @return a Map having a key as URI scheme and value as Statistics object* @deprecated use {@link #getAllStatistics} instead*/public static synchronized 
Map<String, Statistics> getStatistics() {Map<String, Statistics> result = new HashMap<String, Statistics>();for(Statistics stat: statisticsTable.values()) {result.put(stat.getScheme(), stat);}return result;}/*** Return the FileSystem classes that have Statistics*/public static synchronized List<Statistics> getAllStatistics() {return new ArrayList<Statistics>(statisticsTable.values());}/*** Get the statistics for a particular file system* @param cls the class to lookup* @return a statistics object*/public static synchronized Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {Statistics result = statisticsTable.get(cls);if (result == null) {result = new Statistics(scheme);statisticsTable.put(cls, result);}return result;}public static synchronized void clearStatistics() {for(Statistics stat: statisticsTable.values()) {stat.reset();}}public static synchronizedvoid printStatistics() throws IOException {for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: statisticsTable.entrySet()) {System.out.println(" FileSystem " + pair.getKey().getName() + ": " + pair.getValue());}} } FileSystem
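The glob support above (GlobFilter.setRegex()) works by translating a Hadoop glob into a java.util.regex pattern one character at a time. Below is a minimal sketch of that translation for a simplified subset: `?`, `*`, `[...]`, `[^...]`, `{a,b}` (including nesting) and `\` escapes. It is not Hadoop's API. The class name GlobSketch is hypothetical, and the sketch leaves out the validation and IOException reporting of the real code. It also skips the separate brace expansion that globStatus() performs through GlobExpander.

```java
import java.util.regex.Pattern;

// Hypothetical, simplified re-implementation of the glob-to-regex idea behind
// FileSystem.GlobFilter; unlike the real code it performs no error checking.
final class GlobSketch {
    static Pattern toRegex(String glob) {
        StringBuilder re = new StringBuilder();
        int curlyOpen = 0;      // nesting depth of {a,b} groups
        boolean inSet = false;  // true while inside [...]
        for (int i = 0; i < glob.length(); i++) {
            char c = glob.charAt(i);
            if (c == '\\' && i + 1 < glob.length()) {
                // escaped character: emit the next character literally
                re.append(Pattern.quote(String.valueOf(glob.charAt(++i))));
            } else if (inSet) {
                if (c == ']') inSet = false;
                re.append(c);   // '^', '-' and set members pass through unchanged
            } else if (c == '[') {
                inSet = true;
                re.append(c);
            } else if (c == '*') {
                re.append(".*");
            } else if (c == '?') {
                re.append('.');
            } else if (c == '{') {
                curlyOpen++;
                re.append("(?:");
            } else if (c == ',' && curlyOpen > 0) {
                re.append('|');
            } else if (c == '}' && curlyOpen > 0) {
                curlyOpen--;
                re.append(')');
            } else {
                // any other character, including regex metacharacters like '.', is literal
                re.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(re.toString());
    }

    /** Like GlobFilter.accept(), the whole name must match. */
    static boolean matches(String glob, String name) {
        return toRegex(glob).matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("part-*.txt", "part-00000.txt")); // true
        System.out.println(matches("{ab,c{de,fh}}", "cfh"));         // true
        System.out.println(matches("[^a]?", "ax"));                  // false
    }
}
```

Quoting every literal character with Pattern.quote() is a blunt but safe way to keep `.`, `+`, `$` and similar characters literal. The Hadoop code gets the same effect by prefixing a backslash to the characters listed in isJavaRegexSpecialChar().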

Hadoop input/output streams

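Like Java I/O, the Hadoop abstract file system reads and writes files through streams. The classes used for reading and writing file data are FSDataInputStream and FSDataOutputStream, respectively.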

1. FSDataInputStream

public class FSDataInputStream extends DataInputStream
    implements Seekable, PositionedReadable {
  ……
}

FSDataInputStream extends DataInputStream and implements the Seekable and PositionedReadable interfaces.

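The Seekable interface provides random access within a (file) stream. It works much like getFilePointer() and seek() in RandomAccessFile: the caller can move the read position to any offset in the file.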
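For comparison, here is the RandomAccessFile behaviour that Seekable mirrors, run against a temporary local file. getFilePointer() plays the role of getPos(), and seek() sets the offset for the next read. This is plain Java, not Hadoop code, and the class name SeekDemo is only illustrative.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative plain-Java analogue of Seekable; not part of Hadoop.
final class SeekDemo {
    /** Writes text to a temp file, seeks to offset, and returns the rest of the file. */
    static String readFrom(String text, long offset) throws IOException {
        Path tmp = Files.createTempFile("seek-demo", ".txt");
        try {
            Files.write(tmp, text.getBytes(StandardCharsets.UTF_8));
            try (RandomAccessFile raf = new RandomAccessFile(tmp.toFile(), "r")) {
                raf.seek(offset);                      // like Seekable.seek()
                long pos = raf.getFilePointer();       // like Seekable.getPos(); equals offset
                byte[] rest = new byte[(int) (raf.length() - pos)];
                raf.readFully(rest);                   // reading advances the pointer
                return new String(rest, StandardCharsets.UTF_8);
            }
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readFrom("hello hadoop", 6)); // prints "hadoop"
    }
}
```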

Here is the Seekable interface with its comments:

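/** Interface for streams that support seeking. */
public interface Seekable {
  /**
   * Set the current offset to the given position; the next read
   * starts from this position.
   */
  void seek(long pos) throws IOException;

  /** Return the current offset. */
  long getPos() throws IOException;

  /** Switch to a different copy (replica) of the data. */
  boolean seekToNewSource(long targetPos) throws IOException;
}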

The complete source of the FSDataInputStream class is:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import java.io.*;

/** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
 * and buffers input through a {@link BufferedInputStream}. */
public class FSDataInputStream extends DataInputStream
    implements Seekable, PositionedReadable {

  public FSDataInputStream(InputStream in)
    throws IOException {
    super(in);
    if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
      throw new IllegalArgumentException(
          "In is not an instance of Seekable or PositionedReadable");
    }
  }

  public synchronized void seek(long desired) throws IOException {
    ((Seekable)in).seek(desired);
  }

  public long getPos() throws IOException {
    return ((Seekable)in).getPos();
  }

  public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
    return ((PositionedReadable)in).read(position, buffer, offset, length);
  }

  public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException {
    ((PositionedReadable)in).readFully(position, buffer, offset, length);
  }

  public void readFully(long position, byte[] buffer)
    throws IOException {
    ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
  }

  public boolean seekToNewSource(long targetPos) throws IOException {
    return ((Seekable)in).seekToNewSource(targetPos);
  }
}
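FSDataInputStream is essentially a decorator. It inherits readInt(), readLong() and the other DataInput methods from DataInputStream. It forwards seek() and getPos() to the wrapped stream, after checking in its constructor that the wrapped stream supports them. The sketch below reproduces that pattern with plain java.io. The assumptions: Seekable is re-declared locally with only seek() and getPos(), and MemorySeekableStream and SeekableDataStream are hypothetical names, not Hadoop classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Local stand-in for org.apache.hadoop.fs.Seekable (seekToNewSource omitted).
interface Seekable {
    void seek(long pos) throws IOException;
    long getPos() throws IOException;
}

// Hypothetical in-memory seekable stream, playing the role of an FSInputStream.
class MemorySeekableStream extends InputStream implements Seekable {
    private final byte[] data;
    private int pos;

    MemorySeekableStream(byte[] data) { this.data = data; }

    @Override public int read() {
        return pos < data.length ? (data[pos++] & 0xff) : -1;
    }

    @Override public void seek(long newPos) throws IOException {
        if (newPos < 0 || newPos > data.length) throw new IOException("Cannot seek to " + newPos);
        pos = (int) newPos;
    }

    @Override public long getPos() { return pos; }
}

// Same shape as FSDataInputStream: DataInputStream plus delegated seek()/getPos().
class SeekableDataStream extends DataInputStream implements Seekable {
    SeekableDataStream(InputStream in) {
        super(in);
        if (!(in instanceof Seekable)) {
            throw new IllegalArgumentException("In is not an instance of Seekable");
        }
    }

    @Override public synchronized void seek(long desired) throws IOException {
        ((Seekable) in).seek(desired);   // 'in' is the protected field of FilterInputStream
    }

    @Override public long getPos() throws IOException {
        return ((Seekable) in).getPos();
    }

    /** Builds a stream holding the ints 10, 20, 30 (4 bytes each). */
    static SeekableDataStream sample() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(10);
            out.writeInt(20);
            out.writeInt(30);
        }
        return new SeekableDataStream(new MemorySeekableStream(bytes.toByteArray()));
    }

    public static void main(String[] args) throws IOException {
        SeekableDataStream s = sample();
        s.seek(8);                        // jump straight to the third int
        System.out.println(s.readInt());  // 30
        System.out.println(s.getPos());   // 12
    }
}
```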

The other interface implemented by FSDataInputStream is PositionedReadable. It provides a set of methods that read data starting at a given position in the stream:

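// Interface for positioned reads from a stream
public interface PositionedReadable {
  // Read up to length bytes, starting at the given position, into buffer
  // beginning at offset.
  // Note: this method does not change the current position of the stream,
  // and it is thread-safe.
  public int read(long position, byte[] buffer, int offset, int length)
    throws IOException;

  // Read exactly length bytes, starting at the given position, into buffer
  // beginning at offset.
  public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException;

  // Read buffer.length bytes starting at the given position.
  public void readFully(long position, byte[] buffer) throws IOException;
}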

None of the three read methods in PositionedReadable changes the current position of the stream, and all of them are thread-safe.

2. FSInputStream

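The org.apache.hadoop.fs package also contains the abstract class FSInputStream, which implements both Seekable and PositionedReadable. The Seekable methods (seek(), getPos() and seekToNewSource()) are declared abstract. The PositionedReadable methods get default implementations built on top of them.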

FSInputStream implements PositionedReadable.read() in terms of Seekable.seek(). It saves the current position, seeks to the requested offset, reads, and then seeks back:

// Implementation of PositionedReadable.read()
public int read(long position, byte[] buffer, int offset, int length)
  throws IOException {
  /*
   * PositionedReadable.read() must be thread-safe, so synchronized (this)
   * keeps other methods synchronized on this stream from running during the
   * call, and guarantees that no other thread changes the current read
   * position saved via Seekable.getPos().
   */
  synchronized (this) {
    long oldPos = getPos();                  // save the current read position (Seekable.getPos())
    int nread = -1;
    try {
      seek(position);                        // move the read position (Seekable.seek())
      nread = read(buffer, offset, length);  // read the data (InputStream.read())
    } finally {
      seek(oldPos);                          // restore the position held before the read (Seekable.seek())
    }
    return nread;
  }
}
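You can try the save/seek/read/restore pattern without Hadoop. In the sketch below, the hypothetical class PositionedMemoryStream keeps its data in memory. Its positioned read() copies the structure of the FSInputStream method above: it synchronizes on the stream, remembers getPos(), seeks, reads, and seeks back in a finally block. As a result, the caller's sequential position is unchanged afterwards.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical in-memory stream used only to illustrate the technique; not part of Hadoop.
class PositionedMemoryStream extends InputStream {
    private final byte[] data;
    private int pos;   // current sequential read position

    PositionedMemoryStream(byte[] data) {
        this.data = data;
    }

    @Override
    public synchronized int read() {
        return pos < data.length ? (data[pos++] & 0xff) : -1;
    }

    synchronized void seek(long newPos) throws IOException {
        if (newPos < 0 || newPos > data.length) {
            throw new IOException("Cannot seek to " + newPos);
        }
        pos = (int) newPos;
    }

    synchronized long getPos() {
        return pos;
    }

    /** Positioned read modeled on FSInputStream.read(long, byte[], int, int). */
    int read(long position, byte[] buffer, int offset, int length) throws IOException {
        synchronized (this) {
            long oldPos = getPos();              // remember the sequential position
            int nread = -1;
            try {
                seek(position);                  // jump to the requested offset
                nread = read(buffer, offset, length);
            } finally {
                seek(oldPos);                    // restore it even if the read throws
            }
            return nread;
        }
    }

    public static void main(String[] args) throws IOException {
        PositionedMemoryStream s =
            new PositionedMemoryStream("0123456789".getBytes(StandardCharsets.US_ASCII));
        s.read();                                // consumes '0'; position is now 1
        byte[] buf = new byte[3];
        int n = s.read(7, buf, 0, 3);            // reads "789" without moving the position
        System.out.println(n + " " + new String(buf, StandardCharsets.US_ASCII) + " pos=" + s.getPos());
        // prints "3 789 pos=1"
    }
}
```

Note that this approach serializes all positioned reads on one stream. The method is thread-safe, but concurrent readers do not read in parallel.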

The complete FSInputStream source is:

package org.apache.hadoop.fs;

import java.io.*;

/****************************************************************
 * FSInputStream is a generic old InputStream with a little bit
 * of RAF-style seek ability.
 *
 *****************************************************************/
public abstract class FSInputStream extends InputStream
    implements Seekable, PositionedReadable {
  /**
   * Seek to the given offset from the start of the file.
   * The next read() will be from that location. Can't
   * seek past the end of the file.
   */
  public abstract void seek(long pos) throws IOException;

  /**
   * Return the current offset from the start of the file
   */
  public abstract long getPos() throws IOException;

  /**
   * Seeks a different copy of the data. Returns true if
   * found a new source, false otherwise.
   */
  public abstract boolean seekToNewSource(long targetPos) throws IOException;

  public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
    synchronized (this) {
      long oldPos = getPos();
      int nread = -1;
      try {
        seek(position);
        nread = read(buffer, offset, length);
      } finally {
        seek(oldPos);
      }
      return nread;
    }
  }

  public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException {
    int nread = 0;
    while (nread < length) {
      int nbytes = read(position+nread, buffer, offset+nread, length-nread);
      if (nbytes < 0) {
        throw new EOFException("End of file reached before reading fully.");
      }
      nread += nbytes;
    }
  }

  public void readFully(long position, byte[] buffer)
    throws IOException {
    readFully(position, buffer, 0, buffer.length);
  }
}

Note: Hadoop has no corresponding FSOutputStream class.

3. FSDataOutputStream

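FSDataOutputStream is used for writing data. Like FSDataInputStream, it extends a java.io data stream, here DataOutputStream, so it provides methods such as writeInt() and writeChar(). FSDataOutputStream is simpler, though: it does not implement Seekable, which means Hadoop file systems do not support random writes. A user cannot move the write position within a file and overwrite existing content. The user can, however, get the stream's current write position with getPos(). To implement getPos(), FSDataOutputStream defines the inner class PositionCache. This class extends FilterOutputStream and overrides the write() methods to track the current write position.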

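PositionCache is a typical filter stream. It adds getPos() on top of the underlying stream. When a FileSystem.Statistics object is supplied, it also uses that object to count the bytes written to the file system.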

public class FSDataOutputStream extends DataOutputStream implements Syncable {
  private OutputStream wrappedStream;

  private static class PositionCache extends FilterOutputStream {
    private FileSystem.Statistics statistics;
    long position;    // current write position of the stream

    public PositionCache(OutputStream out,
                         FileSystem.Statistics stats,
                         long pos) throws IOException {
      super(out);
      statistics = stats;
      position = pos;
    }

    public void write(int b) throws IOException {
      out.write(b);
      position++;                                  // update the current position
      if (statistics != null) {
        statistics.incrementBytesWritten(1);       // update the file-system statistics
      }
    }

    public void write(byte b[], int off, int len) throws IOException {
      out.write(b, off, len);
      position += len;                             // update position
      if (statistics != null) {
        statistics.incrementBytesWritten(len);
      }
    }

    public long getPos() throws IOException {
      return position;                             // return the current write position
    }

    public void close() throws IOException {
      out.close();
    }
  }

  @Deprecated
  public FSDataOutputStream(OutputStream out) throws IOException {
    this(out, null);
  }

  public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats)
    throws IOException {
    this(out, stats, 0);
  }

  public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats,
                            long startPosition) throws IOException {
    super(new PositionCache(out, stats, startPosition)); // wrap out in a PositionCache and pass it to DataOutputStream
    wrappedStream = out;
  }

  public long getPos() throws IOException {
    return ((PositionCache)out).getPos();
  }

  public void close() throws IOException {
    out.close();         // This invokes PositionCache.close()
  }

  // Returns the underlying output stream. This is used by unit tests.
  public OutputStream getWrappedStream() {
    return wrappedStream;
  }

  /** {@inheritDoc} */
  public void sync() throws IOException {
    if (wrappedStream instanceof Syncable) {
      ((Syncable)wrappedStream).sync();
    }
  }
}
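The PositionCache idea needs nothing beyond java.io: a FilterOutputStream subclass that forwards each write and advances a counter. The sketch below makes two assumptions: PositionTrackingOutputStream is a hypothetical name, and a bare AtomicLong stands in for FileSystem.Statistics. Like PositionCache, it also overrides the array form of write(). This matters because FilterOutputStream's default write(byte[], int, int) calls write(int) once per byte.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for FSDataOutputStream.PositionCache.
class PositionTrackingOutputStream extends FilterOutputStream {
    private final AtomicLong bytesWritten;  // plays the role of FileSystem.Statistics (may be null)
    private long position;                  // current write position

    PositionTrackingOutputStream(OutputStream out, AtomicLong bytesWritten, long startPosition) {
        super(out);
        this.bytesWritten = bytesWritten;
        this.position = startPosition;
    }

    @Override public void write(int b) throws IOException {
        out.write(b);
        position++;
        if (bytesWritten != null) bytesWritten.addAndGet(1);
    }

    @Override public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);   // forward the whole range in one call
        position += len;
        if (bytesWritten != null) bytesWritten.addAndGet(len);
    }

    long getPos() {
        return position;
    }

    public static void main(String[] args) throws IOException {
        AtomicLong stats = new AtomicLong();
        PositionTrackingOutputStream tracker =
            new PositionTrackingOutputStream(new ByteArrayOutputStream(), stats, 0);
        // As in FSDataOutputStream, a DataOutputStream on top supplies writeInt() etc.
        DataOutputStream out = new DataOutputStream(tracker);
        out.writeInt(42);     // 4 bytes
        out.writeChar('h');   // 2 bytes
        out.flush();
        System.out.println(tracker.getPos() + " " + stats.get()); // prints "6 6"
    }
}
```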

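FSDataOutputStream implements the Syncable interface, which has a single method, sync(). Its purpose is similar to the Linux sync() system call: it pushes data held in the stream to the underlying device. FSDataOutputStream.sync() forwards the call only when the wrapped stream itself implements Syncable. Otherwise it does nothing.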

/** This interface declare the sync() operation. */
public interface Syncable {
  /**
   * Synchronize all buffer with the underlying devices.
   * @throws IOException
   */
  public void sync() throws IOException;
}
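On a local disk, the closest plain-Java counterpart of sync() is FileOutputStream.getFD().sync(). It asks the operating system to force the file's buffered data to the device. On POSIX systems this maps to fsync() for that one file, rather than the system-wide sync(). The sketch below re-declares Syncable locally and adds a hypothetical SyncingFileStream. Its helper syncIfPossible() delegates only when the stream is Syncable, as FSDataOutputStream.sync() does.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Local copy of the Syncable contract shown above.
interface Syncable {
    void sync() throws IOException;
}

// Hypothetical local-file stream whose sync() forces the file's data to the device.
class SyncingFileStream extends FileOutputStream implements Syncable {
    SyncingFileStream(Path path) throws IOException {
        super(path.toFile());
    }

    @Override public void sync() throws IOException {
        getFD().sync();   // FileDescriptor.sync(): flush OS buffers for this file
    }

    /** Mirrors FSDataOutputStream.sync(): delegate only if the stream supports it. */
    static boolean syncIfPossible(OutputStream out) throws IOException {
        if (out instanceof Syncable) {
            ((Syncable) out).sync();
            return true;
        }
        return false;     // silently a no-op, as in FSDataOutputStream
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("sync-demo", ".bin");
        try (SyncingFileStream out = new SyncingFileStream(tmp)) {
            out.write(new byte[] {1, 2, 3});
            System.out.println(syncIfPossible(out) + " " + Files.size(tmp)); // prints "true 3"
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```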
