日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

利用java多线程向MongoDB中批量插入静态文件

發布時間:2023/11/27 生活经验 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 利用java多线程向MongoDB中批量插入静态文件 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

第一步、開發環境:

    win7 64位(注:MongoDb在32位windows上有數量限制(2G),詳見官方文檔)

    Mongodb3.2

    mongofb_java_driver 3.2.2

第二部、安裝mongodb,并開啟服務

    略:可參見官方文檔

第三部、代碼

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.LinkedList;
import java.util.List;
import org.bson.Document;
import com.mongodb.MongoClient;
import com.mongodb.MongoWriteException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;/***  created by soarhu 2016/4/21*/public class MongodbBatchInsetUtils {static final int ThreadNum=3;//設置向MongoDb中插入數據的線程數static int ThreadSizeCount = 0;//用于計算子線程完成數static final String HOST = "127.0.0.1";//主機static final int PORT = 27017;//端口    static final String DATABASE_NAME="mydb";//存儲數據庫名稱,如果不存在會自動創建數據庫static final String COLLECTION_NAME="md";//存儲Collectionpublic static final String DIR = "E:\\targets";//掃描文件路徑public static final String FILE_SUFFIX = "html";//掃描文件類型,不設置,默認為所有文件public static final String CHARSET = "UTF-8";//文件處理編碼格式public static void main(String[] args) {MongoClient client =new MongoClient(HOST,PORT);MongoDatabase dataBase = client.getDatabase(DATABASE_NAME);MongoCollection<Document> collection = dataBase.getCollection(COLLECTION_NAME);Pool p = new Pool();Produce pro = new Produce(p);Long startTime = System.currentTimeMillis(); new Thread(pro).start();//開啟從磁盤讀取文件的線程Thread[] th = new Thread[ThreadNum];for(int i=0;i<ThreadNum;i++){//開啟向mongoDb寫入數據的線程Thread a = new Thread(new Customer(p,collection));a.start();th[i]=a;}boolean res=true;while(res){if(MongodbBatchInsetUtils.ThreadSizeCount==ThreadNum+1){res=false;Long endTime = System.currentTimeMillis();System.out.println("數據寫入完成,吸入總數:"+p.hasUploadToDB+",共花費時間約為:"+(endTime-startTime)+"ms\n");for(Thread t:th){t.interrupt();//在子線程將數據寫完后,中斷子線程。
               }if(null!=client){client.close();//關閉連接collection=null;dataBase=null;} } else {System.out.println("已寫入數據:"+p.hasUploadToDB);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}}}
}//生產者,從磁盤讀取數據
class Produce implements Runnable{private Pool pool=null;public Produce(Pool pool){this.pool= pool;}@Overridepublic void run() {getFilesInDir(MongodbBatchInsetUtils.DIR, MongodbBatchInsetUtils.FILE_SUFFIX);MongodbBatchInsetUtils.ThreadSizeCount++;System.out.println("READING FINISHED!!");}//遞歸讀取dir目錄中所有以suffix結尾的文件,若不指定文件類型,默認讀取所有文件public void getFilesInDir(String dir,String suffix){if(null!=dir && dir.trim().length()>0){File file = new File(dir.trim());if(file.exists() && file.isDirectory()){File[] flist = file.listFiles();if(null!=flist && flist.length>0){for(File f:flist){if(f.isFile()){if(null==suffix|| "".equals(suffix)){pool.putFile(f);}if(null!=suffix &&suffix.trim().length()>0){if(f.getName().endsWith(suffix.trim())){pool.putFile(f);}else{throw new RuntimeException("找不到對應文件類型");}}}else{getFilesInDir(f.getAbsolutePath(),suffix);}}}else{throw new RuntimeException("文件內容為空");}}else{throw new RuntimeException("目錄不存在,請檢查路徑正確性!");}}}
}//消費者,向mongoDb中寫數據
class Customer implements Runnable{private Pool pool=null;MongoCollection<Document> collection = null;public Customer(Pool pool,MongoCollection<Document> collection){this.pool = pool;this.collection = collection;}@Overridepublic void run() {while(true){File f = pool.fetchFile();if(null==f){return ;}try {saveToMonGoDb(f);
//                if(pool.hasUploadToDB%1000==0)
//                    System.out.println("已寫入數據:"+pool.hasUploadToDB);} catch (MongoWriteException e) {System.out.println("寫入數據庫異常:"+e.getMessage());return ;}if(pool.getSize()==0){System.out.println(Thread.currentThread().getName()+" :WRITTING FINISHED!!");MongodbBatchInsetUtils.ThreadSizeCount++;}}}//將文件以文件名為id,文件內容為值保存在數據庫中private void saveToMonGoDb(File file){String _id = file.getName().substring(0,file.getName().lastIndexOf("."));String content = readFileContext(file, MongodbBatchInsetUtils.CHARSET);Document document = new Document("_id",_id).append("content", content);collection.insertOne(document);}//讀取文件內容,以charSet編碼處理public static String readFileContext(File file,String charSet)  {StringBuilder sb;BufferedReader reader=null;try {reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), charSet)); String line = null;sb = new StringBuilder();while(null!=(line = reader.readLine())){sb.append(line+"\n");}return sb.toString();}catch (Exception e) {System.out.println("文件讀取失敗!"+e.getMessage());}finally{try {reader.close();} catch (IOException e) {e.printStackTrace();}}return null;}}//池,緩沖區
class Pool{volatile int size=0;//緩沖區中條目數量volatile int limit =1000;volatile int hasUploadToDB=0;volatile private  List<File> files = new LinkedList<File>();//入棧public  synchronized void putFile(File file){while(files.size()==limit){try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}files.add(file);notifyAll();++size;}//出棧public synchronized File fetchFile(){while(files.size()==0 ){try {this.wait();} catch (InterruptedException e) {return null;}}File file = null;notify();if(files.size()>0){file = files.remove(0);--size;++hasUploadToDB;}return file;}public int getSize(){return this.size;}}

?

轉載于:https://www.cnblogs.com/alienSmoking/p/5422675.html

總結

以上是生活随笔為你收集整理的利用java多线程向MongoDB中批量插入静态文件的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。