日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

第17讲:aiohttp 异步爬虫实战

發(fā)布時(shí)間:2024/4/11 编程问答 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 第17讲:aiohttp 异步爬虫实战 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

在上一課時(shí)我們介紹了異步爬蟲(chóng)的基本原理和 asyncio 的基本用法,另外在最后簡(jiǎn)單提及了 aiohttp 實(shí)現(xiàn)網(wǎng)頁(yè)爬取的過(guò)程,這一可是我們來(lái)介紹一下 aiohttp 的常見(jiàn)用法,以及通過(guò)一個(gè)實(shí)戰(zhàn)案例來(lái)介紹下使用 aiohttp 完成網(wǎng)頁(yè)異步爬取的過(guò)程。

aiohttp

前面介紹的 asyncio 模塊內(nèi)部實(shí)現(xiàn)了對(duì) TCP、UDP、SSL 協(xié)議的異步操作,但是對(duì)于 HTTP 請(qǐng)求的異步操作來(lái)說(shuō),我們就需要用到 aiohttp 來(lái)實(shí)現(xiàn)了。

aiohttp 是一個(gè)基于 asyncio 的異步 HTTP 網(wǎng)絡(luò)模塊,它既提供了服務(wù)端,又提供了客戶(hù)端。其中我們用服務(wù)端可以搭建一個(gè)支持異步處理的服務(wù)器,用于處理請(qǐng)求并返回響應(yīng),類(lèi)似于 Django、Flask、Tornado 等一些 Web 服務(wù)器。而客戶(hù)端我們就可以用來(lái)發(fā)起請(qǐng)求,就類(lèi)似于 requests 來(lái)發(fā)起一個(gè) HTTP 請(qǐng)求然后獲得響應(yīng),但 requests 發(fā)起的是同步的網(wǎng)絡(luò)請(qǐng)求,而 aiohttp 則發(fā)起的是異步的。

本課時(shí)我們就主要來(lái)了解一下 aiohttp 客戶(hù)端部分的使用。

基本使用

基本實(shí)例

首先我們來(lái)看一個(gè)基本的 aiohttp 請(qǐng)求案例,代碼如下:

import aiohttp import asyncio async def fetch(session, url):async with session.get(url) as response:return await response.text(), response.status async def main():async with aiohttp.ClientSession() as session:html, status = await fetch(session, 'https://cuiqingcai.com')print(f'html: {html[:100]}...')print(f'status: {status}') if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(main())

在這里我們使用 aiohttp 來(lái)爬取了我的個(gè)人博客,獲得了源碼和響應(yīng)狀態(tài)碼并輸出,運(yùn)行結(jié)果如下:

html: <!DOCTYPE HTML> <html> <head> <meta charset="UTF-8"> <meta name="baidu-tc-verification" content=... status: 200

這里網(wǎng)頁(yè)源碼過(guò)長(zhǎng),只截取輸出了一部分,可以看到我們成功獲取了網(wǎng)頁(yè)的源代碼及響應(yīng)狀態(tài)碼 200,也就完成了一次基本的 HTTP 請(qǐng)求,即我們成功使用 aiohttp 通過(guò)異步的方式進(jìn)行了網(wǎng)頁(yè)的爬取,當(dāng)然這個(gè)操作用之前我們所講的 requests 同樣也可以做到。

