日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

MapReduce基础开发之五分布式下载ftp文件到本地再迁移到hdfs

發布時間:2025/4/16 编程问答 56 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MapReduce基础开发之五分布式下载ftp文件到本地再迁移到hdfs 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
為利用Hadoop集群平臺的分布存儲和計算能力,基于MapReduce將ftp文件分布式下載并上傳到HDFS中。

1、文件移動流程:ftp服務器->datanode本地目錄->HDFS目錄;

2、實現主要基于兩個設計思想:
? ?1)將FTP服務器文件列表作為MapReduce處理的數據對象,按照文件名分布到不同Reduce節點下載和上傳到HDFS中;
? ?2)在每個datanode節點都建立一個本地文件保存目錄,最好是統一路徑名,這樣每個Reduce節點都把FTP服務器文件下載到該目錄下;

3、代碼主要過程:
? ?1)驅動類中先讀取FTP服務器上要下載的文件列表,并移入到hdfs中,作為Map函數的輸入;
? ?2)Map函數處理文件列表,獲取文件名字,作為Reduce函數輸入;
? ?3)Reduce函數根據輸入的文件名去下載ftp服務器上對應的文件,并下載到datanode節點的統一本地目錄,再將本地目錄文件上傳到HDFS中;

4、主要技術點:
? ?1)FTPClient實現ftp文件下載;
? ?2)hadoop的IOUtils類實現文件從本地上傳到HDFS;

5、準備工作
? ?1)ftp服務器端口、用戶名和密碼、下載文件目錄;
? ? ? linux下ftp命令:進入$ftp ip/常用命令:ls/cd/put/get/mput/mget
? ?2)每個節點統一建立本地目錄/tmp/fjs/localftp,保存ftp服務器上下載的文件;
? ?3)Namenode上建立HDFS保存文件的目錄/tmp/fjs/ftp;
? ?4)Namenode上建立HDFS保存文件列表的目錄/tmp/fjs/in,即Map函數的輸入數據;

6、具體代碼:

? ?1)主類FtpMR:驅動類加MapReduce類;

