日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

网络爬虫:分离生产者和消费者来优化爬虫程序

發布時間:2025/3/20 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 网络爬虫:分离生产者和消费者来优化爬虫程序 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

問題描述:

? 基于前面的一些工作(可點擊這里參見筆者前面的相關博客),我們取得了一些成果。不過存在的問題又總是會讓人坐立不安。本文通過分離生產者、消費者以及引入連接池技術來優化爬蟲程序。解決前面說到的數據庫連接數過大、程序長時間運行OOM的情況。


思路分析:

1.結構設計圖:

??

2.思路整理:

? 首先,我來說明一下在之前的工作中遺留下的一些主要問題:

? (1)數據庫Crash了(可能原因是unvisited_site表的數據過大引起的);

? (2)程序運行一段時間之后,出現OOM異常.

? 對于第一個問題,雖不能完全保證是因為數據量過大引發的問題,但是這可能是其中的一個方面。關于Crash的日志,請點擊這里查看。對此我的優化策略就是過濾添加到DB中的數據,另外引入連接池來避免數據的連接數過大導致連接數超過最大連接數異常。

? 而對于程序在長時間運行后OOM的情況,我表示這的確是困擾了我一些時間。一開始我是以為這是一個內存泄露的Bug,可是在漫長尋找Bug的過程中,我發現這可能并不是內存泄露,而是正常情況。事實也正是如此。下面的內存運行情況中會展示我修改了一些邏輯之后的合理內存使用情況。


程序運行情況展示:

1.內存:

??

2.線程:

??


3.CPU:

??


關鍵代碼:

1.連接池相關代碼:

ConnectionPool.java