我們可以看到其請(qǐng)求方法的定義和之前有了明顯的區(qū)別,主要有如下幾點(diǎn):

  • 首先在導(dǎo)入庫(kù)的時(shí)候,我們除了必須要引入 aiohttp 這個(gè)庫(kù)之外,還必須要引入 asyncio 這個(gè)庫(kù),因?yàn)橐獙?shí)現(xiàn)異步爬取需要啟動(dòng)協(xié)程,而協(xié)程則需要借助于 asyncio 里面的事件循環(huán)來(lái)執(zhí)行。除了事件循環(huán),asyncio 里面也提供了很多基礎(chǔ)的異步操作。
  • 異步爬取的方法的定義和之前有所不同,在每個(gè)異步方法前面統(tǒng)一要加 async 來(lái)修飾。
  • with as 語(yǔ)句前面同樣需要加 async 來(lái)修飾,在 Python 中,with as 語(yǔ)句用于聲明一個(gè)上下文管理器,能夠幫我們自動(dòng)分配和釋放資源,而在異步方法中,with as 前面加上 async 代表聲明一個(gè)支持異步的上下文管理器。
  • 對(duì)于一些返回 coroutine 的操作,前面需要加 await 來(lái)修飾,如 response 調(diào)用 text 方法,查詢(xún) API 可以發(fā)現(xiàn)其返回的是 coroutine 對(duì)象,那么前面就要加 await;而對(duì)于狀態(tài)碼來(lái)說(shuō),其返回值就是一個(gè)數(shù)值類(lèi)型,那么前面就不需要加 await。所以,這里可以按照實(shí)際情況處理,參考官方文檔說(shuō)明,看看其對(duì)應(yīng)的返回值是怎樣的類(lèi)型,然后決定加不加 await 就可以了。
  • 最后,定義完爬取方法之后,實(shí)際上是 main 方法調(diào)用了 fetch 方法。要運(yùn)行的話(huà),必須要啟用事件循環(huán),事件循環(huán)就需要使用 asyncio 庫(kù),然后使用 run_until_complete 方法來(lái)運(yùn)行。

注意在 Python 3.7 及以后的版本中,我們可以使用 asyncio.run(main())
來(lái)代替最后的啟動(dòng)操作,不需要顯式聲明事件循環(huán),run 方法內(nèi)部會(huì)自動(dòng)啟動(dòng)一個(gè)事件循環(huán)。但這里為了兼容更多的 Python
版本,依然還是顯式聲明了事件循環(huán)。

URL 參數(shù)設(shè)置

對(duì)于 URL 參數(shù)的設(shè)置,我們可以借助于 params 參數(shù),傳入一個(gè)字典即可,示例如下:

import aiohttp import asyncio async def main():params = {'name': 'germey', 'age': 25}async with aiohttp.ClientSession() as session:async with session.get('https://httpbin.org/get', params=params) as response:print(await response.text()) if __name__ == '__main__':asyncio.get_event_loop().run_until_complete(main())

運(yùn)行結(jié)果如下:

{"args": {"age": "25","name": "germey"},"headers": {"Accept": "*/*","Accept-Encoding": "gzip, deflate","Host": "httpbin.org","User-Agent": "Python/3.7 aiohttp/3.6.2","X-Amzn-Trace-Id": "Root=1-5e85eed2-d240ac90f4dddf40b4723ef0"},"origin": "17.20.255.122","url": "https://httpbin.org/get?name=germey&age=25" }

這里可以看到,其實(shí)際請(qǐng)求的 URL 為 https://httpbin.org/get?name=germey&age=25,其 URL 請(qǐng)求參數(shù)就對(duì)應(yīng)了 params 的內(nèi)容。

其他請(qǐng)求類(lèi)型

另外 aiohttp 還支持其他的請(qǐng)求類(lèi)型,如 POST、PUT、DELETE 等等,這個(gè)和 requests 的使用方式有點(diǎn)類(lèi)似,示例如下:

session.post('http://httpbin.org/post', data=b'data') session.put('http://httpbin.org/put', data=b'data') session.delete('http://httpbin.org/delete') session.head('http://httpbin.org/get') session.options('http://httpbin.org/get') session.patch('http://httpbin.org/patch', data=b'data')

POST 數(shù)據(jù)

對(duì)于 POST 表單提交,其對(duì)應(yīng)的請(qǐng)求頭的 Content-type 為 application/x-www-form-urlencoded,我們可以用如下方式來(lái)實(shí)現(xiàn),代碼示例如下:

import aiohttp import asyncio async def main():data = {'name': 'germey', 'age': 25}async with aiohttp.ClientSession() as session:async with session.post('https://httpbin.org/post', data=data) as response:print(await response.text()) if __name__ == '__main__':asyncio.get_event_loop().run_until_complete(main())

運(yùn)行結(jié)果如下:

