ftp+线程池批量上传文件
生活随笔
收集整理的這篇文章主要介紹了
ftp+线程池批量上传文件
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
FTP服務器(File Transfer Protocol Server)是在互聯網上提供文件和訪問服務的計算機,它們依照提供服務。FTP是File Transfer Protocol(文件傳輸協議)。顧名思義,就是專門用來傳輸文件的協議。簡單地說,支持FTP協議的服務器就是FTP服務器
代碼:
public class FileBatchUploader implements Closeable {private final String ftpServer;private final String userName;private final String password;private final String targetRemoteDir;private final FTPClient ftp = new FTPClient();private final CompletionService<File> completionService;private final ExecutorService es;private final ExecutorService dispatcher;public FileBatchUploader(String ftpServer, String userName, String password,String targetRemoteDir) {this.ftpServer = ftpServer;this.userName = userName;this.password = password;this.targetRemoteDir = targetRemoteDir;// 使用單工作者線程的線程池this.es = Executors.newSingleThreadExecutor();this.dispatcher = Executors.newSingleThreadExecutor();this.completionService = new ExecutorCompletionService<File>(es);}public void uploadFiles(final Set<File> files) {dispatcher.submit(new Runnable() {@Overridepublic void run() {try {doUploadFiles(files);} catch (InterruptedException ignored) {}}});}private void doUploadFiles(Set<File> files) throws InterruptedException {// 批量提交文件上傳任務for (final File file : files) {completionService.submit(new UploadTask(file));}Future<File> future;File md5File;File uploadedFile;Set<File> md5Files = new HashSet<File>();for (File file : files) {try {future = completionService.take();uploadedFile = future.get();// 將上傳成功的文件移動到備份目錄,并為其生成相應的MD5文件md5File = generateMD5(moveToSuccessDir(uploadedFile));md5Files.add(md5File);} catch (ExecutionException | IOException | NoSuchAlgorithmException e) {e.printStackTrace();moveToDeadDir(file);}}for (File file : md5Files) {// 上傳相應的MD5文件completionService.submit(new UploadTask(file));}// 檢查md5文件的上傳結果int successUploaded = md5Files.size();for (int i = 0; i < successUploaded; i++) {future = completionService.take();try {uploadedFile = future.get();md5Files.remove(uploadedFile);} catch (ExecutionException e) {e.printStackTrace();}}// 將剩余(即未上傳成功)的md5文件移動到相應備份目錄for (File file : md5Files) {moveToDeadDir(file);}}private File generateMD5(File file) throws IOException, NoSuchAlgorithmException {String md5 = Tools.md5sum(file);File md5File = new File(file.getAbsolutePath() + ".md5");Files.write(Paths.get(md5File.getAbsolutePath()), md5.getBytes("UTF-8"));return md5File;}private static File moveToSuccessDir(File file) {File targetFile = null;try {targetFile = moveFile(file, Paths.get(file.getParent(), "..", "backup", "success"));} catch (IOException e) {e.printStackTrace();}return targetFile;}private static File moveToDeadDir(File file) {File targetFile = null;try {targetFile = moveFile(file, Paths.get(file.getParent(), "..", "backup", "dead"));} catch (IOException e) {e.printStackTrace();}return targetFile;}private static File moveFile(File srcFile, Path destPath) throws IOException {Path sourcePath = Paths.get(srcFile.getAbsolutePath());if (!Files.exists(destPath)) {Files.createDirectories(destPath);}Path destFile = destPath.resolve(srcFile.getName());Files.move(sourcePath, destFile,StandardCopyOption.REPLACE_EXISTING);return destFile.toFile();}class UploadTask implements Callable<File> {private final File file;public UploadTask(File file) {this.file = file;}@Overridepublic File call() throws Exception {Debug.info("uploading %s", file.getCanonicalPath());// 上傳指定的文件upload(file);return file;}}// 初始化FTP客戶端public void init() throws Exception {FTPClientConfig config = new FTPClientConfig();ftp.configure(config);int reply;ftp.connect(ftpServer);Debug.info("FTP Reply:%s", ftp.getReplyString());reply = ftp.getReplyCode();if (!FTPReply.isPositiveCompletion(reply)) {ftp.disconnect();throw new Exception("FTP server refused connection.");}boolean isOK = ftp.login(userName, password);if (isOK) {Debug.info("FTP Reply:%s", ftp.getReplyString());} else {throw new Exception("Failed to login." + ftp.getReplyString());}reply = ftp.cwd(targetRemoteDir);if (!FTPReply.isPositiveCompletion(reply)) {ftp.disconnect();throw new Exception("Failed to change working directory.reply:"+ reply);} else {Debug.info("FTP Reply:%s", ftp.getReplyString());}ftp.setFileType(FTP.ASCII_FILE_TYPE);}// 將指定的文件上傳至FTP服務器protected void upload(File file) throws Exception {boolean isOK;try (InputStream dataIn = new BufferedInputStream(new FileInputStream(file))) {isOK = ftp.storeFile(file.getName(), dataIn);}if (!isOK) {throw new IOException("Failed to upload " + file + ",reply:" + ","+ ftp.getReplyString());}}@Overridepublic void close() throws IOException {dispatcher.shutdown();try {es.awaitTermination(60, TimeUnit.SECONDS);} catch (InterruptedException ignored) {}es.shutdown();try {es.awaitTermination(60, TimeUnit.SECONDS);} catch (InterruptedException ignored) {}Tools.silentClose(new Closeable() {@Overridepublic void close() throws IOException {if (ftp.isConnected()) {ftp.disconnect();}}});} }測試上面代碼:
public static void main(String[] args) throws Exception {final FileBatchUploader uploader = new FileBatchUploader("localhost", "datacenter", "abc123","/home/datacenter/tmp/") {@Overridepublic void init() throws Exception {Debug.info("init...");super.init();}@Overrideprotected void upload(File file) throws Exception {super.upload(file);}@Overridepublic void close() throws IOException {Debug.info("close...");super.close();}};uploader.init();Set<File> files = new HashSet<File>();files.add(new File("/home/viscent/tmp/incomingX/message1.dat"));files.add(new File("/home/viscent/tmp/incomingX/message2.dat"));files.add(new File("/home/viscent/tmp/incomingX/message3.dat"));files.add(new File("/home/viscent/tmp/incomingX/message4.dat"));files.add(new File("/home/viscent/tmp/incomingX/message5.dat"));uploader.uploadFiles(files);Tools.delayedAction("", new Runnable() {@Overridepublic void run() {Tools.silentClose(uploader);}}, 120);}} 與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的ftp+线程池批量上传文件的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: snowflake分布式自增长id的ja
- 下一篇: 整合rabbitmq+redis发送验证