package ct.gd;import java.io.IOException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;public class FtpMR {public static class FtpMap extends Mapper<Object,Text,Text,Text>{private Text _key = new Text();private Text _value = new Text(); public void map(Object key,Text value,Context context) throws IOException,InterruptedException{String line = value.toString();//tag是隨機值,目的是將文件均勻分到各節點下載,隨機范圍根據集群節點數,這里是0-100內//假設下載文件有1000個,100隨機范圍,集群有100個節點,那每個節點均勻可能獲得10個文件下載,//map輸出的<key,value>,輸入reduce時,key值相同的會形成value list,因此設計該隨機key值String tag = ComMethod.getRandomNbr();_key.set(tag);_value.set(line);context.write(_key,_value);}}public static class FtpReduce extends Reducer<Text,Text,Text,Text>{public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException{String ftpSrv=context.getConfiguration().get("ftpSrv");//獲取ftp服務器連接信息String outputPath=context.getConfiguration().get("outputPath");//獲取hdfs存放文件的目錄FtpUtil fu=new FtpUtil(); for(Text value:values){String filename=value.toString();//輸入的value是ftp服務器上的文件名String localFile=fu.DownFileToLocal(ftpSrv,filename);//下載文件到本地目錄,并返回文件保存的路徑if (localFile!=null) fu.WriteFileToHDFS(localFile,ComMethod.changeToDir(outputPath),filename);//本地文件上傳到hdfs中}}}public static void main(String[] args) throws Exception { Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: FtpMR <in> <out>");System.exit(2);}String inputPath=otherArgs[0];//FTP服務器保存文件列表的文件目錄String outputPath=otherArgs[1];//下載的ftp文件保存在hdfs中的目錄FtpUtil fu=new FtpUtil();//ftp服務器字符串格式:IP|port|username|password|file directoryString strFtpSrv="IP|port|name|password|directory"; //獲取ftp服務器上文件列表,保存到hdfs的inputPath目錄下if(!fu.getFtpFileList(strFtpSrv,inputPath)){System.err.println("下載ftp服務器文件列表失敗");System.exit(2);}//將ftp服務器的參數作為參數傳遞到Reduce中conf.set("ftpSrv", strFtpSrv);//將hdfs上保存下載文件的目錄傳遞到Reduce中conf.set("outputPath", outputPath);Job job = new Job(conf, "FtpToHdfs");job.setJarByClass(FtpMR.class);//job.setNumReduceTasks(1);//設置reduce輸入文件一個,方便查看結果job.setMapperClass(FtpMap.class);job.setReducerClass(FtpReduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(inputPath));FileOutputFormat.setOutputPath(job, new Path(outputPath));System.exit(job.waitForCompletion(true) ? 0 : 1);} }

? ?2)接口類FtpUtil:主要處理ftp文件下載和寫入hdfs中;

package ct.gd;import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream;import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPFile; import org.apache.commons.net.ftp.FTPReply; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils;public class FtpUtil {/*下載文件列表處理函數,開始*/public boolean getFtpFileList(String strFtpSrv,String inputPath){//從ftp服務器上讀取文件列表String[] FtpSrvConn=strFtpSrv.split("\\|");//截取ftp服務器連接信息 FTPClient ftp = new FTPClient();try { ftp.connect(FtpSrvConn[0], Integer.parseInt(FtpSrvConn[1])); //url和port ftp.login(FtpSrvConn[2], FtpSrvConn[3]); //name和passwordint reply = ftp.getReplyCode(); if (!FTPReply.isPositiveCompletion(reply)) { ftp.disconnect(); return false; } String remotePath=FtpSrvConn[4];//Ftp服務器上文件目錄ftp.changeWorkingDirectory(remotePath);FTPFile[] fs = ftp.listFiles(remotePath);StringBuffer buffer = new StringBuffer();for(FTPFile ff:fs){String fileName = ff.getName();buffer.append(fileName+"\n");}if(writeBufferToHDFSFile(buffer, inputPath)){ftp.logout();return true;}ftp.logout();} catch (IOException e) {System.out.println(e.getMessage());} finally { if (ftp.isConnected()) { try { ftp.disconnect(); } catch (IOException ioe) { System.out.println(ioe.getMessage()); } } } return false; }private boolean writeBufferToHDFSFile(StringBuffer buffer, String inputPath){//將文件列表寫到hdfs中Configuration conf = new Configuration();FileSystem fs = null;String fileName="fileLists.txt";try {fs = FileSystem.get(conf);inputPath = ComMethod.changeToDir(inputPath) + fileName;Path fsInputPath=new Path(inputPath);FSDataOutputStream outputStream = fs.create(fsInputPath);outputStream.write(buffer.toString().getBytes("UTF-8"));outputStream.flush();outputStream.sync();outputStream.close();return true;} catch (IOException e) {System.out.println(e.getMessage());}return false;}/*下載文件列表處理函數,結束*//*下載文件處理函數,開始*/public String DownFileToLocal(String ftpSrv,String filename){//在節點上創建本地保存下載文件的目錄String localPath="/tmp/fjs/localftp";File localDir = new File(localPath);//如果不存在就創建if(!localDir.exists()){localDir.mkdirs();}FTPClient ftp = new FTPClient();String[] FtpSrvConn=ftpSrv.split("\\|");//截取ftp服務器連接信息try { ftp.connect(FtpSrvConn[0], Integer.parseInt(FtpSrvConn[1])); //url和port ftp.login(FtpSrvConn[2], FtpSrvConn[3]); //name和passwordint reply = ftp.getReplyCode(); if (!FTPReply.isPositiveCompletion(reply)) { ftp.disconnect(); return null;} String remotePath=FtpSrvConn[4];//Ftp服務器上文件目錄ftp.changeWorkingDirectory(remotePath);String localFilePath = ComMethod.changeToDir(localPath) + filename;File localFile = new File(localFilePath);OutputStream is = new FileOutputStream(localFile);ftp.retrieveFile(filename, is);//下載is.close();ftp.logout();return localFilePath;} catch (IOException e) { System.err.println(e.getMessage());} finally { if (ftp.isConnected()) { try { ftp.disconnect(); } catch (IOException ioe) { } } } return null; }/*下載文件處理函數,結束*//*上傳文件到hdfs處理函數,開始*/public void WriteFileToHDFS(String localFile,String outputPath,String filename){Configuration conf = new Configuration();FileSystem fs = null;try {fs=FileSystem.get(conf);InputStream in = new BufferedInputStream(new FileInputStream(localFile));String ouputFile = outputPath + filename;//hdfs存放文件路勁和名字OutputStream out = fs.create(new Path(ouputFile));IOUtils.copyBytes(in, out, 1024*1024,true);//遷移out.flush();if(out!=null) out.close();if(in!=null) in.close();//刪除本地文件File _outputFileName = new File(localFile);if(_outputFileName.exists()) _outputFileName.delete();} catch (IOException e) {e.printStackTrace();} }/*上傳文件到hdfs處理函數,結束*/public static void main(String[] args) throws Exception { } }

? ?3)通用函數類ComMethod:主要是一些通用字符處理函數;

package ct.gd;import java.util.Random;public class ComMethod {public static String changeToDir(String dirPath){//目錄最后是否有/if(dirPath.charAt(dirPath.length()-1)!='/'){dirPath = dirPath + "/";}return dirPath;}public static String getRandomNbr(){//獲取隨機數Random rand = new Random();String nbr = String.valueOf(rand.nextInt(100));return nbr;}}


7、執行結果
? ?1)執行命令:yarn jar /mnt/mr.jar /tmp/fjs/in /tmp/fjs/ftp
? ?2)hadoop fs -ls /tmp/fjs/in 可以看到文件列表文件
? ?3)hadoop fs -ls /tmp/fjs/ftp 可以看到下載的文件
? ?4)每個節點ls -l /tmp/fjs/localftp,如果文件都遷入hdfs,應該為空

總結

以上是生活随笔為你收集整理的MapReduce基础开发之五分布式下载ftp文件到本地再迁移到hdfs的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。