{"args": {},"data": "","files": {},"form": {"age": "25","name": "germey"},"headers": {"Accept": "*/*","Accept-Encoding": "gzip, deflate","Content-Length": "18","Content-Type": "application/x-www-form-urlencoded","Host": "httpbin.org","User-Agent": "Python/3.7 aiohttp/3.6.2","X-Amzn-Trace-Id": "Root=1-5e85f0b2-9017ea603a68dc285e0552d0"},"json": null,"origin": "17.20.255.58","url": "https://httpbin.org/post" }

對(duì)于 POST JSON 數(shù)據(jù)提交,其對(duì)應(yīng)的請(qǐng)求頭的 Content-type 為 application/json,我們只需要將 post 方法的 data 參數(shù)改成 json 即可,代碼示例如下:

async def main():data = {'name': 'germey', 'age': 25}async with aiohttp.ClientSession() as session:async with session.post('https://httpbin.org/post', json=data) as response:print(await response.text())

運(yùn)行結(jié)果如下:

{"args": {},"data": "{\"name\": \"germey\", \"age\": 25}","files": {},"form": {},"headers": {"Accept": "*/*","Accept-Encoding": "gzip, deflate","Content-Length": "29","Content-Type": "application/json","Host": "httpbin.org","User-Agent": "Python/3.7 aiohttp/3.6.2","X-Amzn-Trace-Id": "Root=1-5e85f03e-c91c9a20c79b9780dbed7540"},"json": {"age": 25,"name": "germey"},"origin": "17.20.255.58","url": "https://httpbin.org/post" }

響應(yīng)字段

對(duì)于響應(yīng)來(lái)說(shuō),我們可以用如下的方法分別獲取響應(yīng)的狀態(tài)碼、響應(yīng)頭、響應(yīng)體、響應(yīng)體二進(jìn)制內(nèi)容、響應(yīng)體 JSON 結(jié)果,代碼示例如下:

import aiohttp import asyncio async def main():data = {'name': 'germey', 'age': 25}async with aiohttp.ClientSession() as session:async with session.post('https://httpbin.org/post', data=data) as response:print('status:', response.status)print('headers:', response.headers)print('body:', await response.text())print('bytes:', await response.read())print('json:', await response.json()) if __name__ == '__main__':asyncio.get_event_loop().run_until_complete(main())

運(yùn)行結(jié)果如下:

status: 200 headers: <CIMultiDictProxy('Date': 'Thu, 02 Apr 2020 14:13:05 GMT', 'Content-Type': 'application/json', 'Content-Length': '503', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')> body: {"args": {},"data": "","files": {},"form": {"age": "25","name": "germey"},"headers": {"Accept": "*/*","Accept-Encoding": "gzip, deflate","Content-Length": "18","Content-Type": "application/x-www-form-urlencoded","Host": "httpbin.org","User-Agent": "Python/3.7 aiohttp/3.6.2","X-Amzn-Trace-Id": "Root=1-5e85f2f1-f55326ff5800b15886c8e029"},"json": null,"origin": "17.20.255.58","url": "https://httpbin.org/post" } bytes: b'{\n "args": {}, \n "data": "", \n "files": {}, \n "form": {\n "age": "25", \n "name": "germey"\n }, \n "headers": {\n "Accept": "*/*", \n "Accept-Encoding": "gzip, deflate", \n "Content-Length": "18", \n "Content-Type": "application/x-www-form-urlencoded", \n "Host": "httpbin.org", \n "User-Agent": "Python/3.7 aiohttp/3.6.2", \n "X-Amzn-Trace-Id": "Root=1-5e85f2f1-f55326ff5800b15886c8e029"\n }, \n "json": null, \n "origin": "17.20.255.58", \n "url": "https://httpbin.org/post"\n}\n' json: {'args': {}, 'data': '', 'files': {}, 'form': {'age': '25', 'name': 'germey'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '18', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'httpbin.org', 'User-Agent': 'Python/3.7 aiohttp/3.6.2', 'X-Amzn-Trace-Id': 'Root=1-5e85f2f1-f55326ff5800b15886c8e029'}, 'json': None, 'origin': '17.20.255.58', 'url': 'https://httpbin.org/post'}

這里我們可以看到有些字段前面需要加 await,有的則不需要。其原則是,如果其返回的是一個(gè) coroutine 對(duì)象(如 async 修飾的方法),那么前面就要加 await,具體可以看 aiohttp 的 API,其鏈接為:https://docs.aiohttp.org/en/stable/client_reference.html。

超時(shí)設(shè)置

對(duì)于超時(shí)的設(shè)置,我們可以借助于 ClientTimeout 對(duì)象,比如這里我要設(shè)置 1 秒的超時(shí),可以這么來(lái)實(shí)現(xiàn):

import aiohttp import asyncio async def main():timeout = aiohttp.ClientTimeout(total=1)async with aiohttp.ClientSession(timeout=timeout) as session:async with session.get('https://httpbin.org/get') as response:print('status:', response.status) if __name__ == '__main__':asyncio.get_event_loop().run_until_complete(main())

如果在 1 秒之內(nèi)成功獲取響應(yīng)的話(huà),運(yùn)行結(jié)果如下:

200

如果超時(shí)的話(huà),會(huì)拋出 TimeoutError 異常,其類(lèi)型為 asyncio.TimeoutError,我們?cè)龠M(jìn)行異常捕獲即可。

