【多线程编程学习】java多线程基于数据分割的大文件下载器
文章目錄
- 代碼:基于數據分割的大文件下載器
- 作為包裝的存儲對象類:
- 主文件下載類:
- 子任務下載類:
- 處理緩存:
- 啟動類:
- 數據分割思想產生的問題
代碼來自書籍《java多線程編程實戰指南》
代碼:基于數據分割的大文件下載器
在日常生活中,我們下載大文件的時候往往是使用專門的下載軟件而不是直接使用瀏覽器。這些下載軟件下載大文件時比較快的一個重要原因就是它們使用多線程技術。例如,一個大小為600MB的文件在網絡帶寬為100Mbps的情況下,使用單個線程下載該文件至少需要耗時48(=600/(100/8))秒。如果我們采用3個線程來下載該文件,其中每個線程分別下載該文件的一個部分,那么下載這個文件所需的時間基本上可以減少為16(=600/3/(100/8))秒,比起單線程下載節省了2/3的時間。按照這個思路實現的一個基于多線程的大文件下載器
首先,我們先獲取待下載資源的大小,這個大小相當于文件下載器的輸入數據的原始規模(總規模)。接著,我們根據設定的下載線程數(workerThreadsCount)來決定子任務的總個數,并由此確定每個子任務負責下載的數據段的范圍(起始字節到結束字節,lowerBound~upperBound)。然后我們分別創建相應的下載子任務(DownloadTask類實例)并為每個下載任務創建相應的下載線程。這些線程啟動后就會并發地下載大文件中的相應部分。
作為包裝的存儲對象類:
public class Storage implements Closeable, AutoCloseable {private final RandomAccessFile storeFile;private final FileChannel storeChannel;protected final AtomicLong totalWrites = new AtomicLong(0);public Storage(long fileSize, String fileShortName) throws IOException {String fullFileName = System.getProperty("java.io.tmpdir") + "/"+ fileShortName;String localFileName;localFileName = createStoreFile(fileSize, fullFileName);storeFile = new RandomAccessFile(localFileName, "rw");storeChannel = storeFile.getChannel();}/*** 將data中指定的數據寫入文件** @param offset* 寫入數據在整個文件中的起始偏移位置* @param byteBuf* byteBuf必須在該方法調用前執行byteBuf.flip()* @throws IOException* @return 寫入文件的數據長度*/public int store(long offset, ByteBuffer byteBuf)throws IOException {int length;storeChannel.write(byteBuf, offset);length = byteBuf.limit();totalWrites.addAndGet(length);return length;}public long getTotalWrites() {return totalWrites.get();}private String createStoreFile(final long fileSize, String fullFileName)throws IOException {File file = new File(fullFileName);Debug.info("create local file:%s", fullFileName);RandomAccessFile raf;raf = new RandomAccessFile(file, "rw");try {raf.setLength(fileSize);} finally {Tools.silentClose(raf);}return fullFileName;}@Overridepublic synchronized void close() throws IOException {if (storeChannel.isOpen()) {Tools.silentClose(storeChannel, storeFile);}} }主文件下載類:
import java.net.HttpURLConnection; import java.net.URL; import java.util.concurrent.atomic.AtomicBoolean;/*** 大文件下載器** @author Viscent Huang*/ public class BigFileDownloader {protected final URL requestURL;protected final long fileSize;/*** 負責已下載數據的存儲*/protected final Storage storage;protected final AtomicBoolean taskCanceled = new AtomicBoolean(false);public BigFileDownloader(String strURL) throws Exception {requestURL = new URL(strURL);// 獲取待下載資源的大小(單位:字節)fileSize = retieveFileSize(requestURL);Debug.info("file total size:%s", fileSize);String fileName = strURL.substring(strURL.lastIndexOf('/') + 1);// 創建負責存儲已下載數據的對象storage = new Storage(fileSize, fileName);}/*** 下載指定的文件** @param taskCount* 任務個數* @param reportInterval* 下載進度報告周期* @throws Exception*/public void download(int taskCount, long reportInterval)throws Exception {long chunkSizePerThread = fileSize / taskCount;// 下載數據段的起始字節long lowerBound = 0;// 下載數據段的結束字節long upperBound = 0;DownloadTask dt;for (int i = taskCount - 1; i >= 0; i--) {lowerBound = i * chunkSizePerThread;if (i == taskCount - 1) {upperBound = fileSize;} else {upperBound = lowerBound + chunkSizePerThread - 1;}// 創建下載任務dt = new DownloadTask(lowerBound, upperBound, requestURL, storage,taskCanceled);dispatchWork(dt, i);}// 定時報告下載進度reportProgress(reportInterval);// 清理程序占用的資源doCleanup();}protected void doCleanup() {Tools.silentClose(storage);}protected void cancelDownload() {if (taskCanceled.compareAndSet(false, true)) {doCleanup();}}protected void dispatchWork(final DownloadTask dt, int workerIndex) {// 創建下載線程Thread workerThread = new Thread(new Runnable() {@Overridepublic void run() {try {dt.run();} catch (Exception e) {e.printStackTrace();// 取消整個文件的下載cancelDownload();}}});workerThread.setName("downloader-" + workerIndex);workerThread.start();}// 根據指定的URL獲取相應文件的大小private static long retieveFileSize(URL requestURL) throws Exception {long size = -1;HttpURLConnection conn = null;try {conn = (HttpURLConnection) requestURL.openConnection();conn.setRequestMethod("HEAD");conn.setRequestProperty("Connection", "Keep-alive");conn.connect();int statusCode = conn.getResponseCode();if (HttpURLConnection.HTTP_OK != statusCode) {throw new Exception("Server exception,status code:" + statusCode);}String cl = conn.getHeaderField("Content-Length");size = Long.valueOf(cl);} finally {if (null != conn) {conn.disconnect();}}return size;}// 報告下載進度private void reportProgress(long reportInterval) throws InterruptedException {float lastCompletion;int completion = 0;while (!taskCanceled.get()) {lastCompletion = completion;completion = (int) (storage.getTotalWrites() * 100 / fileSize);if (completion == 100) {break;} else if (completion - lastCompletion >= 1) {Debug.info("Completion:%s%%", completion);if (completion >= 90) {reportInterval = 1000;}}Thread.sleep(reportInterval);}Debug.info("Completion:%s%%", completion);} }子任務下載類:
/*** 下載子任務** @author Viscent Huang*/ public class DownloadTask implements Runnable {private final long lowerBound;private final long upperBound;private final DownloadBuffer xbuf;private final URL requestURL;private final AtomicBoolean cancelFlag;public DownloadTask(long lowerBound, long upperBound, URL requestURL,Storage storage, AtomicBoolean cancelFlag) {this.lowerBound = lowerBound;this.upperBound = upperBound;this.requestURL = requestURL;this.xbuf = new DownloadBuffer(lowerBound, upperBound, storage);this.cancelFlag = cancelFlag;}// 對指定的URL發起HTTP分段下載請求private static InputStream issueRequest(URL requestURL, long lowerBound,long upperBound) throws IOException {Thread me = Thread.currentThread();Debug.info(me + "->[" + lowerBound + "," + upperBound + "]");final HttpURLConnection conn;InputStream in = null;conn = (HttpURLConnection) requestURL.openConnection();String strConnTimeout = System.getProperty("x.dt.conn.timeout");int connTimeout = null == strConnTimeout ? 60000 : Integer.valueOf(strConnTimeout);conn.setConnectTimeout(connTimeout);String strReadTimeout = System.getProperty("x.dt.read.timeout");int readTimeout = null == strReadTimeout ? 60000 : Integer.valueOf(strReadTimeout);conn.setReadTimeout(readTimeout);conn.setRequestMethod("GET");conn.setRequestProperty("Connection", "Keep-alive");// Range: bytes=0-1024conn.setRequestProperty("Range", "bytes=" + lowerBound + "-" + upperBound);conn.setDoInput(true);conn.connect();int statusCode = conn.getResponseCode();if (HttpURLConnection.HTTP_PARTIAL != statusCode) {conn.disconnect();throw new IOException("Server exception,status code:" + statusCode);}Debug.info(me + "-Content-Range:" + conn.getHeaderField("Content-Range")+ ",connection:" + conn.getHeaderField("connection"));in = new BufferedInputStream(conn.getInputStream()) {@Overridepublic void close() throws IOException {try {super.close();} finally {conn.disconnect();}}};return in;}@Overridepublic void run() {if (cancelFlag.get()) {return;}ReadableByteChannel channel = null;try {channel = Channels.newChannel(issueRequest(requestURL, lowerBound,upperBound));ByteBuffer buf = ByteBuffer.allocate(1024);while (!cancelFlag.get() && channel.read(buf) > 0) {// 將從網絡讀取的數據寫入緩沖區xbuf.write(buf);buf.clear();}} catch (Exception e) {throw new RuntimeException(e);} finally {Tools.silentClose(channel, xbuf);}} }處理緩存:
每個下載線程從網絡讀取一段數據(例如1KB的數據)就將其寫入文件這種方法固然簡單,但是容易增加I/O的次數。有鑒于此,我們采用了緩沖的方法:下載線程每次從網絡讀取的數據都是先被寫入緩沖區,只有當這個緩沖區滿的時候其中的內容才會被寫入本地文件。
這個緩沖區是通過類DownloadBuffer實現的,將緩沖區中的內容寫入本地文件是通過類Storage實現的。
啟動類:
public class CaseRunner4_1 {public static void main(String[] args) throws Exception {if (0 == args.length) {args = new String[] { "http://yourserver.com/bigfile", "2", "3" };}main0(args);}public static void main0(String[] args) throws Exception {final int argc = args.length;BigFileDownloader downloader = new BigFileDownloader(args[0]);// 下載線程數int workerThreadsCount = argc >= 2 ? Integer.valueOf(args[1]) : 2;long reportInterval = argc >= 3 ? Integer.valueOf(args[2]) : 2;Debug.info("downloading %s%nConfig:worker threads:%s,reportInterval:%s s.",args[0], workerThreadsCount, reportInterval);downloader.download(workerThreadsCount, reportInterval * 1000);} }數據分割思想產生的問題
數據的分割這種并發化策略是從程序處理的數據角度入手,將原始輸入分解為若干規模更小的子輸入,并將這些子輸入指派給專門的工作者線程處理。
基于數據的分割的結果是產生多個同質工作者線程,即任務處理邏輯相同的線程。例如,上述案例中的BigFileDownloader創建的工作者線程都是DownloadTask的實例。盡管基于數據的分割的基本思想不難理解,但是在實際運用中,我們往往有更多的細節需要考慮。
1.工作者線程數量的合理設置問題。
在原始輸入規模一定的情況下,增加工作者線程數量可以減小子輸入的規模,從而減少每個工作者線程執行任務所需的時間。但是線程數量的增加也會導致其他開銷(比如上下文切換)增加。例如,上述案例從表面上看,我們似乎可以指定更多的下載線程數來縮短資源下載耗時。比如,我們設定10個線程用于下載一個大小為600MB的資源,那么每個線程僅需要下載這個大文件中60MB的數據,這樣看來似乎我們僅需要單線程下載的1/6時間就可以完成整個資源下載。但實際的結果卻可能并非如此:增加下載線程數的確可以減少每個下載線程的輸入規模(子輸入的規模),從而縮短每個下載線程完成數據段下載所需的時間;但是這同時也增加了上下文切換的開銷、線程創建與銷毀的開銷、建立網絡連接的開銷以及鎖的爭用等開銷,而這些增加的開銷可能無法被子輸入規模減小所帶來的好處所抵消。另一方面,工作者線程數量過少又可能導致子輸入的規模仍然過大,這使得計算效率提升不明顯。在本案例中,我們通過命令行參數指定工作者線程數量,本章后續內容會介紹工作者線程數的合理設置。
對于一個工作者線程執行過程中出現的異常,我們該如何處理呢?例如,在本案例的一個下載線程執行過程中出現異常的時候,這個線程是可以進行重試(針對可恢復的故障)呢,還是說直接就算整個資源的下載失敗呢?如果是算這個資源下載失敗,那么此時其他工作者線程就沒有必要繼續運行下去了。因此,此時就涉及終止其他線程的運行問題。
3.原始輸入規模未知問題。
在上述例子中,由于原始輸入的規模是事先可知的,因此我們可以采用簡單的均分對原始輸入進行分解。但是,某些情況下我們可能無法事先確定原始輸入的規模,或者事先確定原始輸入規模是一個開銷極大的計算。比如,要從幾百個日志文件(其中每個文件可包含上萬條記錄)中統計出我們所需的信息,盡管理論上我們可以事先計算出總記錄條數,但是這樣做的開銷會比較大,因而實際上這是不可行的。此時原始輸入的規模就相當于事先不可知。對于這種原始輸入規模事先不可知的問題,我們可以采用批處理的方式對原始輸入進行分解:聚集了一批數據之后再將這些數據指派給工作者線程進行處理。這種方法類似于公安局辦證中心辦理護照的情形,雖然每天都可能有人去申請護照,但是辦證中心并不是為每個申請人專門辦理護照的,而是湊足一批申請人的材料后才進行統一辦理的。在批處理的分解方式中,工作者線程往往是事先啟動的,并且我們還需要考慮這些工作者線程的負載均衡問題,即新聚集的一批數據按照什么樣的規則被指派給哪個工作者線程的問題。——如果我們把新聚集的一批數據看作一個請求,而把工作者線程看作一個“服務器節點”,那么這兩個問題實際上就是一個問題。 程序的復雜性增加的問題。
4.基于數據的分割產生的多線程程序可能比相應的單線程程序要復雜。
例如,上述案例中雖然多個工作者線程并發地從服務器上下載大文件可以提升計算效率,但是它也帶來一個問題:這些數據段是并發地從服務器上下載的,但是我們最終要得到的是一個完整的大文件,而不是幾個較小的文件。因此,我們有兩種選擇:其中一種方法是,各個工作者線程將其下載的數據段分別寫入各自的本地文件(子文件),等到所有工作者線程結束之后,我們再將這些子文件合并為我們最終需要的文件。顯然,當待下載的資源非常大的時候合并這些子文件也是一筆不小的開銷。另外一種方法是將各個工作者線程從服務器上下載到的數據都寫入同一個本地文件,這個文件被寫滿之后就是我們最終所需的大文件。第二種方法看起來比較簡單,但是這里面有個矛盾需要調和:文件數據是并發地從服務器上下載(讀取)的,但是將這些數據寫入本地文件的時候,我們又必須確保這些數據按照原始文件(服務器上的資源)的順序被寫入這個本地文件的相應位置(起始字節和結束字節)。
總結
以上是生活随笔為你收集整理的【多线程编程学习】java多线程基于数据分割的大文件下载器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ApplicationContextAw
- 下一篇: nacos作注册中心+feign接口调用