public class ConnectionPool {private String jdbcDriver = ""; // 數據庫驅動private String dbUrl = ""; // 數據 URLprivate String dbUsername = ""; // 數據庫用戶名private String dbPassword = ""; // 數據庫用戶密碼private String testTable = ""; // 測試連接是否可用的測試表名,默認沒有測試表private int initialConnections = 1; // 連接池的初始大小private int incrementalConnections = 10; // 連接池自動增加的大小private int maxConnections = 500; // 連接池最大的大小private Vector<PooledConnection> connections = null; // 存放連接池中數據庫連接的向量 ,public ConnectionPool(String jdbcDriver, String dbUrl, String dbUsername, String dbPassword) {this.jdbcDriver = jdbcDriver;this.dbUrl = dbUrl;this.dbUsername = dbUsername;this.dbPassword = dbPassword;try {createPool();} catch (Exception e) {e.printStackTrace();}}public int getInitialConnections() {return this.initialConnections;}public void setInitialConnections(int initialConnections) {this.initialConnections = initialConnections;}public int getIncrementalConnections() {return this.incrementalConnections;}public void setIncrementalConnections(int incrementalConnections) {this.incrementalConnections = incrementalConnections;}public int getMaxConnections() {return this.maxConnections;}public void setMaxConnections(int maxConnections) {this.maxConnections = maxConnections;}public String getTestTable() {return this.testTable;}public void setTestTable(String testTable) {this.testTable = testTable;}public synchronized void createPool() throws Exception {// 如果連接池己經創建了,保存連接的向量 connections 不會為空if (connections != null) {return; // 如果己經創建,則返回}// 實例化 JDBC Driver 中指定的驅動類實例Driver driver = (Driver) (Class.forName(this.jdbcDriver).newInstance());DriverManager.registerDriver(driver); // 注冊 JDBC 驅動程序// 創建保存連接的向量 , 初始時有 0 個元素connections = new Vector<PooledConnection>();// 根據 initialConnections 中設置的值,創建連接。createConnections(this.initialConnections);System.out.println("create pool");}private void createConnections(int numConnections) throws SQLException {// 循環創建指定數目的數據庫連接for (int x = 0; x < numConnections; x++) {// 是否連接池中的數據庫連接的數量己經達到最大?最大值由類成員 maxConnections// 指出,如果 maxConnections 為 0 或負數,表示連接數量沒有限制。// 如果連接數己經達到最大,即退出。System.out.println(this.connections.size() + ", " + this.maxConnections);if (this.maxConnections > 0 && this.connections.size() >= this.maxConnections) {System.out.println("連接數己經達到最大");break;}try {connections.addElement(new PooledConnection(newConnection()));} catch (SQLException e) {System.out.println(" 創建數據庫連接失敗! " + e.getMessage());throw new SQLException();}System.out.println(" 數據庫連接己創建 ......");}}private Connection newConnection() throws SQLException {Connection conn = DriverManager.getConnection(dbUrl, dbUsername, dbPassword);if (connections.size() == 0) {DatabaseMetaData metaData = conn.getMetaData();int driverMaxConnections = metaData.getMaxConnections();if (driverMaxConnections > 0 && this.maxConnections > driverMaxConnections) {this.maxConnections = driverMaxConnections;}}return conn; // 返回創建的新的數據庫連接}public synchronized PooledConnection getConnection() throws SQLException {// 確保連接池己被創建if (connections == null) {return null; // 連接池還沒創建,則返回 null}PooledConnection conn = getFreeConnection(); // 獲得一個可用的數據庫連接// 如果目前沒有可以使用的連接,即所有的連接都在使用中while (conn == null) {// 等一會再試wait(250);conn = getFreeConnection(); // 重新再試,直到獲得可用的連接,如果// getFreeConnection() 返回的為 null// 則表明創建一批連接后也不可獲得可用連接}return conn; // 返回獲得的可用的連接}public void print() {System.out.println("total connection:" + connections.size());int i = 1;for (PooledConnection conn : connections) {System.out.println("---" + i + ":" + conn.isBusy());}}private PooledConnection getFreeConnection() throws SQLException {// 從連接池中獲得一個可用的數據庫連接PooledConnection conn = findFreeConnection();if (conn == null) {// 如果目前連接池中沒有可用的連接// 創建一些連接System.out.println("目前連接池中沒有可用的連接,創建一些連接 ");createConnections(incrementalConnections);// 重新從池中查找是否有可用連接conn = findFreeConnection();if (conn == null) {// 如果創建連接后仍獲得不到可用的連接,則返回 nullreturn null;}}return conn;}private PooledConnection findFreeConnection() throws SQLException {// 獲得連接池向量中所有的對象for (int i = 0; i < connections.size(); i++) {PooledConnection pc = connections.elementAt(i);// System.out.println("pConn.isBusy():"+pConn.isBusy());if (!pc.isBusy()) {// 如果此對象不忙,則獲得它的數據庫連接并把它設為忙Connection conn = pc.getConnection();pc.setBusy(true);// 測試此連接是否可用if (!isValid(conn)) {// 如果此連接不可再用了,則創建一個新的連接,// 并替換此不可用的連接對象,如果創建失敗,刪除該無效連接,遍歷下一個不忙連接try {conn = newConnection();pc.setConnection(conn);} catch (SQLException e) {e.printStackTrace();connections.remove(i--);continue;}}return pc; // 己經找到一個可用的連接,退出}}return null; // 返回找到到的可用連接}private boolean isValid(Connection conn) {try {return conn.isValid(3000);} catch (SQLException e) {e.printStackTrace();return false;}}public void returnConnection(Connection conn) {// 確保連接池存在,如果連接沒有創建(不存在),直接返回if (connections == null) {System.out.println(" 連接池不存在,無法返回此連接到連接池中 !");return;}PooledConnection pConn = null;Enumeration<PooledConnection> enumerate = connections.elements();// 遍歷連接池中的所有連接,找到這個要返回的連接對象while (enumerate.hasMoreElements()) {pConn = (PooledConnection) enumerate.nextElement();// 先找到連接池中的要返回的連接對象if (conn == pConn.getConnection()) {// 找到了 , 設置此連接為空閑狀態pConn.setBusy(false);break;}}}public synchronized void refreshConnections() throws SQLException {// 確保連接池己創新存在if (connections == null) {System.out.println(" 連接池不存在,無法刷新 !");return;}PooledConnection pConn = null;Enumeration<PooledConnection> enumerate = connections.elements();while (enumerate.hasMoreElements()) {// 獲得一個連接對象pConn = (PooledConnection) enumerate.nextElement();// 如果對象忙則等 5 秒 ,5 秒后直接刷新if (pConn.isBusy()) {wait(5000); // 等 5 秒}// 關閉此連接,用一個新的連接代替它。closeConnection(pConn.getConnection());pConn.setConnection(newConnection());pConn.setBusy(false);}}public synchronized void closeConnectionPool() throws SQLException {// 確保連接池存在,如果不存在,返回if (connections == null) {System.out.println("連接池不存在,無法關閉 !");return;}PooledConnection pConn = null;Enumeration<PooledConnection> enumerate = connections.elements();while (enumerate.hasMoreElements()) {pConn = (PooledConnection) enumerate.nextElement();// 如果忙,等 5 秒if (pConn.isBusy()) {wait(5000); // 等 5 秒}// 5 秒后直接關閉它closeConnection(pConn.getConnection());// 從連接池向量中刪除它connections.removeElement(pConn);}// 置連接池為空connections = null;}private void closeConnection(Connection conn) {try {conn.close();} catch (SQLException e) {System.out.println(" 關閉數據庫連接出錯: " + e.getMessage());}}private void wait(int mSeconds) {try {Thread.sleep(mSeconds);} catch (InterruptedException e) {}}public class PooledConnection {private Connection connection = null;// 數據庫連接private boolean busy ; // 此連接是否正在使用的標志,默認沒有正在使用// 構造函數,根據一個 Connection 構告一個 PooledConnection 對象private PooledConnection(Connection connection) {this.connection = connection;}public ResultSet executeQuery(String sql) throws SQLException {return connection.createStatement().executeQuery(sql);}public int executeUpdate(String sql) throws SQLException {return connection.createStatement().executeUpdate(sql);}// 返回此對象中的連接private Connection getConnection() {return connection;}// 設置此對象的,連接private void setConnection(Connection connection) {this.connection = connection;}// 獲得對象連接是否忙private boolean isBusy() {return busy;}// 設置對象的連接正在忙private void setBusy(boolean busy) {this.busy = busy;}public void close() {busy = false;}} }
DBManager.java

public class DBManager {private static PooledConnection conn;private static ConnectionPool connectionPool;private static DBManager inst;private String mUrl = DBModel.getMysqlUrl();private String mUser = DBModel.getMysqlUesr();private String mPassword = DBModel.getMysqlPassword();private String mDriver = DBModel.getMysqlDerver();public void close() {try {connectionPool.closeConnectionPool();} catch (SQLException e) {e.printStackTrace();}}public DBManager() {if (inst != null)return;connectionPool = new ConnectionPool(mDriver, mUrl, mUser, mPassword);try {connectionPool.createPool();inst = this;} catch (Exception e) {e.printStackTrace();}}public static PooledConnection getConnection() {if (inst == null) {new DBManager();}try {conn = connectionPool.getConnection();} catch (SQLException e) {e.printStackTrace();}return conn;} }

連接池使用過程:

public static void insert(WebInfoModel model) {if (model == null) {return;}if (BEEStringTools.isEmptyString(model.getName()) || BEEStringTools.isEmptyString(model.getAddress())) {return;}String sql = "INSERT INTO visited_site(name, address, hash_address, date, level) VALUES('" + model.getName() + "', '" + model.getAddress() + "', " + model.getAddress().hashCode() + ", " + System.currentTimeMillis() + ", " + model.getLevel() + ")";PooledConnection conn = null;try {conn = DBManager.getConnection();conn.executeUpdate(sql);} catch (Exception e) {System.out.println("your sql is: " + sql + "\nError: " + e);} finally {conn.close();model = null;sql = null;}}

2.批量插入數據相關代碼:

/*** 將set中的數據批量insert到數據庫中* DBBLL* @param set*/public static void insertSet2UnvisitedBatch(SpiderSet set, Map<Integer, Integer> map) {if (set == null || set.size() == 0) {return;}String sql = "INSERT INTO unvisited_site(name,address,hash_address,date,visited,level) VALUES(?,?,?,?,?,?);";Connection conn = null;PreparedStatement ps = null;WebInfoModel model = null;try {conn = DriverManager.getConnection(DBModel.getMysqlUrl(), DBModel.getMysqlUesr(), DBModel.getMysqlPassword());conn.setAutoCommit(false);ps = conn.prepareStatement(sql);final int batchSize = 1000;int count = 0;while (!set.isEmpty()) {model = set.next();ps.setString(1, model.getName());ps.setString(2, model.getAddress());ps.setInt(3, model.getAddress().hashCode());ps.setLong(4, System.currentTimeMillis());ps.setInt(5, 0);ps.setInt(6, model.getLevel());ps.addBatch();if (++count % batchSize == 0) {ps.executeBatch();conn.commit();}}ps.executeBatch();conn.commit();} catch (Exception e) {System.out.println("Batch insert error:" + e);} finally {model = null;sql = null;try {ps.close();conn.close();} catch (SQLException e) {System.err.println("Close conn/ps error:" + e);}}}

3.分離消費者與生產者相關代碼 :

生產者ProduceToWaittingVisitedRunner:

從unvisited_site數據表中獲得數據,填充到待訪問的隊列中

public class ProduceToWaittingVisitedRunner implements Runnable {private SpiderQueue mQueue;private List<WebInfoModel> mModelList;private boolean mStop = false;public ProduceToWaittingVisitedRunner(SpiderQueue queue) {mQueue = queue;initEvent();}private void initEvent() {mModelList = new ArrayList<WebInfoModel>();}@Overridepublic void run() {while (!mStop) {mModelList = DBBLL.getUnvisitedInfoModels(mQueue.getMaxSize());if (mModelList == null || mModelList.size() == 0) {sleep(100);continue;}sleep(100);for (WebInfoModel model : mModelList) {mQueue.offer(model);DBBLL.updateUnvisited(model);}}}private void sleep(long millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {e.printStackTrace();mStop = true;}} }
生產者ParserRunner:

從待訪問隊列中消費一個model,調用Python生產鏈接的列表Queue,將生成的列表Queue offer到結果Set中

public class ParserRunner implements Runnable {private SpiderSet mResultSet = null;private WebInfoModel mInfoModel = null;private int mIndex;private final boolean DEBUG = true;private Map<Integer, Integer> mResultMap = null;public ParserRunner(SpiderSet set, WebInfoModel model, int index, Map<Integer, Integer> resultMap) {mResultSet = set;mInfoModel = model;mIndex = index;mResultMap = resultMap;}@Overridepublic void run() {SpiderQueue tmpQueue = new SpiderQueue();PythonUtils.fillAddressQueueByPython(tmpQueue, mInfoModel.getAddress(), mInfoModel.getLevel()); // 記錄訪問某一個網頁中解析出的網址WebInfoModel model = null;while (!tmpQueue.isQueueEmpty()) {model = tmpQueue.poll();if (model == null || mResultMap.containsKey(model.getAddress().hashCode())) {continue;}mResultSet.add(model);putMap(model.getAddress().hashCode());}if (DEBUG) {System.out.println("[index:" + mIndex + ", size:" + mResultSet.size() + "]: " + mInfoModel.getAddress());}tmpQueue = null;model = null;}private void putMap(int hashUrl) {if (mResultMap.containsKey(hashUrl)) {mResultMap.put(hashUrl, 1 + mResultMap.get(hashUrl));} else {mResultMap.put(hashUrl, 1);}}@SuppressWarnings("unused")private void sleep(long millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {e.printStackTrace();}} }
消費者PushModelsToDBRunner:

從輔助Queue中消費產品(利用臨時Queue一次全額消費),此輔助Queue是從Python中解析出來的Url列表信息

public class PushModelsToDBRunner implements Runnable {private SpiderSet mResultSet = null;private SpiderSet tmpSet = null;private boolean mStop = false;private Map<Integer, Integer> mResultMap = null;public PushModelsToDBRunner(SpiderSet queue, Map<Integer, Integer> resultMap) {mResultSet = queue;mResultMap = resultMap;initEvent();}private void initEvent() {tmpSet = new SpiderSet();tmpSet.setMaxSize(1000);}@Overridepublic void run() {WebInfoModel model = null;while (!mStop) {if (mResultSet.isEmpty()) {sleep(100);continue;}tmpSet.setMaxSize(Math.max(mResultSet.size(), 1000)); // TODOwhile(!mResultSet.isEmpty()) {model = mResultSet.next();if (model == null) {continue;}tmpSet.add(model);if (tmpSet.isFull()) {break;}}DBBLL.insertSet2UnvisitedBatch(tmpSet, mResultMap);}model = null;}private void sleep(long millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {e.printStackTrace();mStop = true;}} }


代碼優化點:

1.引入HashSet

? 引入HashSet的目的有兩個唯一和無序

2.每次消費HashSet為全額消費

? 全額消費的目的在于,避免HashSet中的元素占用過多內存(關于這一點,在上文中可以體現出我的確痛苦過...)。

3.引入mResultMap參數

? 此參數的類型是Map<Integer, Integer>,其中的key的類型Integer是表示address的hash值,第二個Integer是該address出現的次數。

? 可能你會問我,為什么要引入這個參數。引入此Map的目的是為了在入庫之前就進行一步去重(因為是對內存操作,所以效率會比較高)。不過說實話,引入這個參數,我也是憂慮過。因為這會引入一些內存的開銷,不過在我做過一些單元測試之后,發現它引入的內存開銷是很小的,所以就引入了此參數。

? 好了,引入此參數是可以解決一些問題。不過,你可能又會問我,沒有其他的方式既能去重又可以避免這些小量開銷嗎?優化此步操作有兩點需要很清楚:一是我的內存中存儲Url的Set長度很有限;二是我需要對Set中的數據進行批量insert。如果這個時候我們要每增一條記錄到數據庫都要進行check的話,這將是一個耗時的操作。如果引入此Map參數,那么問題就引刃而解了。

? 引入此參數后內存的使用情況如下圖:

??


存在的問題:

? 1.如果要算上效率的話,這是一個問題(目前的情況是15000左右/小時);

? 2.線程池中的解析HTML線程經常出現阻塞的情況。

總結

以上是生活随笔為你收集整理的网络爬虫:分离生产者和消费者来优化爬虫程序的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 办公室大战高跟丝袜秘书经理ol | 中文字幕在线播放视频 | 一区二区视频在线观看免费 | 日韩精品一二区 | 北条麻妃av在线播放 | 啪啪自拍 | 国产呦小j女精品视频 | www.黄色国产 | 91精品人妻一区二区三区蜜桃2 | 韩国一级淫一片免费放 | 久久国| 久操av| 国产老熟女一区二区三区 | 无码人妻aⅴ一区二区三区 国产高清一区二区三区四区 | 超碰98在线观看 | 午夜精品久久久久久久99黑人 | 亚洲精品成人 | 日本免费色视频 | 95香蕉视频| 国产男女无遮挡猛进猛出 | 国产精品视频一区二区三 | 免费成人av | 少妇人妻真实偷人精品视频 | 日韩精品一区二区在线看 | 鲁一鲁啪一啪 | 天堂а√在线中文在线新版 | 黄色av网站网址 | 国产粉嫩呻吟一区二区三区 | 亚洲午夜精品久久久 | 中文字幕观看在线 | 中文字幕乱码亚洲精品一区 | 亚洲美女高潮久久久 | 在线日韩免费 | 久久网免费视频 | 神马午夜我不卡 | 成人一区二区电影 | 亚洲天堂免费看 | 色噜噜狠狠一区二区三区果冻 | 一区二区三区免费看 | 欧美另类xxxxx| 看黄色的网站 | 在线网站av | 亚洲福利视频网站 | 欧洲中文字幕日韩精品成人 | 国产a级片免费看 | 反差在线观看免费版全集完整版 | av网站不卡| 亚洲免费av电影 | 亚洲图色av | 免费污片在线观看 | 国产白浆在线观看 | 人人草人人看 | 麻豆视频播放 | 超碰日日干| av av片在线看| 国产日韩久久久 | 久久久久人妻一道无码AV | 亚洲欧美一区二区三区四区五区 | 中文在线观看视频 | 国产看片网站 | 懂色av成人一区二区三区 | 亚洲午夜久久久 | 国产精品77 | 亚洲激情一区二区 | 亚洲成a人无码 | 亚洲草逼视频 | 日韩免费精品视频 | 天天干天天插天天操 | 亚洲精品另类 | 最近中文字幕 | 超碰青娱乐 | 欧美 日韩 国产 成人 | 天天操狠狠操夜夜操 | 国产激情a | 肉大捧一进一出免费视频 | 国产精品久久久久久久久久小说 | 今天高清视频在线观看视频 | 黑人干日本少妇 | 国产成人a亚洲精v品无码 | 国产免费aa | av免费在线观| 美女脱光衣服让男人捅 | 亚洲女人被黑人巨大进入 | 国产成人精 | 国产精品av在线 | 婷婷五月花 | 毛片大全| 亚洲精品污一区二区三区 | 99热国内精品 | 日本女人毛茸茸 | www.香蕉视频 | 亚洲精品乱码久久久久久久 | 国产97视频| aaa一区二区三区 | 五月婷婷激情网 | 快射视频在线观看 | xxxx国产片| 波多野结衣日韩 | 亚洲第一av|