另外 ClientTimeout 對(duì)象聲明時(shí)還有其他參數(shù),如 connect、socket_connect 等,詳細(xì)說(shuō)明可以參考官方文檔:https://docs.aiohttp.org/en/stable/client_quickstart.html#timeouts。

并發(fā)限制

由于 aiohttp 可以支持非常大的并發(fā),比如上萬(wàn)、十萬(wàn)、百萬(wàn)都是能做到的,但這么大的并發(fā)量,目標(biāo)網(wǎng)站是很可能在短時(shí)間內(nèi)無(wú)法響應(yīng)的,而且很可能瞬時(shí)間將目標(biāo)網(wǎng)站爬掛掉。所以我們需要控制一下爬取的并發(fā)量。

在一般情況下,我們可以借助于 asyncio 的 Semaphore 來(lái)控制并發(fā)量,代碼示例如下:

import asyncio import aiohttp CONCURRENCY = 5 URL = 'https://www.baidu.com' semaphore = asyncio.Semaphore(CONCURRENCY) session = None async def scrape_api():async with semaphore:print('scraping', URL)async with session.get(URL) as response:await asyncio.sleep(1)return await response.text() async def main():global sessionsession = aiohttp.ClientSession()scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(10000)]await asyncio.gather(*scrape_index_tasks) if __name__ == '__main__':asyncio.get_event_loop().run_until_complete(main())

在這里我們聲明了 CONCURRENCY 代表爬取的最大并發(fā)量為 5,同時(shí)聲明爬取的目標(biāo) URL 為百度。接著我們借助于 Semaphore 創(chuàng)建了一個(gè)信號(hào)量對(duì)象,賦值為 semaphore,這樣我們就可以用它來(lái)控制最大并發(fā)量了。怎么使用呢?我們這里把它直接放置在對(duì)應(yīng)的爬取方法里面,使用 async with 語(yǔ)句將 semaphore 作為上下文對(duì)象即可。這樣的話(huà),信號(hào)量可以控制進(jìn)入爬取的最大協(xié)程數(shù)量,最大數(shù)量就是我們聲明的 CONCURRENCY 的值。

在 main 方法里面,我們聲明了 10000 個(gè) task,傳遞給 gather 方法運(yùn)行。倘若不加以限制,這 10000 個(gè) task 會(huì)被同時(shí)執(zhí)行,并發(fā)數(shù)量太大。但有了信號(hào)量的控制之后,同時(shí)運(yùn)行的 task 的數(shù)量最大會(huì)被控制在 5 個(gè),這樣就能給 aiohttp 限制速度了。

在這里,aiohttp 的基本使用就介紹這么多,更詳細(xì)的內(nèi)容還是推薦你到官方文檔查閱,鏈接:https://docs.aiohttp.org/。

