Scrayp-集成scrapy_redis和bloomfilter实现增量
前言
(備注一下,我的開發環境不是Linux就是MacOSX,Windows很多寫法不是這樣的)
在爬取數據的過程中,有時候需要用到定時、增量爬取。定時這里暫且不說,先說增量爬取。
- 我想要的增量爬取目前只是簡單的,根據url請求來判斷是否爬過,如果爬過則不再爬。
- 復雜一些的增量則是重復爬取,根據指定的幾個字段判斷是否值有變化,值有變化也算作增量,應當爬取且只更新變化部分(比如天貓商品數據,商品的價格有變化則更新價格,但是url是重復的,也應當爬取)
網上增量爬取的文章很多,包括看過慕課網Scrapy課的筆記,但是它還是不完善,我將在這個基礎上進行實際集成。
布隆簡介
Bloom Filter是一種空間效率很高的隨機數據結構,它利用位數組很簡潔地表示一個集合,并能判斷一個元素是否屬于這個集合。Bloom Filter的這種高效是有一定代價的:在判斷一個元素是否屬于某個集合時,有可能會把不屬于這個集合的元素誤認為屬于這個集合(false positive)。因此,Bloom Filter不適合那些“零錯誤”的應用場合。而在能容忍低錯誤率的應用場合下,Bloom Filter通過極少的錯誤換取了存儲空間的極大節省。
輸入圖片說明具體的bloomfilter概念和原理應該查看這篇文章:傳送,還有《海量數據處理算法》以及《大規模數據處理利器》
布隆優點
相比于其它的數據結構,布隆過濾器在空間和時間方面都有巨大的優勢。布隆過濾器存儲空間和插入/查詢時間都是常數。另外, Hash 函數相互之間沒有關系,方便由硬件并行實現。布隆過濾器不需要存儲元素本身,在某些對保密要求非常嚴格的場合有優勢。
布隆過濾器可以表示全集,其它任何數據結構都不能;
k 和 m 相同,使用同一組 Hash 函數的兩個布隆過濾器的交并差運算可以使用位操作進行。
布隆缺點
但是布隆過濾器的缺點和優點一樣明顯。誤算率(False Positive)是其中之一。隨著存入的元素數量增加,誤算率隨之增加。但是如果元素數量太少,則使用散列表足矣。
另外,一般情況下不能從布隆過濾器中刪除元素. 我們很容易想到把位列陣變成整數數組,每插入一個元素相應的計數器加1, 這樣刪除元素時將計數器減掉就可以了。然而要保證安全的刪除元素并非如此簡單。首先我們必須保證刪除的元素的確在布隆過濾器里面. 這一點單憑這個過濾器是無法保證的。另外計數器回繞也會造成問題。
總的來說,布隆很適合來處理海量的數據,而且速度優勢很強。
redis與bloom
去重”是日常工作中會經常用到的一項技能,在爬蟲領域更是常用,并且規模一般都比較大。參考文章《基于Redis的Bloomfilter去重》,作者【九茶】還有另一篇文章可以參考《scrapy_redis去重優化,已有7億條數據》
去重需要考慮兩個點:去重的數據量、去重速度。
為了保持較快的去重速度,一般選擇在內存中進行去重。
- 數據量不大時,可以直接放在內存里面進行去重,例如python可以使用set()進行去重。
- 當去重數據需要持久化時可以使用redis的set數據結構。
- 當數據量再大一點時,可以用不同的加密算法先將長字符串壓縮成 16/32/40 個字符,再使用上面兩種方法去重;
- 當數據量達到億(甚至十億、百億)數量級時,內存有限,必須用“位”來去重,才能夠滿足需求。Bloomfilter就是將去重對象映射到幾個內存“位”,通過幾個位的 0/1值來判斷一個對象是否已經存在。
- 然而Bloomfilter運行在一臺機器的內存上,不方便持久化(機器down掉就什么都沒啦),也不方便分布式爬蟲的統一去重。如果可以在Redis上申請內存進行Bloomfilter,以上兩個問題就都能解決了。
Bloomfilter算法如何使用位去重,這個百度上有很多解釋。簡單點說就是有幾個seeds,現在申請一段內存空間,一個seed可以和字符串哈希映射到這段內存上的一個位,幾個位都為1即表示該字符串已經存在。插入的時候也是,將映射出的幾個位都置為1。
需要提醒一下的是Bloomfilter算法會有漏失概率,即不存在的字符串有一定概率被誤判為已經存在。這個概率的大小與seeds的數量、申請的內存大小、去重對象的數量有關。下面有一張表,m表示內存大小(多少個位),n表示去重對象的數量,k表示seed的個數。例如我代碼中申請了256M,即1<<31(m=2^31,約21.5億),seed設置了7個。看k=7那一列,當漏失率為8.56e-05時,m/n值為23。所以n = 21.5/23 = 0.93(億),表示漏失概率為8.56e-05時,256M內存可滿足0.93億條字符串的去重。同理當漏失率為0.000112時,256M內存可滿足0.98億條字符串的去重。
基于Redis的Bloomfilter去重,其實就是利用了Redis的String數據結構,但Redis一個String最大只能512M,所以如果去重的數據量大,需要申請多個去重塊(代碼中blockNum即表示去重塊的數量)。
代碼中使用了MD5加密壓縮,將字符串壓縮到了32個字符(也可用hashlib.sha1()壓縮成40個字符)。它有兩個作用,一是Bloomfilter對一個很長的字符串哈希映射的時候會出錯,經常誤判為已存在,壓縮后就不再有這個問題;二是壓縮后的字符為 0~f 共16中可能,我截取了前兩個字符,再根據blockNum將字符串指定到不同的去重塊進行去重
總結:基于Redis的Bloomfilter去重,既用上了Bloomfilter的海量去重能力,又用上了Redis的可持久化能力,基于Redis也方便分布式機器的去重。在使用的過程中,要預算好待去重的數據量,則根據上面的表,適當地調整seed的數量和blockNum數量(seed越少肯定去重速度越快,但漏失率越大)。
編寫代碼
安裝依賴
根據github上的資源《BloomFilter_imooc》以及思路來編寫bloomfilter的代碼。
先前說過,bloom是一種算法,而不是插件也不是軟件,它依賴于mmh3,所以需要在虛擬環境中安裝mmh3.
然而當我在本機的anaconda虛擬環境內安裝時,出現了報錯:
g++: error trying to exec 'cc1plus': execvp: 沒有那個文件或目錄
網上查閱了很多文章,找到一個適合我的:傳送,大致原因是電腦上的gcc版本與g++版本不一致引起的。可以打開終端用命令:
gcc -vg++ -v來查看兩個東西的版本,最終發現用g++的時候報錯,于是我安裝它:
sudo apt-get install g++如果是在阿里云服務器,命令改成:
yum install gcc-c++安裝成功后,再次到anaconda虛擬環境中安裝mmh3,才成功安裝。
編寫bloom代碼
根據文章《將bloomfilter(布隆過濾器)集成到scrapy-redis中》的指引,作者是將github代碼下載到本地目錄。
而我為了省事,我在site-package里面寫。
在site-package下新建bloofilter_scrapy_redis的package包(帶init那種),然后在里面新建文件bloomfilter.py,編寫代碼:
這里的pool和conn都是單獨連接的,實際上在分布式爬蟲中是比較不友好的,多臺機器的配置就會煩人,這里暫且這樣,后期我再改。
是否配置密碼
至于是否配置密碼,如何配置密碼,在bloomfilter.py文件中,有一句:
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=0) conn = redis.StrictRedis(connection_pool=pool)其中redis.StrictRedis方法,跟蹤(ctrl+左鍵點擊)進去,可以看到init初始化方法里面有個password=None
def __init__(self, host='localhost', port=6379,db=0, password=None, socket_timeout=None,socket_connect_timeout=None,socket_keepalive=None, socket_keepalive_options=None,connection_pool=None, unix_socket_path=None,encoding='utf-8', encoding_errors='strict',charset=None, errors=None,decode_responses=False, retry_on_timeout=False,ssl=False, ssl_keyfile=None, ssl_certfile=None,ssl_cert_reqs=None, ssl_ca_certs=None,max_connections=None):這里應該是設置password,也就是將服務器redis的權限密碼auth設置進來。
pool = redis.ConnectionPool(host='47.98.110.67', port=6379, db=0, password='quinns') conn = redis.StrictRedis(connection_pool=pool)即可完成密碼的設置。
集成到scrapy_redis中
上面的布隆過濾器代碼寫好后,需要集成到scrapy_redis中。完成去重任務的是dupefilter.py文件,就要對它進行改造,路徑是site-package/scrapy_redis/目錄內:
現將剛才編寫的布隆選擇器導入此文件
from bloomfilter_scrapy_redis.bloomfilter import conn,PyBloomFilter # 從源碼包導入布隆然后在init方法中初始化布隆選擇器(這里貼上整個init代碼):
def __init__(self, server, key, debug=False):"""Initialize the duplicates filter.Parameters----------server : redis.StrictRedisThe redis server instance.key : strRedis key Where to store fingerprints.debug : bool, optionalWhether to log filtered requests."""self.server = serverself.key = keyself.debug = debugself.logdupes = True""" 集成布隆過濾器,通過連接池連接redis """self.bf = PyBloomFilter(conn=conn, key=key)接下來改動request_seen方法,在里面對request進行判斷,如果此次request請求在redis中存在,則直接返回,如果不存在則添加到redis的隊列里面去,讓爬蟲去爬:
def request_seen(self, request):"""……"""fp = self.request_fingerprint(request)"""集成布隆過濾判斷redis是否存在此指紋,如果存在則直接返回true如果不存在添加指紋到redis,同時返回false"""if self.bf.is_exist(fp):return Trueelse:self.bf.add(fp)return False""" 集成布隆過濾器,將下方2行代碼注釋 """# This returns the number of values added, zero if already exists.# added = self.server.sadd(self.key, fp)# return added == 0到這里即完成了scrapy_redis對布隆過濾器的集成。
測試
在爬蟲代碼中編寫:
# -*- coding: utf-8 -*- import scrapy from scrapy_redis.spiders import RedisSpider from scrapy.http import Request from urllib import parseclass JobboleSpider(RedisSpider):name = 'jobbole'allowd_domains = ["www.gxnhyd.com"]redis_key = 'jobbole:start_urls'def parse(self, response):"""將當前列表頁的每條標的鏈接拿到 并傳給detail進行深入爬取通過已知列表頁碼數量 進行循環爬取 就不用翻頁了"""total = response.css('.item .tl.pl10 a')for x in total:title = x.css('::text').extract_first("")title_url = x.css('::attr(href)').extract_first("")yield Request(url=parse.urljoin(response.url, title_url), callback=self.parse_detail)for i in range(1, 10):next_pages = "http://www.gxnhyd.com/deals/p-%s" % (i)yield Request(url=next_pages, callback=self.parse)def parse_detail(self, response):"""獲取當前詳情頁的標的信息 包括金額 收益 期限 借款人投資人列表 - 投資人用戶名/投資人投資金額/投資方式/投資時間等:param response::return:"""print(response.url)通過print對爬取情況做觀察
開啟爬蟲后,由于scrapy_redis的特性,需要給redis里面添加start_urls:
lpush jobbole:start_urls http://www.gxnhyd.com/deals [value ...]爬蟲監聽到值之后,立即開始爬取,這一步沒問題
但是爬完后它空跑了,不會結束,一直空跑。(事實證明,跑空了也不要緊)
二次測試
在第一次測試通過后,我加大了循環次數for i in range(1, 30),看看是否會出現重復的值,結果報錯了。
報錯信息與bloom是否重復無關,原因是我之前看到空跑,就主動停止了代碼,導致redis報錯:
MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist
解決辦法在這里《redis異常解決:MISCONF Redis 》,在redis-cli用命令解決這個權限問題:
config set stop-writes-on-bgsave-error no二次測試后,發現可以正常運行了。然后觀察到bloom也生效了,但是還是有空跑的問題
解決空跑(這個辦法其實不太好,不推薦)
空跑就是爬蟲在爬取完所有的隊列有,不會自動停止,而是一直請求請求,然后觀察redis-server窗口有memory的提示一直在進行。
解決這個空跑問題參考了一些資料《scrapy-redis所有request爬取完畢,如何解決爬蟲空跑問題? 》
輸入圖片說明根據scrapy-redis分布式爬蟲的原理,多臺爬蟲主機共享一個爬取隊列。當爬取隊列中存在request時,爬蟲就會取出request進行爬取,如果爬取隊列中不存在request時,爬蟲就會處于等待狀態.
可是,如果所有的request都已經爬取完畢了呢?這件事爬蟲程序是不知道的,它無法區分結束和空窗期狀態的不同,所以會一直處于上面的那種等待狀態,也就是我們說的空跑。
那有沒有辦法讓爬蟲區分這種情況,自動結束呢?
- 從背景介紹來看,基于scrapy-redis分布式爬蟲的原理,爬蟲結束是一個很模糊的概念,在爬蟲爬取過程中,爬取隊列是一個不斷動態變化的過程,隨著request的爬取,又會有新的request進入爬取隊列。進進出出。
- 爬取速度高于填充速度,就會有隊列空窗期(爬取隊列中,某一段時間會出現沒有request的情況),爬取速度低于填充速度,就不會出現空窗期。所以對于爬蟲結束這件事來說,只能模糊定義,沒有一個精確的標準。
可以通過限定爬蟲自動關閉時間來完成這個任務,在settings配置:
# 爬蟲運行超過23.5小時,如果爬蟲還沒有結束,則自動關閉 CLOSESPIDER_TIMEOUT = 84600特別注意 :如果爬蟲在規定時限沒有把request全部爬取完畢,此時強行停止的話,爬取隊列中就還會存有部分request請求。那么爬蟲下次開始爬取時,一定要記得在master端對爬取隊列進行清空操作。
想象一下,爬蟲已經結束的特征是什么?
那就是爬取隊列已空,從爬取隊列中無法取到request信息。那著手點應該就在從爬取隊列中獲取request和調度這個部分。查看scrapy-redis源碼,我們發現了兩個著手點,調度器site-packages\scrapy_redis\schedluer.py和site-packages\scrapy_redis\spiders.py爬蟲。
但是爬蟲在爬取過程中,隊列隨時都可能出現暫時的空窗期。想判斷爬取隊列為空,一般是設定一個時限,如果在一個時段內,隊列一直持續為空,那我們可以基本認定這個爬蟲已經結束了。
我選擇更改調度器,site-packages\scrapy_redis\schedluer.py所以有了如下的改動:
首先在init里面設定一個初始次數
import datetimedef __init__(self, server,…………"""""" 為解決空跑問題:設定倒計次數 下方根據次數決定何時關閉爬蟲,避免空跑"""self.lostGetRequest = 0if idle_before_close < 0:…………完整的init方法代碼為:
def __init__(self, server,persist=False,flush_on_start=False,queue_key=defaults.SCHEDULER_QUEUE_KEY,queue_cls=defaults.SCHEDULER_QUEUE_CLASS,dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,idle_before_close=0,serializer=None):""" 為解決空跑問題:設定倒計次數 下方根據次數決定何時關閉爬蟲,避免空跑"""self.lostGetRequest = 0if idle_before_close < 0:raise TypeError("idle_before_close cannot be negative")self.server = serverself.persist = persistself.flush_on_start = flush_on_startself.queue_key = queue_keyself.queue_cls = queue_clsself.dupefilter_cls = dupefilter_clsself.dupefilter_key = dupefilter_keyself.idle_before_close = idle_before_closeself.serializer = serializerself.stats = None然后到next_request方法中進行修改:
def next_request(self):block_pop_timeout = self.idle_before_closerequest = self.queue.pop(block_pop_timeout)if request and self.stats:""" 解決空跑問題,這里判斷如果獲取到request則重置倒計時lostGetRequest """self.lostGetRequest = 0self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)if request is None:""" scrapy_reids跑完數據后不會自動停止,會產生空跑情況,一直空跑 每次調度Schedule時如果隊列沒有數據 則倒計時+150次空跑大約費時5分鐘,根據項目需求設定次數,滿足空跑次數則主動停止并填寫停止原因"""self.lostGetRequest += 1if self.lostGetRequest > 10:self.spider.crawler.engine.close_spider(self.spider, 'Queue is empty,So active end')return request這樣就可以解決空跑的問題了。(事實證明,高興得太早)
真正解決空跑(這個也不好,不建議。因為scrapy_redis已處理空跑問題(我也不確定))
真是太年輕,不懂事,我以為按照別人的想法實施,就可以解決空跑的問題了。然后當自己親自測試的時候,發現并不是那么回事。
scrapy是異步的,而且request隊列確實會有空閑狀態,如果有空閑狀態就會+1,用數字進行累加的話,雖然上編寫了重置為0的操作,但貌似是不行的,測試沒有那么細致,反正當空閑狀態達到N次(關閉條件)的時候,就會自動關閉(request隊列還在抽取,也會被關閉),那這就是個bug。
首先
思路是對的,然而用+1的方式出錯了。我換了個思路,用時間差來決定是否關閉爬蟲。邏輯:
優點
具體的代碼如下:
現在init方法設定起始時間
為解決空跑問題:設定起始時間 下方根據記錄空跑時間end_times與起始時間的時間差來決定何時關閉爬蟲,避免空跑"""self.strat_times = datetime.datetime.now()然后到next_request方法進行具體的時間差計算和空跑判斷,還有爬蟲的關閉操作:
def next_request(self):block_pop_timeout = self.idle_before_closerequest = self.queue.pop(block_pop_timeout)if request and self.stats:""" 解決空跑問題,這里判斷如果獲取到request則重置起始時間strat_times """self.strat_times = datetime.datetime.now()self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)if request is None:""" scrapy_reids跑完數據后不會自動停止,會產生空跑情況,一直空跑 每次調度Schedule時如果隊列沒有數據 則計算end_times當end_times與start_times的時間差close_times超過N秒,就判定為空跑且進行關閉爬蟲的操作"""self.end_times = datetime.datetime.now()self.close_times = (self.end_times - self.strat_times).secondsprint("tihs close_times is : ")print(self.close_times)if self.close_times > 180:self.spider.crawler.engine.close_spider(self.spider, 'Queue is empty,So active end')return request看到下圖,跑完數據后會根據時間差關閉爬蟲
輸入圖片說明這樣才是真正的解決了空跑的問題
最后運行,可以正常關閉爬蟲了。但是結束的時候還會有報錯信息:
builtins.AttributeError: 'NoneType' object has no attribute 'start_requests'2017-12-14 16:18:56 [twisted] CRITICAL: Unhandled Error Traceback (most recent call last):File "E:\Miniconda\lib\site-packages\scrapy\commands\runspider.py", line 89, in runself.crawler_process.start()File "E:\Miniconda\lib\site-packages\scrapy\crawler.py", line 285, in startreactor.run(installSignalHandlers=False) # blocking callFile "E:\Miniconda\lib\site-packages\twisted\internet\base.py", line 1243, in runself.mainLoop()File "E:\Miniconda\lib\site-packages\twisted\internet\base.py", line 1252, in mainLoopself.runUntilCurrent() --- <exception caught here> ---File "E:\Miniconda\lib\site-packages\twisted\internet\base.py", line 878, in runUntilCurrentcall.func(*call.args, **call.kw)File "E:\Miniconda\lib\site-packages\scrapy\utils\reactor.py", line 41, in __call__return self._func(*self._a, **self._kw)File "E:\Miniconda\lib\site-packages\scrapy\core\engine.py", line 137, in _next_requestif self.spider_is_idle(spider) and slot.close_if_idle:File "E:\Miniconda\lib\site-packages\scrapy\core\engine.py", line 189, in spider_is_idleif self.slot.start_requests is not None: builtins.AttributeError: 'NoneType' object has no attribute 'start_requests'當通過engine.close_spider(spider, ‘reason’)來關閉spider時,有時會出現幾個錯誤之后才能關閉。可能是因為scrapy會開啟多個線程同時抓取,然后其中一個線程關閉了spider,其他線程就找不到spider才會報錯。
注意事項
編寫代碼的schedule.py有個next_request方法有這么一句代碼:
request = self.queue.pop(block_pop_timeout)打開同目錄的queue.py文件
輸入圖片說明所以,PriorityQueue和另外兩種隊列FifoQueue,LifoQueue有所不同,特別需要注意。
如果會使用到timeout這個參數,那么在setting中就只能指定爬取隊列為FifoQueue或LifoQueue
# 指定排序爬取地址時使用的隊列, # 默認的 按優先級排序(Scrapy默認),由sorted set實現的一種非FIFO、LIFO方式。 # 'SCHEDULER_QUEUE_CLASS': 'scrapy_redis.queue.SpiderPriorityQueue', # 可選的 按先進先出排序(FIFO) 'SCHEDULER_QUEUE_CLASS': 'scrapy_redis.queue.SpiderQueue', # 可選的 按后進先出排序(LIFO) # SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderStack'數據入庫測試
經過多次 的mysql入庫測試,發現bloomfilter是生效的,而且增量開始之前,對于那么重復的數據對比過濾是非常快的(僅用了500條數據測試),正常爬取500條數據大約1分鐘多一點。在爬取過500多數據后,bloomfilter的略過只用了幾秒鐘,很短的時間。
這個還是很強的,我很高興
總結
以上是生活随笔為你收集整理的Scrayp-集成scrapy_redis和bloomfilter实现增量的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 王者荣耀盾山铭文出装 2017年赵寅成
- 下一篇: instagram如何国内使用(@ins