日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java实现对HDFS增删改查(CRUD)等操作

發(fā)布時間:2025/6/15 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java实现对HDFS增删改查(CRUD)等操作 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

實現(xiàn)對HDFS增刪改查CRUD等操作

1 查找

列出某個目錄下的文件名稱,hdfs命令如下所示:

hdfs dfs –ls/usr/app

java代碼片段:

[plain] view plain copy print?
  • public?void?list(String?srcPath)?{??
  • ???????????Configuration?conf?=?new?Configuration();??
  • ???????????LOG.info("[Defaultfs]?:"?+conf.get("fs.default.name"));??
  • ?????????conf.set("hadoop.job.ugi","app,app");???//It?is?not?necessary?for?the?default?user.??
  • ???????????FileSystem?fs;??
  • ??????????try?{??
  • ???????????????????fs=?FileSystem.get(conf);??
  • ???????????????????RemoteIterator<LocatedFileStatus>rmIterator?=?fs.listLocatedStatus(new?Path(srcPath));??
  • ???????????????????while?(rmIterator.hasNext())?{??
  • ?????????????????????????????Path?path?=?rmIterator.next().getPath();??
  • ?????????????????????????????if(fs.isDirectory(path)){??
  • ???????????????????????????????????????LOG.info("-----------DirectoryName:?"+path.getName());??
  • ?????????????????????????????}??
  • ?????????????????????????????else?if(fs.isFile(path)){??
  • ???????????????????????????????????????LOG.info("-----------FileName:?"+path.getName());??
  • ?????????????????????????????}??
  • ???????????????????}??
  • ??????????}catch?(IOException?e)?{??
  • ???????????????????LOG.error("list?fileSysetm?object?stream.:"?,?e);??
  • ???????????????????new?RuntimeException(e);??
  • ??????????}??
  • }??


  • 輸出結(jié)果:

    2014-03-11 22:38:15,329 INFO? (com.hdfs.client.SyncDFS:48) ------------File Name: README.txt

    2014-03-11 22:38:15,331 INFO? (com.hdfs.client.SyncDFS:45) ------------Directory Name: blog_blogpost

    2014-03-11 22:38:15,333 INFO? (com.hdfs.client.SyncDFS:45) ------------Directory Name: test

    讀取文件中的內(nèi)容,hdfs命令如下:

    hdfs dfs –cat /input

    java 代碼:

    [plain] view plain copy print?
  • public?void?readFile(String?file){??
  • ???????????Configurationconf?=?new?Configuration();??
  • ???????????FileSystemfs;??
  • ???????????try?{??
  • ????????????????????fs=?FileSystem.get(conf);??
  • ????????????????????Pathpath?=?new?Path(file);??
  • ????????????????????if(!fs.exists(path)){??
  • ?????????????????????????????LOG.warn("file'"+?file+"'?doesn't?exist!");??
  • ?????????????????????????????return?;??
  • ????????????????????}??
  • ????????????????????FSDataInputStreamin?=?fs.open(path);??
  • ????????????????????Stringfilename?=?file.substring(file.lastIndexOf('/')?+?1,?file.length());??
  • ????????????????????OutputStreamout?=?new?BufferedOutputStream(new?FileOutputStream(??
  • ???????????????????????????????????????????????????????????????????new?File(filename)));??
  • ??
  • ????????????????????byte[]?b?=?new?byte[1024];??
  • ????????????????????int?numBytes?=?0;??
  • ????????????????????while?((numBytes?=?in.read(b))?>?0)?{??
  • ?????????????????????????????out.write(b,0,?numBytes);??
  • ????????????????????}??
  • ????????????????????in.close();??
  • ????????????????????out.close();??
  • ????????????????????fs.close();??
  • ???????????}catch?(IOException?e)?{??
  • ????????????????????LOG.error("ifExists?fs?Exception?caught!?:"?,?e);??
  • ????????????????????new?RuntimeException(e);??
  • ???????????}??
  • ?}??


  • 獲取文件的修改時間,java代碼:

    [plain] view plain copy print?
  • /**??
  • ????????*?Gets?the?information?about?the?file?modifiedtime.??
  • ????????*?@param?source??
  • ????????*?@throws?IOException??
  • ????????*/??
  • ???????public?void?getModificationTime(String?source)?throws?IOException{??
  • ????????????????????
  • ?????????????????Configurationconf?=?new?Configuration();??
  • ????????????????????
  • ?????????????????FileSystemfs?=?FileSystem.get(conf);??
  • ?????????????????PathsrcPath?=?new?Path(source);??
  • ????????????????????
  • ?????????????????//?Check?if?the?file?alreadyexists??
  • ?????????????????if?(!(fs.exists(srcPath)))?{??
  • ?????????????????System.out.println("No?such?destination?"?+?srcPath);??
  • ?????????????????return;??
  • ?????????????????}??
  • ?????????????????//?Get?the?filename?out?of?thefile?path??
  • ?????????????????Stringfilename?=?source.substring(source.lastIndexOf('/')?+?1,?source.length());??
  • ????????????????????
  • ?????????????????FileStatusfileStatus?=?fs.getFileStatus(srcPath);??
  • ?????????????????long?modificationTime?=fileStatus.getModificationTime();??
  • ????????????????????
  • ?????????????????LOG.info("modified?datetime:?"?+?System.out.format("File?%s;?Modification?time?:?%0.2f%n",filename,modificationTime));??
  • ????????????????????
  • ???????}??


  • 獲取文件塊定位信息,java代碼:

    [plain] view plain copy print?
  • /**??
  • ??????????*?Gets?the?file?block?location?info??
  • ??????????*?@param?source??
  • ??????????*?@throws?IOException??
  • ??????????*/??
  • ?????????public?void?getBlockLocations(String?source)?throws?IOException{??
  • ???????????????????Configurationconf?=?new?Configuration();??
  • ???????????????????FileSystemfs?=?FileSystem.get(conf);??
  • ???????????????????PathsrcPath?=?new?Path(source);??
  • ??????????????????????
  • ???????????????????//?Check?if?the?file?alreadyexists??
  • ???????????????????if?(!(ifExists(source)))?{??
  • ????????????????????????????System.out.println("No?such?destination?"?+?srcPath);??
  • ????????????????????????????return;??
  • ???????????????????}??
  • ???????????????????//?Get?the?filename?out?of?thefile?path??
  • ???????????????????Stringfilename?=?source.substring(source.lastIndexOf('/')?+?1,?source.length());??
  • ??????????????????????
  • ???????????????????FileStatusfileStatus?=?fs.getFileStatus(srcPath);??
  • ??????????????????????
  • ???????????????????BlockLocation[]blkLocations?=?fs.getFileBlockLocations(fileStatus,?0,?fileStatus.getLen());??
  • ???????????????????int?blkCount?=?blkLocations.length;??
  • ??????????????????????
  • ???????????????????System.out.println("File?:"?+?filename?+?"stored?at:");??
  • ???????????????????for?(int?i=0;?i?<?blkCount;?i++)?{??
  • ????????????????????????????String[]hosts?=?blkLocations[i].getHosts();??
  • ????????????????????????????LOG.info("host?ip:"?+System.out.format("Host?%d:?%s?%n",?i,?hosts));??
  • ???????????????????}??
  • ?????????}??


  • 獲取Hadoop集群中data node的DNS主機名,java代碼:

    [plain] view plain copy print?
  • public?void?getHostnames?()?throwsIOException{??
  • ???????????????????Configurationconfig?=?new?Configuration();??
  • ????????????????????
  • ???????????????????FileSystemfs?=?FileSystem.get(config);??
  • ???????????????????DistributedFileSystemhdfs?=?(DistributedFileSystem)?fs;??
  • ???????????????????DatanodeInfo[]dataNodeStats?=?hdfs.getDataNodeStats();??
  • ??????????????????????
  • ???????????????????String[]names?=?new?String[dataNodeStats.length];??
  • ???????????????????for?(int?i?=?0;?i?<?dataNodeStats.length;?i++)?{??
  • ????????????????????????????names[i]=?dataNodeStats[i].getHostName();??
  • ????????????????????????????LOG.info("datenode?hostname:"+(dataNodeStats[i].getHostName()));??
  • ???????????????????}??
  • ?????????}??


  • ?

    2 創(chuàng)建

    創(chuàng)建一個目錄,指定具體的文件路徑。hdfs命令如下:

    [plain] view plain copy print?
  • hdfs?dfs?–mkdir/usr/app/tmp??


  • java代碼:

    ? ? ? ?

    [java] view plain copy print?
  • public?void?mkdir(String?dir){??
  • ?????????????????Configurationconf?=?new?Configuration();??
  • ?????????????????FileSystemfs?=?null;??
  • ?????????????????try?{??
  • ??????????????????????????fs=?FileSystem.get(conf);??
  • ??????????????????????????Pathpath?=?new?Path(dir);??
  • ??????????????????????????if(!fs.exists(path)){??
  • ???????????????????????????????????fs.mkdirs(path);??
  • ???????????????????????????????????LOG.debug("create?directory?'"+dir+"'?successfully!");??
  • ??????????????????????????}else{??
  • ???????????????????????????????????LOG.debug("directory?'"+dir+"'?exits!");??
  • ??????????????????????????}??
  • ?????????????????}catch?(IOException?e)?{??
  • ??????????????????????????LOG.error("FileSystem?get?configuration?with?anerror");??
  • ??????????????????????????e.printStackTrace();??
  • ?????????????????}finally{??
  • ??????????????????????????if(fs!=?null){??
  • ???????????????????????????????????try?{??
  • ?????????????????????????????????????????????fs.close();??
  • ???????????????????????????????????}catch?(IOException?e)?{??
  • ?????????????????????????????????????????????LOG.error("close?fs?object?stream.?:"?,?e);??
  • ?????????????????????????????????????????????new?RuntimeException(e);??
  • ???????????????????????????????????}??
  • ??????????????????????????}??
  • ?????????????????}??
  • ???????}??

  • 將本地文件上傳到hdfs上去,java代碼如下:

    [plain] view plain copy print?
  • public?void?copyFromLocal?(String?source,?String?dest)?{??
  • ??????????????????????
  • ???????????????????Configurationconf?=?new?Configuration();??
  • ???????????????????FileSystemfs;??
  • ???????????????????try?{??
  • ????????????????????????????fs=?FileSystem.get(conf);??
  • ????????????????????????????PathsrcPath?=?new?Path(source);??
  • ???????????????????????????????
  • ????????????????????????????PathdstPath?=?new?Path(dest);??
  • ????????????????????????????//?Check?if?the?file?alreadyexists??
  • ????????????????????????????if?(!(fs.exists(dstPath)))?{??
  • ?????????????????????????????????????LOG.warn("dstPathpath?doesn't?exist"?);??
  • ?????????????????????????????????????LOG.error("No?such?destination?"?+?dstPath);??
  • ?????????????????????????????????????return;??
  • ????????????????????????????}??
  • ???????????????????????????????
  • ????????????????????????????//?Get?the?filename?out?of?thefile?path??
  • ????????????????????????????Stringfilename?=?source.substring(source.lastIndexOf('/')?+?1,?source.length());??
  • ???????????????????????????????
  • ????????????????????????????try{??
  • ?????????????????????????????????????//if?the?file?exists?in?thedestination?path,?it?will?throw?exception.??
  • //???????????????????????????????????fs.copyFromLocalFile(srcPath,dstPath);??
  • ?????????????????????????????????????//remove?and?overwrite?files?withthe?method??
  • ?????????????????????????????????????//copyFromLocalFile(booleandelSrc,?boolean?overwrite,?Path?src,?Path?dst)??
  • ?????????????????????????????????????fs.copyFromLocalFile(false,?true,?srcPath,?dstPath);??
  • ?????????????????????????????????????LOG.info("File?"?+?filename?+?"copied?to?"?+?dest);??
  • ????????????????????????????}catch(Exception?e){??
  • ?????????????????????????????????????LOG.error("copyFromLocalFile?exception?caught!:"?,?e);??
  • ?????????????????????????????????????new?RuntimeException(e);??
  • ????????????????????????????}finally{??
  • ?????????????????????????????????????fs.close();??
  • ????????????????????????????}??
  • ???????????????????}catch?(IOException?e1)?{??
  • ????????????????????????????LOG.error("copyFromLocal?IOException?objectstream.?:"?,e1);??
  • ????????????????????????????new?RuntimeException(e1);??
  • ???????????????????}??
  • ?????????}??


  • ????????

    添加一個文件到指定的目錄下,java代碼如下:

    ? ? ??

    [java] view plain copy print?
  • public?void?addFile(String?source,?String?dest)??{??
  • ?????????????????????????//?Conf?object?will?readthe?HDFS?configuration?parameters??
  • ?????????????????????????Configurationconf?=?new?Configuration();??
  • ?????????????????????????FileSystemfs;??
  • ?????????????????????????try?{??
  • ??????????????????????????????????fs=?FileSystem.get(conf);??
  • ??????????????????????????????????//?Get?the?filename?out?of?thefile?path??
  • ??????????????????????????????????Stringfilename?=?source.substring(source.lastIndexOf('/')?+?1,?source.length());??
  • ???????????????????????????????????
  • ??????????????????????????????????//?Create?the?destination?pathincluding?the?filename.??
  • ??????????????????????????????????if?(dest.charAt(dest.length()?-?1)?!=?'/')?{??
  • ????????????????????????????????????????????dest=?dest?+?"/"?+?filename;??
  • ??????????????????????????????????}else?{??
  • ????????????????????????????????????????????dest=?dest?+?filename;??
  • ??????????????????????????????????}??
  • ???????????????????????????????????
  • ??????????????????????????????????//?Check?if?the?file?alreadyexists??
  • ??????????????????????????????????Pathpath?=?new?Path(dest);??
  • ??????????????????????????????????if?(fs.exists(path))?{??
  • ????????????????????????????????????????????LOG.error("File?"?+?dest?+?"?already?exists");??
  • ????????????????????????????????????????????return;??
  • ??????????????????????????????????}??
  • ???????????????????????????????????
  • ??????????????????????????????????//?Create?a?new?file?and?writedata?to?it.??
  • ??????????????????????????????????FSDataOutputStreamout?=?fs.create(path);??
  • ??????????????????????????????????InputStreamin?=?new?BufferedInputStream(new?FileInputStream(??
  • ??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????new?File(source)));??
  • ???????????????????????????????????
  • ??????????????????????????????????byte[]?b?=?new?byte[1024];??
  • ??????????????????????????????????int?numBytes?=?0;??
  • ??????????????????????????????????//In?this?way?read?and?write?datato?destination?file.??
  • ??????????????????????????????????while?((numBytes?=?in.read(b))?>?0)?{??
  • ????????????????????????????????????????????out.write(b,0,?numBytes);??
  • ??????????????????????????????????}??
  • ??????????????????????????????????in.close();??
  • ??????????????????????????????????out.close();??
  • ??????????????????????????????????fs.close();??
  • ?????????????????????????}catch?(IOException?e)?{??
  • ??????????????????????????????????LOG.error("addFile?Exception?caught!?:"?,?e);??
  • ??????????????????????????????????new?RuntimeException(e);??
  • ?????????????????????????}??
  • ??????}??

  • 3 修改

    重新命名hdfs中的文件名稱,java代碼如下:

    ? ??

    [java] view plain copy print?
  • public?void?renameFile?(String?fromthis,?String?tothis){??
  • ??????????????Configurationconf?=?new?Configuration();??
  • ?????????????????
  • ??????????????FileSystemfs;??
  • ??????????????try?{??
  • ???????????????????????fs=?FileSystem.get(conf);??
  • ???????????????????????PathfromPath?=?new?Path(fromthis);??
  • ???????????????????????PathtoPath?=?new?Path(tothis);??
  • ??????????????????????????
  • ???????????????????????if?(!(fs.exists(fromPath)))?{??
  • ????????????????????????????????LOG.info("No?such?destination?"?+?fromPath);??
  • ???????????????????????????return;??
  • ???????????????????????}??
  • ??????????????????????????
  • ???????????????????????if?(fs.exists(toPath))?{??
  • ????????????????????????????????LOG.info("Already?exists!?"?+?toPath);??
  • ???????????????????????????return;??
  • ???????????????????????}??
  • ??????????????????????????
  • ???????????????????????try{??
  • ????????????????????????????????boolean?isRenamed?=?fs.rename(fromPath,toPath);?????//renames?file?name?indeed.??
  • ????????????????????????????????if(isRenamed){??
  • ??????????????????????????????????????????LOG.info("Renamed?from?"?+?fromthis?+?"?to?"?+?tothis);??
  • ????????????????????????????????}??
  • ???????????????????????}catch(Exception?e){??
  • ????????????????????????????????LOG.error("renameFile?Exception?caught!?:"?,?e);??
  • ????????????????????????????????new?RuntimeException(e);??
  • ???????????????????????}finally{??
  • ????????????????????????????????fs.close();??
  • ???????????????????????}??
  • ??????????????}catch?(IOException?e1)?{??
  • ???????????????????????LOG.error("fs?Exception?caught!?:"?,?e1);??
  • ???????????????????????new?RuntimeException(e1);??
  • ??????????????}??
  • ????}??

  • ?

    4 刪除

    在hdfs上,刪除指定的一個文件。Java代碼:

    [java] view plain copy print?
  • public?void?deleteFile(String?file)??{??
  • ????????????????????????????Configurationconf?=?new?Configuration();??
  • ????????????????????????????FileSystemfs;??
  • ????????????????????????????try?{??
  • ?????????????????????????????????????fs=?FileSystem.get(conf);??
  • ??????????
  • ?????????????????????????????????????Pathpath?=?new?Path(file);??
  • ?????????????????????????????????????if?(!fs.exists(path))?{??
  • ???????????????????????????????????????????????LOG.info("File?"?+?file?+?"?does?not?exists");??
  • ???????????????????????????????????????????????return;??
  • ?????????????????????????????????????}??
  • ?????????????????????????????????????/*?
  • ??????????????????????????????????????*?recursively?delete?the?file(s)?if?it?is?adirectory.?
  • ??????????????????????????????????????*?If?you?want?to?mark?the?path?that?will?bedeleted?as?
  • ??????????????????????????????????????*?a?result?of?closing?the?FileSystem.?
  • ??????????????????????????????????????*??deleteOnExit(Path?f)?
  • ??????????????????????????????????????*/??
  • ?????????????????????????????????????fs.delete(new?Path(file),?true);??
  • ?????????????????????????????????????fs.close();??
  • ????????????????????????????}catch?(IOException?e)?{??
  • ?????????????????????????????????????LOG.error("deleteFile?Exception?caught!?:"?,?e);??
  • ?????????????????????????????????????new?RuntimeException(e);??
  • ????????????????????????????}??
  • ???
  • ???????????????????}??


  • ?

    Appendix 完整代碼

    ?

    [java] view plain copy print?
  • import?java.io.BufferedInputStream;??
  • import?java.io.BufferedOutputStream;??
  • import?java.io.File;??
  • import?java.io.FileInputStream;??
  • import?java.io.FileOutputStream;??
  • import?java.io.IOException;??
  • import?java.io.InputStream;??
  • import?java.io.OutputStream;??
  • ???
  • import?org.apache.commons.logging.Log;??
  • import?org.apache.commons.logging.LogFactory;??
  • import?org.apache.hadoop.conf.Configuration;??
  • import?org.apache.hadoop.fs.BlockLocation;??
  • import?org.apache.hadoop.fs.FSDataInputStream;??
  • import?org.apache.hadoop.fs.FSDataOutputStream;??
  • import?org.apache.hadoop.fs.FileStatus;??
  • import?org.apache.hadoop.fs.FileSystem;??
  • import?org.apache.hadoop.fs.LocatedFileStatus;??
  • import?org.apache.hadoop.fs.Path;??
  • import?org.apache.hadoop.fs.RemoteIterator;??
  • importorg.apache.hadoop.hdfs.DistributedFileSystem;??
  • import?org.apache.hadoop.hdfs.protocol.DatanodeInfo;??
  • ???
  • public?class?SyncDFS?{??
  • ??????????
  • ?????????private?static?final?Log?LOG?=?LogFactory.getLog(SyncDFS.class);??
  • ??????????
  • ?????????/**?
  • ??????????*?Reads?the?directory?name(s)?and?file?name(s)from?the?specified?parameter?"srcPath"?
  • ??????????*?@param?srcPath?
  • ??????????*/??
  • ?????????public?void?list(String?srcPath)?{??
  • ????????????????????Configuration?conf?=?new?Configuration();??
  • ????????????????????LOG.info("[Defaultfs]?:"?+conf.get("fs.default.name"));??
  • //????????????????conf.set("hadoop.job.ugi","app,app");???//It?is?not?necessary?for?the?default?user.??
  • ????????????????????FileSystem?fs;??
  • ???????????????????try?{??
  • ????????????????????????????fs=?FileSystem.get(conf);??
  • ????????????????????????????RemoteIterator<LocatedFileStatus>rmIterator?=?fs.listLocatedStatus(new?Path(srcPath));??
  • ????????????????????????????while?(rmIterator.hasNext())?{??
  • ??????????????????????????????????????Path?path?=?rmIterator.next().getPath();??
  • ??????????????????????????????????????if(fs.isDirectory(path)){??
  • ????????????????????????????????????????????????LOG.info("-----------DirectoryName:?"+path.getName());??
  • ??????????????????????????????????????}??
  • ??????????????????????????????????????else?if(fs.isFile(path)){??
  • ????????????????????????????????????????????????LOG.info("-----------FileName:?"+path.getName());??
  • ??????????????????????????????????????}??
  • ????????????????????????????}??
  • ???????????????????}catch?(IOException?e)?{??
  • ????????????????????????????LOG.error("list?fileSysetm?object?stream.:"?,?e);??
  • ????????????????????????????new?RuntimeException(e);??
  • ???????????????????}??
  • ?????????}??
  • ???
  • ?????????/**?
  • ??????????*?Makes?the?specified?directory?if?it?doesn'texist.?
  • ??????????*?@param?dir?
  • ??????????*/??
  • ?????????public?void?mkdir(String?dir){??
  • ???????????????????Configurationconf?=?new?Configuration();??
  • ???????????????????FileSystemfs?=?null;??
  • ???????????????????try?{??
  • ????????????????????????????fs=?FileSystem.get(conf);??
  • ????????????????????????????Pathpath?=?new?Path(dir);??
  • ????????????????????????????if(!fs.exists(path)){??
  • ?????????????????????????????????????fs.mkdirs(path);??
  • ?????????????????????????????????????LOG.debug("create?directory?'"+dir+"'?successfully!");??
  • ????????????????????????????}else{??
  • ?????????????????????????????????????LOG.debug("directory?'"+dir+"'?exits!");??
  • ????????????????????????????}??
  • ???????????????????}catch?(IOException?e)?{??
  • ????????????????????????????LOG.error("FileSystem?get?configuration?with?anerror");??
  • ????????????????????????????e.printStackTrace();??
  • ???????????????????}finally{??
  • ????????????????????????????if(fs!=?null){??
  • ?????????????????????????????????????try?{??
  • ???????????????????????????????????????????????fs.close();??
  • ?????????????????????????????????????}catch?(IOException?e)?{??
  • ???????????????????????????????????????????????LOG.error("close?fs?object?stream.?:"?,?e);??
  • ???????????????????????????????????????????????new?RuntimeException(e);??
  • ?????????????????????????????????????}??
  • ????????????????????????????}??
  • ???????????????????}??
  • ?????????}??
  • ??????????
  • ?????????/**?
  • ??????????*?Reads?the?file?content?in?console.?
  • ??????????*?@param?file?
  • ??????????*/??
  • ?????????public?void?readFile(String?file){??
  • ???????????????????Configurationconf?=?new?Configuration();??
  • ???????????????????FileSystemfs;??
  • ???????????????????try?{??
  • ????????????????????????????fs=?FileSystem.get(conf);??
  • ????????????????????????????Pathpath?=?new?Path(file);??
  • ????????????????????????????if(!fs.exists(path)){??
  • ?????????????????????????????????????LOG.warn("file'"+?file+"'?doesn't?exist!");??
  • ?????????????????????????????????????return?;??
  • ????????????????????????????}??
  • ????????????????????????????FSDataInputStreamin?=?fs.open(path);??
  • ????????????????????????????Stringfilename?=?file.substring(file.lastIndexOf('/')?+?1,?file.length());??
  • ????????????????????????????OutputStreamout?=?new?BufferedOutputStream(new?FileOutputStream(??
  • ????????????????????????????????????????????????????????????????????????????new?File(filename)));??
  • ???
  • ????????????????????????????byte[]?b?=?new?byte[1024];??
  • ????????????????????????????int?numBytes?=?0;??
  • ????????????????????????????while?((numBytes?=?in.read(b))?>?0)?{??
  • ?????????????????????????????????????out.write(b,0,?numBytes);??
  • ????????????????????????????}??
  • ????????????????????????????in.close();??
  • ????????????????????????????out.close();??
  • ????????????????????????????fs.close();??
  • ???????????????????}catch?(IOException?e)?{??
  • ????????????????????????????LOG.error("ifExists?fs?Exception?caught!?:"?,?e);??
  • ????????????????????????????new?RuntimeException(e);??
  • ???????????????????}??
  • ?????????}??
  • ??????????
  • ?????????public?boolean?ifExists(String?source){??
  • ???????????????????if(source?==?null?||?source.length()?==0){??
  • ????????????????????????????return?false;??
  • ???????????????????}??
  • ???????????????????Configurationconf?=?new?Configuration();??
  • ???????????????????FileSystemfs?=?null;??
  • ???????????????????try?{??
  • ????????????????????????????fs=?FileSystem.get(conf);??
  • ????????????????????????????LOG.debug("judge?file?'"+source?+??"'");??
  • ????????????????????????????return?fs.exists(new?Path(source));??
  • ???????????????????}catch?(IOException?e)?{??
  • ????????????????????????????LOG.error("ifExists?fs?Exception?caught!?:"?,?e);??
  • ????????????????????????????new?RuntimeException(e);??
  • ????????????????????????????return?false;??
  • ???????????????????}finally{??
  • ????????????????????????????if(fs?!=?null){??
  • ?????????????????????????????????????try?{??
  • ???????????????????????????????????????????????fs.close();??
  • ?????????????????????????????????????}catch?(IOException?e)?{??
  • ???????????????????????????????????????????????LOG.error("fs.close?Exception?caught!?:"?,?e);??
  • ???????????????????????????????????????????????new?RuntimeException(e);??
  • ?????????????????????????????????????}??
  • ????????????????????????????}??
  • ?????????????????????????????
  • ???????????????????}??
  • ????????????????????
  • ?????????}??
  • ??????????
  • ?????????/**?
  • ??????????*?Recursively?copies?the?source?pathdirectories?or?files?to?the?destination?path?of?DFS.?
  • ??????????*?It?is?the?same?functionality?as?thefollowing?comand:?
  • ??????????*??????hadoopfs?-copyFromLocal?<local?fs><hadoop?fs>?
  • ??????????*?@param?source?
  • ??????????*?@param?dest?
  • ??????????*/??
  • ?????????public?void?copyFromLocal?(String?source,?String?dest)?{??
  • ??????????????????????
  • ???????????????????Configurationconf?=?new?Configuration();??
  • ???????????????????FileSystemfs;??
  • ???????????????????try?{??
  • ????????????????????????????fs=?FileSystem.get(conf);??
  • ????????????????????????????PathsrcPath?=?new?Path(source);??
  • ???????????????????????????????
  • ????????????????????????????PathdstPath?=?new?Path(dest);??
  • ????????????????????????????//?Check?if?the?file?alreadyexists??
  • ????????????????????????????if?(!(fs.exists(dstPath)))?{??
  • ?????????????????????????????????????LOG.warn("dstPathpath?doesn't?exist"?);??
  • ?????????????????????????????????????LOG.error("No?such?destination?"?+?dstPath);??
  • ?????????????????????????????????????return;??
  • ????????????????????????????}??
  • ???????????????????????????????
  • ????????????????????????????//?Get?the?filename?out?of?thefile?path??
  • ????????????????????????????Stringfilename?=?source.substring(source.lastIndexOf('/')?+?1,?source.length());??
  • ???????????????????????????????
  • ????????????????????????????try{??
  • ?????????????????????????????????????//if?the?file?exists?in?thedestination?path,?it?will?throw?exception.??
  • //???????????????????????????????????fs.copyFromLocalFile(srcPath,dstPath);??
  • ?????????????????????????????????????//remove?and?overwrite?files?withthe?method??
  • ?????????????????????????????????????//copyFromLocalFile(booleandelSrc,?boolean?overwrite,?Path?src,?Path?dst)??
  • ?????????????????????????????????????fs.copyFromLocalFile(false,?true,?srcPath,?dstPath);??
  • ?????????????????????????????????????LOG.info("File?"?+?filename?+?"copied?to?"?+?dest);??
  • ????????????????????????????}catch(Exception?e){??
  • ?????????????????????????????????????LOG.error("copyFromLocalFile?exception?caught!:"?,?e);??
  • ?????????????????????????????????????new?RuntimeException(e);??
  • ????????????????????????????}finally{??
  • ?????????????????????????????????????fs.close();??
  • ????????????????????????????}??
  • ???????????????????}catch?(IOException?e1)?{??
  • ????????????????????????????LOG.error("copyFromLocal?IOException?objectstream.?:"?,e1);??
  • ????????????????????????????new?RuntimeException(e1);??
  • ???????????????????}??
  • ???????????????????}??
  • ??????????
  • ??????????
  • ?????????public?void?renameFile?(String?fromthis,?String?tothis){??
  • ???????????????????Configurationconf?=?new?Configuration();??
  • ??????????????????????
  • ???????????????????FileSystemfs;??
  • ???????????????????try?{??
  • ????????????????????????????fs=?FileSystem.get(conf);??
  • ????????????????????????????PathfromPath?=?new?Path(fromthis);??
  • ????????????????????????????PathtoPath?=?new?Path(tothis);??
  • ???????????????????????????????
  • ????????????????????????????if?(!(fs.exists(fromPath)))?{??
  • ?????????????????????????????????????LOG.info("No?such?destination?"?+?fromPath);??
  • ????????????????????????????????return;??
  • ????????????????????????????}??
  • ???????????????????????????????
  • ????????????????????????????if?(fs.exists(toPath))?{??
  • ?????????????????????????????????????LOG.info("Already?exists!?"?+?toPath);??
  • ????????????????????????????????return;??
  • ????????????????????????????}??
  • ???????????????????????????????
  • ????????????????????????????try{??
  • ?????????????????????????????????????boolean?isRenamed?=?fs.rename(fromPath,toPath);?????//renames?file?name?indeed.??
  • ?????????????????????????????????????if(isRenamed){??
  • ???????????????????????????????????????????????LOG.info("Renamed?from?"?+?fromthis?+?"?to?"?+?tothis);??
  • ?????????????????????????????????????}??
  • ????????????????????????????}catch(Exception?e){??
  • ?????????????????????????????????????LOG.error("renameFile?Exception?caught!?:"?,?e);??
  • ?????????????????????????????????????new?RuntimeException(e);??
  • ????????????????????????????}finally{??
  • ?????????????????????????????????????fs.close();??
  • ????????????????????????????}??
  • ???????????????????}catch?(IOException?e1)?{??
  • ????????????????????????????LOG.error("fs?Exception?caught!?:"?,?e1);??
  • ????????????????????????????new?RuntimeException(e1);??
  • ???????????????????}??
  • ?????????}??
  • ??????????
  • ?????????/**?
  • ??????????*?Uploads?or?adds?a?file?to?HDFS?
  • ??????????*?@param?source?
  • ??????????*?@param?dest?
  • ??????????*/??
  • ?????????public?void?addFile(String?source,?String?dest)??{??
  • ????????????????????????????//?Conf?object?will?readthe?HDFS?configuration?parameters??
  • ????????????????????????????Configurationconf?=?new?Configuration();??
  • ????????????????????????????FileSystemfs;??
  • ????????????????????????????try?{??
  • ?????????????????????????????????????fs=?FileSystem.get(conf);??
  • ?????????????????????????????????????//?Get?the?filename?out?of?thefile?path??
  • ?????????????????????????????????????Stringfilename?=?source.substring(source.lastIndexOf('/')?+?1,?source.length());??
  • ??????????????????????????????????????
  • ?????????????????????????????????????//?Create?the?destination?pathincluding?the?filename.??
  • ?????????????????????????????????????if?(dest.charAt(dest.length()?-?1)?!=?'/')?{??
  • ???????????????????????????????????????????????dest=?dest?+?"/"?+?filename;??
  • ?????????????????????????????????????}else?{??
  • ???????????????????????????????????????????????dest=?dest?+?filename;??
  • ?????????????????????????????????????}??
  • ??????????????????????????????????????
  • ?????????????????????????????????????//?Check?if?the?file?alreadyexists??
  • ?????????????????????????????????????Pathpath?=?new?Path(dest);??
  • ?????????????????????????????????????if?(fs.exists(path))?{??
  • ???????????????????????????????????????????????LOG.error("File?"?+?dest?+?"?already?exists");??
  • ???????????????????????????????????????????????return;??
  • ?????????????????????????????????????}??
  • ??????????????????????????????????????
  • ?????????????????????????????????????//?Create?a?new?file?and?writedata?to?it.??
  • ?????????????????????????????????????FSDataOutputStreamout?=?fs.create(path);??
  • ?????????????????????????????????????InputStreamin?=?new?BufferedInputStream(new?FileInputStream(??
  • ?????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????new?File(source)));??
  • ??????????????????????????????????????
  • ?????????????????????????????????????byte[]?b?=?new?byte[1024];??
  • ?????????????????????????????????????int?numBytes?=?0;??
  • ?????????????????????????????????????//In?this?way?read?and?write?datato?destination?file.??
  • ?????????????????????????????????????while?((numBytes?=?in.read(b))?>?0)?{??
  • ???????????????????????????????????????????????out.write(b,0,?numBytes);??
  • ?????????????????????????????????????}??
  • ?????????????????????????????????????in.close();??
  • ?????????????????????????????????????out.close();??
  • ?????????????????????????????????????fs.close();??
  • ????????????????????????????}catch?(IOException?e)?{??
  • ?????????????????????????????????????LOG.error("addFile?Exception?caught!?:"?,?e);??
  • ?????????????????????????????????????new?RuntimeException(e);??
  • ????????????????????????????}??
  • ?????????}??
  • ??????????
  • ?????????/**?
  • ??????????*Deletes?the?files?if?it?is?a?directory.?
  • ??????????*?@param?file?
  • ??????????*/??
  • ?????????public?void?deleteFile(String?file)??{??
  • ????????????????????????????Configurationconf?=?new?Configuration();??
  • ????????????????????????????FileSystemfs;??
  • ????????????????????????????try?{??
  • ?????????????????????????????????????fs=?FileSystem.get(conf);??
  • ??????????
  • ?????????????????????????????????????Pathpath?=?new?Path(file);??
  • ?????????????????????????????????????if?(!fs.exists(path))?{??
  • ???????????????????????????????????????????????LOG.info("File?"?+?file?+?"?does?not?exists");??
  • ???????????????????????????????????????????????return;??
  • ?????????????????????????????????????}??
  • ?????????????????????????????????????/*?
  • ??????????????????????????????????????*?recursively?delete?the?file(s)?if?it?is?adirectory.?
  • ??????????????????????????????????????*?If?you?want?to?mark?the?path?that?will?bedeleted?as?
  • ??????????????????????????????????????*?a?result?of?closing?the?FileSystem.?
  • ??????????????????????????????????????*??deleteOnExit(Path?f)?
  • ??????????????????????????????????????*/??
  • ?????????????????????????????????????fs.delete(new?Path(file),?true);??
  • ?????????????????????????????????????fs.close();??
  • ????????????????????????????}catch?(IOException?e)?{??
  • ?????????????????????????????????????LOG.error("deleteFile?Exception?caught!?:"?,?e);??
  • ?????????????????????????????????????new?RuntimeException(e);??
  • ????????????????????????????}??
  • ???
  • ???????????????????}??
  • ?????????/**?
  • ??????????*?Gets?the?information?about?the?file?modifiedtime.?
  • ??????????*?@param?source?
  • ??????????*?@throws?IOException?
  • ??????????*/??
  • ?????????public?void?getModificationTime(String?source)?throws?IOException{??
  • ??????????????????????
  • ???????????????????Configurationconf?=?new?Configuration();??
  • ??????????????????????
  • ???????????????????FileSystemfs?=?FileSystem.get(conf);??
  • ???????????????????PathsrcPath?=?new?Path(source);??
  • ??????????????????????
  • ???????????????????//?Check?if?the?file?alreadyexists??
  • ???????????????????if?(!(fs.exists(srcPath)))?{??
  • ???????????????????System.out.println("No?such?destination?"?+?srcPath);??
  • ???????????????????return;??
  • ???????????????????}??
  • ???????????????????//?Get?the?filename?out?of?thefile?path??
  • ???????????????????Stringfilename?=?source.substring(source.lastIndexOf('/')?+?1,?source.length());??
  • ??????????????????????
  • ???????????????????FileStatusfileStatus?=?fs.getFileStatus(srcPath);??
  • ???????????????????long?modificationTime?=fileStatus.getModificationTime();??
  • ??????????????????????
  • ???????????????????LOG.info("modified?datetime:?"?+?System.out.format("File?%s;?Modification?time?:?%0.2f%n",filename,modificationTime));??
  • ??????????????????????
  • ?????????}??
  • ??????????
  • ?????????/**?
  • ??????????*?Gets?the?file?block?location?info?
  • ??????????*?@param?source?
  • ??????????*?@throws?IOException?
  • ??????????*/??
  • ?????????public?void?getBlockLocations(String?source)?throws?IOException{??
  • ???????????????????Configurationconf?=?new?Configuration();??
  • ???????????????????FileSystemfs?=?FileSystem.get(conf);??
  • ???????????????????PathsrcPath?=?new?Path(source);??
  • ??????????????????????
  • ???????????????????//?Check?if?the?file?alreadyexists??
  • ???????????????????if?(!(ifExists(source)))?{??
  • ????????????????????????????System.out.println("No?such?destination?"?+?srcPath);??
  • ????????????????????????????return;??
  • ???????????????????}??
  • ???????????????????//?Get?the?filename?out?of?thefile?path??
  • ???????????????????Stringfilename?=?source.substring(source.lastIndexOf('/')?+?1,?source.length());??
  • ??????????????????????
  • ???????????????????FileStatusfileStatus?=?fs.getFileStatus(srcPath);??
  • ??????????????????????
  • ???????????????????BlockLocation[]blkLocations?=?fs.getFileBlockLocations(fileStatus,?0,?fileStatus.getLen());??
  • ???????????????????int?blkCount?=?blkLocations.length;??
  • ??????????????????????
  • ???????????????????System.out.println("File?:"?+?filename?+?"stored?at:");??
  • ???????????????????for?(int?i=0;?i?<?blkCount;?i++)?{??
  • ????????????????????????????String[]hosts?=?blkLocations[i].getHosts();??
  • ????????????????????????????LOG.info("host?ip:"?+System.out.format("Host?%d:?%s?%n",?i,?hosts));??
  • ???????????????????}??
  • ?????????}??
  • ??????????
  • ?????????public?void?getHostnames?()?throws?IOException{??
  • ???????????????????Configurationconfig?=?new?Configuration();??
  • ????????????????????
  • ???????????????????FileSystemfs?=?FileSystem.get(config);??
  • ???????????????????DistributedFileSystemhdfs?=?(DistributedFileSystem)?fs;??
  • ???????????????????DatanodeInfo[]dataNodeStats?=?hdfs.getDataNodeStats();??
  • ??????????????????????
  • ???????????????????String[]names?=?new?String[dataNodeStats.length];??
  • ???????????????????for?(int?i?=?0;?i?<?dataNodeStats.length;?i++)?{??
  • ????????????????????????????names[i]=?dataNodeStats[i].getHostName();??
  • ????????????????????????????LOG.info("datenode?hostname:"+(dataNodeStats[i].getHostName()));??
  • ???????????????????}??
  • ?????????}??
  • ?????????/**?
  • ??????????*?@param?args?
  • ??????????*/??
  • ?????????public?static?void?main(String[]?args)?{??
  • ???????????????????SyncDFSdfs?=?new?SyncDFS();??
  • ???????????????????dfs.list("/user/app");??
  • ????????????????????
  • ???????????????????dfs.mkdir("/user/app");??
  • ????????????????????
  • //????????????????dfs.readFile("/user/app/README.txt");??
  • ???????????????????LOG.info("--------------"?+??
  • ?????????????????????????????dfs.ifExists("/user/warehouse/hbase.db/u_data/u.data"));?//false??
  • ???????????????????LOG.info("--------------"?+?dfs.ifExists("/user/app/README.txt"));?//true??
  • ????????????????????
  • ???????????????????//copied?the?local?file(s)?to?thedfs.??
  • //????????????????dfs.copyFromLocal("/opt/test","/user/app");??
  • ????????????????????
  • ???????????????????//delete?the?file(s)?from?the?dfs??
  • //????????????????dfs.deleteFile("/user/app/test");??
  • ???????????????????//rename?diretory?in?dfs??
  • //????????????????dfs.renameFile("/user/app/test","/user/app/log");??
  • ???????????????????//rename?file?in?dfs??
  • //????????????????dfs.renameFile("/user/app/log/derby.log","/user/app/log/derby_info.log");??
  • ????????????????????
  • ?????????}??
  • ???
  • ??????????
  • }?
  • 總結(jié)

    以上是生活随笔為你收集整理的java实现对HDFS增删改查(CRUD)等操作的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。