爬取實(shí)戰(zhàn)

上面我們介紹了 aiohttp 的基本用法之后,下面我們來(lái)根據(jù)一個(gè)實(shí)例實(shí)現(xiàn)異步爬蟲(chóng)的實(shí)戰(zhàn)演練吧。

本次我們要爬取的網(wǎng)站是:https://dynamic5.scrape.cuiqingcai.com/,頁(yè)面如圖所示。

這是一個(gè)書(shū)籍網(wǎng)站,整個(gè)網(wǎng)站包含了數(shù)千本書(shū)籍信息,網(wǎng)站是 JavaScript 渲染的,數(shù)據(jù)可以通過(guò) Ajax 接口獲取到,并且接口沒(méi)有設(shè)置任何反爬措施和加密參數(shù),另外由于這個(gè)網(wǎng)站比之前的電影案例網(wǎng)站數(shù)據(jù)量大一些,所以更加適合做異步爬取。

本課時(shí)我們要完成的目標(biāo)有:

  • 使用 aiohttp 完成全站的書(shū)籍?dāng)?shù)據(jù)爬取。
  • 將數(shù)據(jù)通過(guò)異步的方式保存到 MongoDB 中。

在本課時(shí)開(kāi)始之前,請(qǐng)確保你已經(jīng)做好了如下準(zhǔn)備工作:

  • 安裝好了 Python(最低為 Python 3.6 版本,最好為 3.7 版本或以上),并能成功運(yùn)行 Python 程序。
  • 了解了 Ajax 爬取的一些基本原理和模擬方法。
  • 了解了異步爬蟲(chóng)的基本原理和 asyncio 庫(kù)的基本用法。
  • 了解了 aiohttp 庫(kù)的基本用法。
  • 安裝并成功運(yùn)行了 MongoDB 數(shù)據(jù)庫(kù),并安裝了異步存儲(chǔ)庫(kù) motor。

注:這里要實(shí)現(xiàn) MongoDB 異步存儲(chǔ),需要異步 MongoDB 存儲(chǔ)庫(kù),叫作 motor,安裝命令為:pip3 install motor

頁(yè)面分析

在之前我們講解了 Ajax 的基本分析方法,本課時(shí)的站點(diǎn)結(jié)構(gòu)和之前 Ajax 分析的站點(diǎn)結(jié)構(gòu)類(lèi)似,都是列表頁(yè)加詳情頁(yè)的結(jié)構(gòu),加載方式都是 Ajax,所以我們能輕松分析到如下信息:

  • 列表頁(yè)的 Ajax 請(qǐng)求接口格式為:https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset={offset},limit 的值即為每一頁(yè)的書(shū)的個(gè)數(shù),offset 的值為每一頁(yè)的偏移量,其計(jì)算公式為 offset = limit * (page - 1) ,如第 1 頁(yè) offset 的值為 0,第 2 頁(yè) offset 的值為 18,以此類(lèi)推。

  • 列表頁(yè) Ajax 接口返回的數(shù)據(jù)里 results 字段包含當(dāng)前頁(yè) 18 本書(shū)的信息,其中每本書(shū)的數(shù)據(jù)里面包含一個(gè)字段 id,這個(gè) id 就是書(shū)本身的 ID,可以用來(lái)進(jìn)一步請(qǐng)求詳情頁(yè)。

  • 詳情頁(yè)的 Ajax 請(qǐng)求接口格式為:https://dynamic5.scrape.cuiqingcai.com/api/book/{id},id 即為書(shū)的 ID,可以從列表頁(yè)的返回結(jié)果中獲取。

如果你掌握了 Ajax 爬取實(shí)戰(zhàn)一課時(shí)的內(nèi)容話(huà),上面的內(nèi)容應(yīng)該很容易分析出來(lái)。如有難度,可以復(fù)習(xí)下之前的知識(shí)。

實(shí)現(xiàn)思路

