日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > java >内容正文

java

java多线程下载_Java实现多线程下载,支持断点续传

發(fā)布時(shí)間:2025/3/20 java 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java多线程下载_Java实现多线程下载,支持断点续传 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

多線程下載及斷點(diǎn)續(xù)傳的實(shí)現(xiàn)是使用 HTTP/1.1 引入的 Range 請求參數(shù),可以訪問Web資源的指定區(qū)間的內(nèi)容。雖然實(shí)現(xiàn)了多線程及斷點(diǎn)續(xù)傳,但還有很多不完善的地方。

包含四個類:

Downloader: 主類,負(fù)責(zé)分配任務(wù)給各個子線程,及檢測進(jìn)度

DownloadFile: 表示要下載的哪個文件,為了能寫輸入到文件的指定位置,使用 RandomAccessFile 類操作文件,多個線程寫同一個文件需要保證線程安全,這里直接調(diào)用 getChannel 方法,獲取一個文件通道,FileChannel是線程安全的。

DownloadTask: 實(shí)際執(zhí)行下載的線程,獲取 [lowerBound, upperBound] 區(qū)間的數(shù)據(jù),當(dāng)下載過程中出現(xiàn)異常時(shí)要通知其他線程(使用 AtomicBoolean),結(jié)束下載

Logger: 實(shí)時(shí)記錄下載進(jìn)度,以便續(xù)傳時(shí)知道從哪開始。感覺這里做的比較差,為了能實(shí)時(shí)寫出日志及方便地使用Properties類的load/store方法格式化輸入輸出,每次都是打開后再關(guān)閉。

演示:

隨便找一個文件下載:

強(qiáng)行結(jié)束程序并重新運(yùn)行:

日志文件:

斷點(diǎn)續(xù)傳的關(guān)鍵是記錄各個線程的下載進(jìn)度,這里細(xì)節(jié)比較多,花了很久。只需要記錄每個線程請求的Range區(qū)間極客,每次成功寫數(shù)據(jù)到文件時(shí),就更新一次下載區(qū)間。下面是下載完成后的日志內(nèi)容。

代碼:

Downloader.java

package downloader;

import java.io.*;

import java.net.*;

import java.nio.file.Files;

import java.nio.file.Path;

import java.util.concurrent.atomic.AtomicBoolean;