其實(shí)一個(gè)完善的異步爬蟲(chóng)應(yīng)該能夠充分利用資源進(jìn)行全速爬取,其思路是維護(hù)一個(gè)動(dòng)態(tài)變化的爬取隊(duì)列,每產(chǎn)生一個(gè)新的 task 就會(huì)將其放入隊(duì)列中,有專(zhuān)門(mén)的爬蟲(chóng)消費(fèi)者從隊(duì)列中獲取 task 并執(zhí)行,能做到在最大并發(fā)量的前提下充分利用等待時(shí)間進(jìn)行額外的爬取處理。

但上面的實(shí)現(xiàn)思路整體較為煩瑣,需要設(shè)計(jì)爬取隊(duì)列、回調(diào)函數(shù)、消費(fèi)者等機(jī)制,需要實(shí)現(xiàn)的功能較多。由于我們剛剛接觸 aiohttp 的基本用法,本課時(shí)也主要是了解 aiohttp 的實(shí)戰(zhàn)應(yīng)用,所以這里我們將爬取案例的實(shí)現(xiàn)稍微簡(jiǎn)化一下。

在這里我們將爬取的邏輯拆分成兩部分,第一部分為爬取列表頁(yè),第二部分為爬取詳情頁(yè)。由于異步爬蟲(chóng)的關(guān)鍵點(diǎn)在于并發(fā)執(zhí)行,所以我們可以將爬取拆分為兩個(gè)階段:

  • 第一階段為所有列表頁(yè)的異步爬取,我們可以將所有的列表頁(yè)的爬取任務(wù)集合起來(lái),聲明為 task 組成的列表,進(jìn)行異步爬取。
  • 第二階段則是拿到上一步列表頁(yè)的所有內(nèi)容并解析,拿到所有書(shū)的 id 信息,組合為所有詳情頁(yè)的爬取任務(wù)集合,聲明為 task 組成的列表,進(jìn)行異步爬取,同時(shí)爬取的結(jié)果也以異步的方式存儲(chǔ)到 MongoDB 里面。

因?yàn)閮蓚€(gè)階段的拆分之后需要串行執(zhí)行,所以可能不能達(dá)到協(xié)程的最佳調(diào)度方式和資源利用情況,但也差不了很多。但這個(gè)實(shí)現(xiàn)思路比較簡(jiǎn)單清晰,代碼實(shí)現(xiàn)也比較簡(jiǎn)單,能夠幫我們快速了解 aiohttp 的基本使用。

基本配置

首先我們先配置一些基本的變量并引入一些必需的庫(kù),代碼如下:

import asyncio import aiohttp import logging logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s: %(message)s') INDEX_URL = 'https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset={offset}' DETAIL_URL = 'https://dynamic5.scrape.cuiqingcai.com/api/book/{id}' PAGE_SIZE = 18 PAGE_NUMBER = 100 CONCURRENCY = 5

在這里我們導(dǎo)入了 asyncio、aiohttp、logging 這三個(gè)庫(kù),然后定義了 logging 的基本配置。接著定義了 URL、爬取頁(yè)碼數(shù)量 PAGE_NUMBER、并發(fā)量 CONCURRENCY 等信息。

爬取列表頁(yè)

首先,第一階段我們就來(lái)爬取列表頁(yè),還是和之前一樣,我們先定義一個(gè)通用的爬取方法,代碼如下:

semaphore = asyncio.Semaphore(CONCURRENCY) session = None async def scrape_api(url):async with semaphore:try:logging.info('scraping %s', url)async with session.get(url) as response:return await response.json()except aiohttp.ClientError:logging.error('error occurred while scraping %s', url, exc_info=True)

在這里我們聲明了一個(gè)信號(hào)量,用來(lái)控制最大并發(fā)數(shù)量。

接著我們定義了 scrape_api 方法,該方法接收一個(gè)參數(shù) url。首先使用 async with 引入信號(hào)量作為上下文,接著調(diào)用了 session 的 get 方法請(qǐng)求這個(gè) url,然后返回響應(yīng)的 JSON 格式的結(jié)果。另外這里還進(jìn)行了異常處理,捕獲了 ClientError,如果出現(xiàn)錯(cuò)誤,會(huì)輸出異常信息。

接著,對(duì)于列表頁(yè)的爬取,實(shí)現(xiàn)如下:

async def scrape_index(page):url = INDEX_URL.format(offset=PAGE_SIZE * (page - 1))return await scrape_api(url)

這里定義了一個(gè) scrape_index 方法用于爬取列表頁(yè),它接收一個(gè)參數(shù)為 page,然后構(gòu)造了列表頁(yè)的 URL,將其傳給 scrape_api 方法即可。這里注意方法同樣需要用 async 修飾,調(diào)用的 scrape_api 方法前面需要加 await,因?yàn)?scrape_api 調(diào)用之后本身會(huì)返回一個(gè) coroutine。另外由于 scrape_api 返回結(jié)果就是 JSON 格式,因此 scrape_index 的返回結(jié)果就是我們想要爬取的信息,不需要再額外解析了。

好,接著我們定義一個(gè) main 方法,將上面的方法串聯(lián)起來(lái)調(diào)用一下,實(shí)現(xiàn)如下:

import json async def main():global sessionsession = aiohttp.ClientSession()scrape_index_tasks = [asyncio.ensure_future(scrape_index(page)) for page in range(1, PAGE_NUMBER + 1)]results = await asyncio.gather(*scrape_index_tasks)logging.info('results %s', json.dumps(results, ensure_ascii=False, indent=2))if __name__ == '__main__':asyncio.get_event_loop().run_until_complete(main())

這里我們首先聲明了 session 對(duì)象,即最初聲明的全局變量,將 session 作為全局變量的話(huà)我們就不需要每次在各個(gè)方法里面?zhèn)鬟f了,實(shí)現(xiàn)比較簡(jiǎn)單。

接著我們定義了 scrape_index_tasks,它就是爬取列表頁(yè)的所有 task,接著我們調(diào)用 asyncio 的 gather 方法并傳入 task 列表,將結(jié)果賦值為 results,它是所有 task 返回結(jié)果組成的列表。

最后我們調(diào)用 main 方法,使用事件循環(huán)啟動(dòng)該 main 方法對(duì)應(yīng)的協(xié)程即可。

運(yùn)行結(jié)果如下:

2020-04-03 03:45:54,692 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset=0 2020-04-03 03:45:54,707 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset=18 2020-04-03 03:45:54,707 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset=36 2020-04-03 03:45:54,708 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset=54 2020-04-03 03:45:54,708 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset=72 2020-04-03 03:45:56,431 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset=90 2020-04-03 03:45:56,435 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset=108

可以看到這里就開(kāi)始異步爬取了,并發(fā)量是由我們控制的,目前為 5,當(dāng)然也可以進(jìn)一步調(diào)高并發(fā)量,在網(wǎng)站能承受的情況下,爬取速度會(huì)進(jìn)一步加快。

最后 results 就是所有列表頁(yè)得到的結(jié)果,我們將其賦值為 results 對(duì)象,接著我們就可以用它來(lái)進(jìn)行第二階段的爬取了。

爬取詳情頁(yè)

第二階段就是爬取詳情頁(yè)并保存數(shù)據(jù)了,由于每個(gè)詳情頁(yè)對(duì)應(yīng)一本書(shū),每本書(shū)需要一個(gè) ID,而這個(gè) ID 又正好存在 results 里面,所以下面我們就需要將所有詳情頁(yè)的 ID 獲取出來(lái)。

在 main 方法里增加 results 的解析代碼,實(shí)現(xiàn)如下:

ids = [] for index_data in results:if not index_data: continuefor item in index_data.get('results'):ids.append(item.get('id'))

這樣 ids 就是所有書(shū)的 id 了,然后我們用所有的 id 來(lái)構(gòu)造所有詳情頁(yè)對(duì)應(yīng)的 task,來(lái)進(jìn)行異步爬取即可。

那么這里再定義一個(gè)爬取詳情頁(yè)和保存數(shù)據(jù)的方法,實(shí)現(xiàn)如下:

from motor.motor_asyncio import AsyncIOMotorClient MONGO_CONNECTION_STRING = 'mongodb://localhost:27017' MONGO_DB_NAME = 'books' MONGO_COLLECTION_NAME = 'books' client = AsyncIOMotorClient(MONGO_CONNECTION_STRING) db = client[MONGO_DB_NAME] collection = db[MONGO_COLLECTION_NAME] async def save_data(data):logging.info('saving data %s', data)if data:return await collection.update_one({'id': data.get('id')}, {'$set': data}, upsert=True) async def scrape_detail(id):url = DETAIL_URL.format(id=id)data = await scrape_api(url)await save_data(data)

這里我們定義了 scrape_detail 方法用于爬取詳情頁(yè)數(shù)據(jù)并調(diào)用 save_data 方法保存數(shù)據(jù),save_data 方法用于將數(shù)據(jù)庫(kù)保存到 MongoDB 里面。

在這里我們用到了支持異步的 MongoDB 存儲(chǔ)庫(kù) motor,MongoDB 的連接聲明和 pymongo 是類(lèi)似的,保存數(shù)據(jù)的調(diào)用方法也是基本一致,不過(guò)整個(gè)都換成了異步方法。

好,接著我們就在 main 方法里面增加 scrape_detail 方法的調(diào)用即可,實(shí)現(xiàn)如下:

scrape_detail_tasks = [asyncio.ensure_future(scrape_detail(id)) for id in ids] await asyncio.wait(scrape_detail_tasks) await session.close()

在這里我們先聲明了 scrape_detail_tasks,即所有詳情頁(yè)的爬取 task 組成的列表,接著調(diào)用了 asyncio 的 wait 方法調(diào)用執(zhí)行即可,當(dāng)然這里也可以用 gather 方法,效果是一樣的,只不過(guò)返回結(jié)果略有差異。最后全部執(zhí)行完畢關(guān)閉 session 即可。

一些詳情頁(yè)的爬取過(guò)程運(yùn)行如下:

2020-04-03 04:00:32,576 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/2301475 2020-04-03 04:00:32,576 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/2351866 2020-04-03 04:00:32,577 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/2828384 2020-04-03 04:00:32,577 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/3040352 2020-04-03 04:00:32,578 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/3074810 2020-04-03 04:00:44,858 - INFO: saving data {'id': '3040352', 'comments': [{'id': '387952888', 'content': '溫馨文,青梅竹馬神馬的很有愛(ài)~'}, ..., {'id': '2005314253', 'content': '沈晉&秦央,文比較短,平平淡淡,貼近生活,短文的缺點(diǎn)不細(xì)膩'}], 'name': '那些風(fēng)花雪月', 'authors': ['\n 公子歡喜'], 'translators': [], 'publisher': '龍馬出版社', 'tags': ['公子歡喜', '耽美', 'BL', '小說(shuō)', '現(xiàn)代', '校園', '耽美小說(shuō)', '那些風(fēng)花雪月'], 'url': 'https://book.douban.com/subject/3040352/', 'isbn': '9789866685156', 'cover': 'https://img9.doubanio.com/view/subject/l/public/s3029724.jpg', 'page_number': None, 'price': None, 'score': '8.1', 'introduction': '', 'catalog': None, 'published_at': '2008-03-26T16:00:00Z', 'updated_at': '2020-03-21T16:59:39.584722Z'} 2020-04-03 04:00:44,859 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/2994915 ...

最后我們觀(guān)察下,爬取到的數(shù)據(jù)也都保存到 MongoDB 數(shù)據(jù)庫(kù)里面了,如圖所示:

總結(jié)

本課時(shí)的內(nèi)容較多,我們了解了 aiohttp 的基本用法,然后通過(guò)一個(gè)實(shí)例講解了 aiohttp 異步爬蟲(chóng)的具體實(shí)現(xiàn)。學(xué)習(xí)過(guò)程我們可以發(fā)現(xiàn),相比普通的單線(xiàn)程爬蟲(chóng)來(lái)說(shuō),使用異步可以大大提高爬取效率,后面我們也可以多多使用。

本課時(shí)代碼:https://github.com/Germey/ScrapeDynamic5。

總結(jié)

以上是生活随笔為你收集整理的第17讲:aiohttp 异步爬虫实战的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。