public class Downloader {

private static final int DEFAULT_THREAD_COUNT = 4; // 默認(rèn)線程數(shù)量

private AtomicBoolean canceled; // 取消狀態(tài),如果有一個子線程出現(xiàn)異常,則取消整個下載任務(wù)

private DownloadFile file; // 下載的文件對象

private String storageLocation;

private final int threadCount; // 線程數(shù)量

private long fileSize; // 文件大小

private final String url;

private long beginTime; // 開始時(shí)間

private Logger logger;

public Downloader(String url) {

this(url, DEFAULT_THREAD_COUNT);

}

public Downloader(String url, int threadCount) {

this.url = url;

this.threadCount = threadCount;

this.canceled = new AtomicBoolean(false);

this.storageLocation = url.substring(url.lastIndexOf('/')+1);

this.logger = new Logger(storageLocation + ".log", url, threadCount);

}

public void start() {

boolean reStart = Files.exists(Path.of(storageLocation + ".log"));

if (reStart) {

logger = new Logger(storageLocation + ".log");

System.out.printf("* 繼續(xù)上次下載進(jìn)度[已下載:%.2fMB]:%s\n", logger.getWroteSize() / 1014.0 / 1024, url);

} else {

System.out.println("* 開始下載:" + url);

}

if (-1 == (this.fileSize = getFileSize()))

return;

System.out.printf("* 文件大小:%.2fMB\n", fileSize / 1024.0 / 1024);

this.beginTime = System.currentTimeMillis();

try {

this.file = new DownloadFile(storageLocation, fileSize, logger);

if (reStart) {

file.setWroteSize(logger.getWroteSize());

}

// 分配線程下載

dispatcher(reStart);

// 循環(huán)打印進(jìn)度

printDownloadProgress();

} catch (IOException e) {

System.err.println("x 創(chuàng)建文件失敗[" + e.getMessage() + "]");

}

}

/**

* 分配器,決定每個線程下載哪個區(qū)間的數(shù)據(jù)

*/

private void dispatcher(boolean reStart) {

long blockSize = fileSize / threadCount; // 每個線程要下載的數(shù)據(jù)量

long lowerBound = 0, upperBound = 0;

long[][] bounds = null;

int threadID = 0;

if (reStart) {

bounds = logger.getBounds();

}

for (int i = 0; i < threadCount; i++) {

if (reStart) {

threadID = (int)(bounds[i][0]);

lowerBound = bounds[i][1];

upperBound = bounds[i][2];

} else {

threadID = i;

lowerBound = i * blockSize;

// fileSize-1 !!!!! fu.ck,找了一下午的錯

upperBound = (i == threadCount - 1) ? fileSize-1 : lowerBound + blockSize;

}

new DownloadTask(url, lowerBound, upperBound, file, canceled, threadID).start();

}

}

/**

* 循環(huán)打印進(jìn)度,直到下載完畢,或任務(wù)被取消

*/

private void printDownloadProgress() {

long downloadedSize = file.getWroteSize();

int i = 0;

long lastSize = 0; // 三秒前的下載量

while (!canceled.get() && downloadedSize < fileSize) {

if (i++ % 4 == 3) { // 每3秒打印一次

System.out.printf("下載進(jìn)度:%.2f%%, 已下載:%.2fMB,當(dāng)前速度:%.2fMB/s\n",

downloadedSize / (double)fileSize * 100 ,

downloadedSize / 1024.0 / 1024,

(downloadedSize - lastSize) / 1024.0 / 1024 / 3);

lastSize = downloadedSize;

i = 0;

}

try {

Thread.sleep(1000);

} catch (InterruptedException ignore) {}

downloadedSize = file.getWroteSize();

}

file.close();

if (canceled.get()) {

try {

Files.delete(Path.of(storageLocation));

} catch (IOException ignore) {

}

System.err.println("x 下載失敗,任務(wù)已取消");

} else {

System.out.println("* 下載成功,本次用時(shí)"+ (System.currentTimeMillis() - beginTime) / 1000 +"秒");

}

}

/**

* @return 要下載的文件的尺寸

*/

private long getFileSize() {

if (fileSize != 0) {

return fileSize;

}

HttpURLConnection conn = null;

try {

conn = (HttpURLConnection)new URL(url).openConnection();

conn.setConnectTimeout(3000);

conn.setRequestMethod("HEAD");

conn.connect();

System.out.println("* 連接服務(wù)器成功");

} catch (MalformedURLException e) {

throw new RuntimeException("URL錯誤");

} catch (IOException e) {

System.err.println("x 連接服務(wù)器失敗["+ e.getMessage() +"]");

return -1;

}

return conn.getContentLengthLong();

}

public static void main(String[] args) throws IOException {

new Downloader("http://js.xiazaicc.com//down2/ucliulanqi_downcc.zip").start();

}

}

DownloadTask.java

package downloader;

import java.io.*;

import java.net.HttpURLConnection;

import java.net.URL;

import java.nio.ByteBuffer;

import java.nio.channels.Channels;

import java.nio.channels.ReadableByteChannel;

import java.util.concurrent.atomic.AtomicBoolean;

class DownloadTask extends Thread {

private final String url;

private long lowerBound; // 下載的文件區(qū)間

private long upperBound;

private AtomicBoolean canceled;

private DownloadFile downloadFile;

private int threadId;

DownloadTask(String url, long lowerBound, long upperBound, DownloadFile downloadFile,

AtomicBoolean canceled, int threadID) {

this.url = url;

this.lowerBound = lowerBound;

this.upperBound = upperBound;

this.canceled = canceled;

this.downloadFile = downloadFile;

this.threadId = threadID;

}

@Override

public void run() {

ReadableByteChannel input = null;

try {

ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024 * 2); // 2MB

input = connect();

System.out.println("* [線程" + threadId + "]連接成功,開始下載...");

int len;

while (!canceled.get() && lowerBound <= upperBound) {

buffer.clear();

len = input.read(buffer);

downloadFile.write(lowerBound, buffer, threadId, upperBound);

lowerBound += len;

}

if (!canceled.get()) {

System.out.println("* [線程" + threadId + "]下載完成" + ": " + lowerBound + "-" + upperBound);

}

} catch (IOException e) {

canceled.set(true);

System.err.println("x [線程" + threadId + "]遇到錯誤[" + e.getMessage() + "],結(jié)束下載");

} finally {

if (input != null) {

try {

input.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

/**

* 連接WEB服務(wù)器,并返回一個數(shù)據(jù)通道

* @return 返回通道

* @throws IOException 網(wǎng)絡(luò)連接錯誤

*/

private ReadableByteChannel connect() throws IOException {

HttpURLConnection conn = (HttpURLConnection)new URL(url).openConnection();

conn.setConnectTimeout(3000);

conn.setRequestMethod("GET");

conn.setRequestProperty("Range", "bytes=" + lowerBound + "-" + upperBound);

// System.out.println("thread_"+ threadId +": " + lowerBound + "-" + upperBound);

conn.connect();

int statusCode = conn.getResponseCode();

if (HttpURLConnection.HTTP_PARTIAL != statusCode) {

conn.disconnect();

throw new IOException("狀態(tài)碼錯誤:" + statusCode);

}

return Channels.newChannel(conn.getInputStream());

}

}

DownloadFile.java

package downloader;

import java.io.IOException;

import java.io.RandomAccessFile;

import java.nio.ByteBuffer;

import java.nio.channels.FileChannel;

import java.util.concurrent.atomic.AtomicLong;

class DownloadFile {

private final RandomAccessFile file;

private final FileChannel channel; // 線程安全類

private AtomicLong wroteSize; // 已寫入的長度

private Logger logger;

DownloadFile(String fileName, long fileSize, Logger logger) throws IOException {

this.wroteSize = new AtomicLong(0);

this.logger = logger;

this.file = new RandomAccessFile(fileName, "rw");

file.setLength(fileSize);

channel = file.getChannel();

}

/**

* 寫數(shù)據(jù)

* @param offset 寫偏移量

* @param buffer 數(shù)據(jù)

* @throws IOException 寫數(shù)據(jù)出現(xiàn)異常

*/

void write(long offset, ByteBuffer buffer, int threadID, long upperBound) throws IOException {

buffer.flip();

int length = buffer.limit();

while (buffer.hasRemaining()) {

channel.write(buffer, offset);

}

wroteSize.addAndGet(length);

logger.updateLog(threadID, length, offset + length, upperBound); // 更新日志

}

/**

* @return 已經(jīng)下載的數(shù)據(jù)量,為了知道何時(shí)結(jié)束整個任務(wù),以及統(tǒng)計(jì)信息

*/

long getWroteSize() {

return wroteSize.get();

}

// 繼續(xù)下載時(shí)調(diào)用

void setWroteSize(long wroteSize) {

this.wroteSize.set(wroteSize);

}

void close() {

try {

file.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

Logger.java

package downloader;

import java.io.*;

import java.util.Properties;

class Logger {

private String logFileName; // 下載的文件的名字

private Properties log;

/**

* 重新開始下載時(shí),使用該構(gòu)造函數(shù)

* @param logFileName

*/

Logger(String logFileName) {

this.logFileName = logFileName;

log = new Properties();

FileInputStream fin = null;

try {

log.load(new FileInputStream(logFileName));

} catch (IOException ignore) {

} finally {

try {

fin.close();

} catch (Exception ignore) {}

}

}

Logger(String logFileName, String url, int threadCount) {

this.logFileName = logFileName;

this.log = new Properties();

log.put("url", url);

log.put("wroteSize", "0");

log.put("threadCount", String.valueOf(threadCount));

for (int i = 0; i < threadCount; i++) {

log.put("thread_" + i, "0-0");

}

}

synchronized void updateLog(int threadID, long length, long lowerBound, long upperBound) {

log.put("thread_"+threadID, lowerBound + "-" + upperBound);

log.put("wroteSize", String.valueOf(length + Long.parseLong(log.getProperty("wroteSize"))));

FileOutputStream file = null;

try {

file = new FileOutputStream(logFileName); // 每次寫時(shí)都清空文件

log.store(file, null);

} catch (IOException e) {

e.printStackTrace();

} finally {

if (file != null) {

try {

file.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

/**

* 獲取區(qū)間信息

* ret[i][0] = threadID, ret[i][1] = lowerBoundID, ret[i][2] = upperBoundID

* @return

*/

long[][] getBounds() {

long[][] bounds = new long[Integer.parseInt(log.get("threadCount").toString())][3];

int[] index = {0};

log.forEach((k, v) -> {

String key = k.toString();

if (key.startsWith("thread_")) {

String[] interval = v.toString().split("-");

bounds[index[0]][0] = Long.parseLong(key.substring(key.indexOf("_") + 1));

bounds[index[0]][1] = Long.parseLong(interval[0]);

bounds[index[0]++][2] = Long.parseLong(interval[1]);

}

});

return bounds;

}

long getWroteSize() {

return Long.parseLong(log.getProperty("wroteSize"));

}

}

總結(jié)

以上是生活随笔為你收集整理的java多线程下载_Java实现多线程下载,支持断点续传的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。