Scrapy-redis 源码分析 及 框架使用
?
From:https://blog.csdn.net/weixin_37947156/article/details/75044971
From:https://cuiqingcai.com/6058.html
Scrapy-redis github:https://github.com/rmax/scrapy-redis
scrapy-redis分布式爬蟲框架詳解:https://segmentfault.com/a/1190000014333162?utm_source=channel-hottest
集群版 Scrapy-Redis:https://github.com/thsheep/scrapy_redis_cluster
scrapy-redis 和 scrapy 有什么區別?:https://www.zhihu.com/question/32302268
scrapy-redis使用以及剖析:https://www.cnblogs.com/wangyongsong/p/7485852.html
scrapy-redis 解析:https://www.cnblogs.com/zy0517/articles/9109681.html
基于 Scrapy-redis 的分布式爬蟲設計:https://www.jianshu.com/p/cd4054bbc757/
小白進階之Scrapy第三篇(基于Scrapy-Redis的分布式以及cookies池):https://cuiqingcai.com/4048.html
Scrapy+redis實現分布式爬蟲簡易教程:https://www.jianshu.com/p/ed5afa658ccb?from=jiantop.com
?
scrapy 是 python 的一個非常好用的爬蟲庫,功能非常強大,如果是小站的話,我們使用 scrapy 本身就可以滿足。但是當我們要爬取的頁面非常多的時候,面對一些比較大型的站點的時候,單個 scrapy 就顯得力不從心了。單個主機的處理能力就不能滿足我們的需求了(無論是處理速度還是網絡請求的并發數)。
這時候分布式爬蟲的優勢就顯現出來,人多力量大。很遺憾 Scrapy 官方并不支持多個同時采集一個站點,雖然官方給出一個方法:**將一個站點的分割成幾部分 交給不同的scrapy去采集**。似乎是個解決辦法,但是很麻煩誒!畢竟分割很麻煩的哇
下面就該 Scrapy-Redis 登場了。scrapy-redis 就是結合了分布式數據庫 redis,重寫了 scrapy 一些比較關鍵的代碼,將 scrapy 變成一個可以在多個主機上同時運行的分布式爬蟲。?
scrapy-redis 是 github 上的一個開源項目,可以直接下載到他的源代碼:?https://github.com/rmax/scrapy-redis
scrapy-redis 的官方文檔寫的比較簡潔,沒有提及其運行原理,所以如果想全面的理解分布式爬蟲的運行原理,還是得看 scrapy的源代碼才行(還得先理解 scrapy 的運行原理,不然看 scrapy-redis 還是比較費勁)。
?
?
?
來看一看 Scrapy 的架構圖
?
這張圖大家相信大家都很熟悉了。重點看一下SCHEDULER
?
1. 先來看看官方對于SCHEDULER的定義:
?
**SCHEDULER接受來自Engine的Requests,并將它們放入隊列(可以按順序優先級),以便在之后將其提供給Engine**
官方文檔:https://doc.scrapy.org/en/latest/topics/architecture.html#component-scheduler
?
2. 現在我們來看看SCHEDULER都提供了些什么功能:
?
根據官方文檔說明 在我們沒有沒有指定 SCHEDULER 參數時,默認使用:'scrapy.core.scheduler.Scheduler'?作為SCHEDULER(調度器)
scrapy.core.scheduler.py:
class Scheduler(object):def __init__(self, dupefilter, jobdir=None, dqclass=None, mqclass=None,logunser=False, stats=None, pqclass=None):self.df = dupefilterself.dqdir = self._dqdir(jobdir)self.pqclass = pqclassself.dqclass = dqclassself.mqclass = mqclassself.logunser = logunserself.stats = stats@classmethoddef from_crawler(cls, crawler):'''注意在 scrapy 中優先注意這個方法,此方法是一個鉤子 用于訪問當前爬蟲的配置'''settings = crawler.settings# 獲取去重用的類 默認:scrapy.dupefilters.RFPDupeFilterdupefilter_cls = load_object(settings['DUPEFILTER_CLASS'])# 對去重類進行配置from_settings 在 scrapy.dupefilters.RFPDupeFilter 43行# 這種調用方式對于IDE跳轉不是很好 所以需要自己去找# @classmethod# def from_settings(cls, settings):# debug = settings.getbool('DUPEFILTER_DEBUG')# return cls(job_dir(settings), debug)# 上面就是from_settings方法 其實就是設置工作目錄 和是否開啟debugdupefilter = dupefilter_cls.from_settings(settings)# 獲取優先級隊列 類對象 默認:queuelib.pqueue.PriorityQueuepqclass = load_object(settings['SCHEDULER_PRIORITY_QUEUE'])# 獲取磁盤隊列 類對象(SCHEDULER使用磁盤存儲 重啟不會丟失)dqclass = load_object(settings['SCHEDULER_DISK_QUEUE'])# 獲取內存隊列 類對象(SCHEDULER使用內存存儲 重啟會丟失)mqclass = load_object(settings['SCHEDULER_MEMORY_QUEUE'])# 是否開啟debuglogunser = settings.getbool('LOG_UNSERIALIZABLE_REQUESTS', settings.getbool('SCHEDULER_DEBUG'))# 將這些參數傳遞給 __init__方法return cls(dupefilter, jobdir=job_dir(settings), logunser=logunser,stats=crawler.stats, pqclass=pqclass, dqclass=dqclass, mqclass=mqclass)def has_pending_requests(self):"""檢查是否有沒處理的請求"""return len(self) > 0def open(self, spider):"""Engine創建完畢之后會調用這個方法"""self.spider = spider# 創建一個有優先級的內存隊列 實例化對象# self.pqclass 默認是:queuelib.pqueue.PriorityQueue# self._newmq 會返回一個內存隊列的 實例化對象 在110 111 行self.mqs = self.pqclass(self._newmq)# 如果self.dqdir 有設置 就創建一個磁盤隊列 否則self.dqs 為空self.dqs = self._dq() if self.dqdir else None# 獲得一個去重實例對象 open 方法是從BaseDupeFilter繼承的# 現在我們可以用self.df來去重啦return self.df.open()def close(self, reason):"""當然Engine關閉時"""# 如果有磁盤隊列 則對其進行dump后保存到active.json文件中if self.dqs:prios = self.dqs.close()with open(join(self.dqdir, 'active.json'), 'w') as f:json.dump(prios, f)# 然后關閉去重return self.df.close(reason)def enqueue_request(self, request):"""添加一個Requests進調度隊列"""# self.df.request_seen是檢查這個Request是否已經請求過了 如果有會返回Trueif not request.dont_filter and self.df.request_seen(request):# 如果Request的dont_filter屬性沒有設置(默認為False)和 已經存在則去重# 不push進隊列self.df.log(request, self.spider)return False# 先嘗試將Request push進磁盤隊列dqok = self._dqpush(request)if dqok:# 如果成功 則在記錄一次狀態self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)else:# 不能添加進磁盤隊列則會添加進內存隊列self._mqpush(request)self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)self.stats.inc_value('scheduler/enqueued', spider=self.spider)return Truedef next_request(self):"""從隊列中獲取一個Request"""# 優先從內存隊列中獲取request = self.mqs.pop()if request:self.stats.inc_value('scheduler/dequeued/memory', spider=self.spider)else:# 不能獲取的時候從磁盤隊列隊里獲取request = self._dqpop()if request:self.stats.inc_value('scheduler/dequeued/disk', spider=self.spider)if request:self.stats.inc_value('scheduler/dequeued', spider=self.spider)# 將獲取的到Request返回給Enginereturn requestdef __len__(self):return len(self.dqs) + len(self.mqs) if self.dqs else len(self.mqs)def _dqpush(self, request):if self.dqs is None:returntry:reqd = request_to_dict(request, self.spider)self.dqs.push(reqd, -request.priority)except ValueError as e: # non serializable requestif self.logunser:msg = ("Unable to serialize request: %(request)s - reason:"" %(reason)s - no more unserializable requests will be"" logged (stats being collected)")logger.warning(msg, {'request': request, 'reason': e},exc_info=True, extra={'spider': self.spider})self.logunser = Falseself.stats.inc_value('scheduler/unserializable',spider=self.spider)returnelse:return Truedef _mqpush(self, request):self.mqs.push(request, -request.priority)def _dqpop(self):if self.dqs:d = self.dqs.pop()if d:return request_from_dict(d, self.spider)def _newmq(self, priority):return self.mqclass()def _newdq(self, priority):return self.dqclass(join(self.dqdir, 'p%s' % priority))def _dq(self):activef = join(self.dqdir, 'active.json')if exists(activef):with open(activef) as f:prios = json.load(f)else:prios = ()q = self.pqclass(self._newdq, startprios=prios)if q:logger.info("Resuming crawl (%(queuesize)d requests scheduled)",{'queuesize': len(q)}, extra={'spider': self.spider})return qdef _dqdir(self, jobdir):if jobdir:dqdir = join(jobdir, 'requests.queue')if not exists(dqdir):os.makedirs(dqdir)return dqdir從上面的代碼可以很清楚的知道 SCHEDULER 主要是完成了 push Request、pop Request 和 去重 的操作。而且 queue 操作是在內存隊列中完成的。大家看 queuelib.queue 就會發現是基于內存的(deque)。
那么去重呢?
class RFPDupeFilter(BaseDupeFilter):"""Request Fingerprint duplicates filter"""def __init__(self, path=None, debug=False):self.file = Noneself.fingerprints = set()self.logdupes = Trueself.debug = debugself.logger = logging.getLogger(__name__)if path:# 此處可以看到去重其實打開了一個名叫 requests.seen的文件# 如果是使用的磁盤的話self.file = open(os.path.join(path, 'requests.seen'), 'a+')self.file.seek(0)self.fingerprints.update(x.rstrip() for x in self.file)@classmethoddef from_settings(cls, settings):debug = settings.getbool('DUPEFILTER_DEBUG')return cls(job_dir(settings), debug)def request_seen(self, request):fp = self.request_fingerprint(request)if fp in self.fingerprints:# 判斷我們的請求是否在這個在集合中return True# 沒有在集合就添加進去self.fingerprints.add(fp)# 如果用的磁盤隊列就寫進去記錄一下if self.file:self.file.write(fp + os.linesep)按照正常流程就是大家都會進行重復的采集;我們都知道進程之間內存中的數據不可共享的,那么你在開啟多個Scrapy的時候,它們相互之間并不知道對方采集了些什么那些沒有沒采集。那就大家伙兒自己玩自己的了。完全沒沒有效率的提升啊!
怎么解決呢?
這就是我們 Scrapy-Redis 解決的問題了,不能協作不就是因為 Request 和 去重 這兩個不能共享嗎?
那我把這兩個獨立出來好了。
將 Scrapy 中的 SCHEDULER 組件獨立放到大家都能訪問的地方不就OK啦!加上 scrapy-redis 后流程圖就應該變成這樣了?
scrapy-redis 在 scrapy 的架構上增加了 redis,基于 redis 的特性拓展了如下四種組件:Scheduler,Duplication Filter,Item Pipeline,Base Spider
?
?
scrapy-redis 源碼分析
?
scrapy-redis 的源代碼很少,也比較好懂,很快就能看完。
下面開始 scrapy-redis 源碼分析:
scrapy-redis 工程的主體還是 redis 和 scrapy 兩個庫,工程本身實現的東西不是很多,這個工程就像膠水一樣,把這兩個插件粘結了起來。下面我們來看看,scrapy-redis的每一個源代碼文件都實現了什么功能,最后如何實現分布式的爬蟲系統:
?
defaults.py
?
redis 的一些基礎的默認的設置。其實就是一些默認配置:
import redis# For standalone use. DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'PIPELINE_KEY = '%(spider)s:items'REDIS_CLS = redis.StrictRedis REDIS_ENCODING = 'utf-8' # Sane connection defaults. REDIS_PARAMS = {'socket_timeout': 30,'socket_connect_timeout': 30,'retry_on_timeout': True,'encoding': REDIS_ENCODING, }SCHEDULER_QUEUE_KEY = '%(spider)s:requests' SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue' SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter' SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'START_URLS_KEY = '%(name)s:start_urls' START_URLS_AS_SET = False?
connect.py
?
connect 文件引入了redis 模塊,這個是 redis-python庫的接口,用于通過python訪問redis數據庫,可見,這個文件主要是實現連接redis數據庫的功能(返回的是redis庫的Redis對象或者StrictRedis對象,這倆都是可以直接用來進行數據操作的對象)。這些連接接口在其他文件中經常被用到。其中,我們可以看到,要想連接到redis數據庫,和其他數據庫差不多,需要一個ip地址、端口號、用戶名密碼(可選)和一個整形的數據庫編號,同時我們還可以在scrapy工程的setting文件中配置套接字的超時時間、等待時間等。
其實這個模塊的功能:
- 1. 從 settings 里面獲取 redis 的鏈接配置
- 2. 獲取 redis 的 鏈接 實例
?
dupefilters.py
?
這個主要是用來去重的。RFPDupeFilter繼承自 Scrapy 的BaseDupeFilter,實現了 request 去重功能,基于 Scrapy 的 request_fingerprint 生成指紋,并在 Redis 上存儲。當收到新的 request,首先生成指紋判斷是否存在于已爬取的指紋庫內(Redis set),若存在則返回 False,不存在返回 True.總得來說是這樣的,這個文件首先獲取到redis的server,然后從scrapy的request中獲取request的指紋,將這個指紋進行存到redis的去重庫中。達到去重的目的。
這個文件看起來比較復雜,重寫了scrapy本身已經實現的 request 判重功能。因為本身 scrapy 單機跑的話,只需要讀取內存中的request 隊列 或者 持久化的 request 隊列(scrapy默認的持久化似乎是json格式的文件,不是數據庫)就能判斷這次要發出的request url是否已經請求過或者正在調度(本地讀就行了)。而?分布式跑的話,就需要各個主機上的scheduler都連接同一個數據庫的同一個 request池 來判斷這次的請求是否是重復的了。?
在這個文件中,通過繼承 BaseDupeFilter 重寫他的方法,實現了基于redis的判重。根據源代碼來看,scrapy-redis 使用了scrapy本身的一個 fingerprint 接口?request_fingerprint,這個接口很有趣,根據scrapy文檔所說,他通過hash來判斷兩個url是否相同(相同的url會生成相同的hash結果),但是當兩個url的地址相同,get型參數相同但是順序不同時,也會生成相同的hash結果(這個真的比較神奇。。。)所以 scrapy-redis 依舊使用 url 的 fingerprint 來判斷 request 請求是否已經出現過。這個類通過連接 redis,使用一個key來向redis的一個set中插入fingerprint(這個key對于同一種spider是相同的,redis 是一個key-value的數據庫,如果key是相同的,訪問到的值就是相同的,這里使用 spider名字+DupeFilter 的 key 就是為了在不同主機上的不同爬蟲實例,只要屬于同一種 spider,就會訪問到同一個set,而這個 set 就是他們的url判重池 ),如果返回值為0,說明該set中該fingerprint 已經存在(因為集合是沒有重復值的),則返回 False,如果返回值為 1,說明添加了一個fingerprint到set中,則說明這個 request 沒有重復,于是返回True,還順便把新fingerprint加入到數據庫中了。?
DupeFilter 判重會在 scheduler 類中用到,每一個 request 在進入調度之前都要進行判重,如果重復就不需要參加調度,直接舍棄就好了,不然就是白白浪費資源。
import logging import timefrom scrapy.dupefilters import BaseDupeFilter from scrapy.utils.request import request_fingerprintfrom . import defaults from .connection import get_redis_from_settingslogger = logging.getLogger(__name__)# TODO: Rename class to RedisDupeFilter. class RFPDupeFilter(BaseDupeFilter):"""Redis-based request duplicates filter.This class can also be used with default Scrapy's scheduler."""logger = loggerdef __init__(self, server, key, debug=False):"""Initialize the duplicates filter.Parameters----------server : redis.StrictRedisThe redis server instance.key : strRedis key Where to store fingerprints.debug : bool, optionalWhether to log filtered requests."""self.server = serverself.key = keyself.debug = debugself.logdupes = True@classmethoddef from_settings(cls, settings):"""Returns an instance from given settings.This uses by default the key ``dupefilter:<timestamp>``. When using the``scrapy_redis.scheduler.Scheduler`` class, this method is not used asit needs to pass the spider name in the key.Parameters----------settings : scrapy.settings.SettingsReturns-------RFPDupeFilterA RFPDupeFilter instance."""server = get_redis_from_settings(settings)# XXX: This creates one-time key. needed to support to use this# class as standalone dupefilter with scrapy's default scheduler# if scrapy passes spider on open() method this wouldn't be needed# TODO: Use SCRAPY_JOB env as default and fallback to timestamp.key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}debug = settings.getbool('DUPEFILTER_DEBUG')return cls(server, key=key, debug=debug)@classmethoddef from_crawler(cls, crawler):"""Returns instance from crawler.Parameters----------crawler : scrapy.crawler.CrawlerReturns-------RFPDupeFilterInstance of RFPDupeFilter."""return cls.from_settings(crawler.settings)def request_seen(self, request):"""Returns True if request was already seen.Parameters----------request : scrapy.http.RequestReturns-------bool"""fp = self.request_fingerprint(request)# This returns the number of values added, zero if already exists.added = self.server.sadd(self.key, fp)return added == 0def request_fingerprint(self, request):"""Returns a fingerprint for a given request.Parameters----------request : scrapy.http.RequestReturns-------str"""return request_fingerprint(request)@classmethoddef from_spider(cls, spider):settings = spider.settingsserver = get_redis_from_settings(settings)dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY)key = dupefilter_key % {'spider': spider.name}debug = settings.getbool('DUPEFILTER_DEBUG')return cls(server, key=key, debug=debug)def close(self, reason=''):"""Delete data on close. Called by Scrapy's scheduler.Parameters----------reason : str, optional"""self.clear()def clear(self):"""Clears fingerprints data."""self.server.delete(self.key)def log(self, request, spider):"""Logs given request.Parameters----------request : scrapy.http.Requestspider : scrapy.spiders.Spider"""if self.debug:msg = "Filtered duplicate request: %(request)s"self.logger.debug(msg, {'request': request}, extra={'spider': spider})elif self.logdupes:msg = ("Filtered duplicate request %(request)s"" - no more duplicates will be shown"" (see DUPEFILTER_DEBUG to show all duplicates)")self.logger.debug(msg, {'request': request}, extra={'spider': spider})self.logdupes = FalseScrapy?用集合實現這個 request 去重功能,Scrapy?中把已經發送的 request 指紋 放入到一個集合中,把下一個request 的指紋拿到集合中比對,如果該指紋存在于集合中,說明這個 request 發送過了,如果沒有則繼續操作。
核心的判重功能:
在 scrapy-redis 中去重是由 Duplication Filter 組件來實現的,它通過 redis 的 set 不重復的特性,巧妙的實現了 DuplicationFilter去重。scrapy-redis 調度器從引擎接受request,將 request 的指紋存入 redis 的 set 檢查是否重復,并將不重復的 request push寫入 redis 的 request queue。
引擎請求 request (Spider發出的)時,調度器從 redis 的request queue 隊列里根據優先級 pop 出?個request 返回給引擎,引擎將此request發給spider處理。
?
picklecompat.py
?
這里實現了 loads 和 dumps 兩個函數,其實就是實現了一個 serializer,因為 redis 數據庫不能存儲復雜對象(value部分只能是字符串,字符串列表,字符串集合和hash,key部分只能是字符串),所以我們存啥都要先串行化成文本才行。這里使用的就是python 的 pickle 模塊,一個兼容 py2 和 py3 的串行化工具。這個 serializer 主要用于一會的 scheduler 存 reuqest 對象,至于為什么不實用 json 格式,我也不是很懂,item pipeline 的串行化默認用的就是 json。
"""A pickle wrapper module with protocol=-1 by default."""try:import cPickle as pickle # PY2 except ImportError:import pickledef loads(s):return pickle.loads(s)def dumps(obj):return pickle.dumps(obj, protocol=-1)?
pipeline.py
?
這是是用來實現分布式處理的作用。它將 Item 存儲在 redis 中以實現分布式處理。由于在這里需要讀取配置,所以就用到了from_crawler() 函數。pipeline 文件 實現了一個 item pipieline類,和 scrapy 的 item pipeline 是同一個對象,通過從 settings 中拿到我們配置的REDIS_ITEMS_KEY 作為 key,把 item 串行化之后存入 redis 數據庫對應的 value 中(這個value可以看出出是個list,我們的每個item是這個list中的一個結點),這個pipeline把提取出的item存起來,主要是為了方便我們延后處理數據。
from scrapy.utils.misc import load_object from scrapy.utils.serialize import ScrapyJSONEncoder from twisted.internet.threads import deferToThreadfrom . import connection, defaultsdefault_serialize = ScrapyJSONEncoder().encodeclass RedisPipeline(object):"""Pushes serialized item into a redis list/queueSettings--------REDIS_ITEMS_KEY : strRedis key where to store items.REDIS_ITEMS_SERIALIZER : strObject path to serializer function."""def __init__(self, server,key=defaults.PIPELINE_KEY,serialize_func=default_serialize):"""Initialize pipeline.Parameters----------server : StrictRedisRedis client instance.key : strRedis key where to store items.serialize_func : callableItems serializer function."""self.server = serverself.key = keyself.serialize = serialize_func@classmethoddef from_settings(cls, settings):params = {'server': connection.from_settings(settings),}if settings.get('REDIS_ITEMS_KEY'):params['key'] = settings['REDIS_ITEMS_KEY']if settings.get('REDIS_ITEMS_SERIALIZER'):params['serialize_func'] = load_object(settings['REDIS_ITEMS_SERIALIZER'])return cls(**params)@classmethoddef from_crawler(cls, crawler):return cls.from_settings(crawler.settings)def process_item(self, item, spider):return deferToThread(self._process_item, item, spider)def _process_item(self, item, spider):key = self.item_key(item, spider)data = self.serialize(item)self.server.rpush(key, data)return itemdef item_key(self, item, spider):"""Returns redis key based on given spider.Override this function to use a different key depending on the itemand/or spider."""return self.key % {'spider': spider.name}?
queue.py
?
該文件實現了幾個容器類,可以看這些容器和redis交互頻繁,同時使用了我們上邊 picklecompat 中定義的 serializer。這個文件實現的幾個容器大體相同,只不過一個是隊列,一個是棧,一個是優先級隊列,這三個容器到時候會被scheduler對象實例化,來實現 request的調度。比如:我們使用 SpiderQueue 作為調度隊列的類型,到時候 request 的調度方法就是先進先出,而實用SpiderStack 就是先進后出了。?
我們可以仔細看看 SpiderQueue 的實現,他的 push 函數就和其他容器的一樣,只不過 push進去的 request請求先被scrapy的接口 request_to_dict 變成了一個dict對象(因為request對象實在是比較復雜,有方法有屬性不好串行化),之后使用picklecompat中的serializer串行化為字符串,然后使用一個特定的 key 存入redis中(該key在同一種spider中是相同的)。而調用pop時,其實就是從redis用那個特定的key去讀其值(一個list),從list中讀取最早進去的那個,于是就先進先出了。?
這些容器類都會作為 scheduler 調度 request 的容器,scheduler 在每個主機上都會實例化一個,并且和 spider一一對應,所以分布式運行時會有一個 spider 的多個實例和一個 scheduler 的多個實例存在于不同的主機上,但是,因為 scheduler 都是用相同的容器,而這些容器都連接同一個 redis 服務器,又都使用 spider 名加 queue 來作為 key 讀寫數據,所以不同主機上的不同爬蟲實例公用一個 request 調度池,實現了分布式爬蟲之間的統一調度。
from scrapy.utils.reqser import request_to_dict, request_from_dictfrom . import picklecompatclass Base(object):"""Per-spider base queue class"""def __init__(self, server, spider, key, serializer=None):"""Initialize per-spider redis queue.Parameters----------server : StrictRedisRedis client instance.spider : SpiderScrapy spider instance.key: strRedis key where to put and get messages.serializer : objectSerializer object with ``loads`` and ``dumps`` methods."""if serializer is None:# Backward compatibility.# TODO: deprecate pickle.serializer = picklecompatif not hasattr(serializer, 'loads'):raise TypeError("serializer does not implement 'loads' function: %r"% serializer)if not hasattr(serializer, 'dumps'):raise TypeError("serializer '%s' does not implement 'dumps' function: %r"% serializer)self.server = serverself.spider = spiderself.key = key % {'spider': spider.name}self.serializer = serializerdef _encode_request(self, request):"""Encode a request object"""obj = request_to_dict(request, self.spider)return self.serializer.dumps(obj)def _decode_request(self, encoded_request):"""Decode an request previously encoded"""obj = self.serializer.loads(encoded_request)return request_from_dict(obj, self.spider)def __len__(self):"""Return the length of the queue"""raise NotImplementedErrordef push(self, request):"""Push a request"""raise NotImplementedErrordef pop(self, timeout=0):"""Pop a request"""raise NotImplementedErrordef clear(self):"""Clear queue/stack"""self.server.delete(self.key)class FifoQueue(Base):"""Per-spider FIFO queue"""def __len__(self):"""Return the length of the queue"""return self.server.llen(self.key)def push(self, request):"""Push a request"""self.server.lpush(self.key, self._encode_request(request))def pop(self, timeout=0):"""Pop a request"""if timeout > 0:data = self.server.brpop(self.key, timeout)if isinstance(data, tuple):data = data[1]else:data = self.server.rpop(self.key)if data:return self._decode_request(data)class PriorityQueue(Base):"""Per-spider priority queue abstraction using redis' sorted set"""def __len__(self):"""Return the length of the queue"""return self.server.zcard(self.key)def push(self, request):"""Push a request"""data = self._encode_request(request)score = -request.priority# We don't use zadd method as the order of arguments change depending on# whether the class is Redis or StrictRedis, and the option of using# kwargs only accepts strings, not bytes.self.server.execute_command('ZADD', self.key, score, data)def pop(self, timeout=0):"""Pop a requesttimeout not support in this queue class"""# use atomic range/remove using multi/execpipe = self.server.pipeline()pipe.multi()pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)results, count = pipe.execute()if results:return self._decode_request(results[0])class LifoQueue(Base):"""Per-spider LIFO queue."""def __len__(self):"""Return the length of the stack"""return self.server.llen(self.key)def push(self, request):"""Push a request"""self.server.lpush(self.key, self._encode_request(request))def pop(self, timeout=0):"""Pop a request"""if timeout > 0:data = self.server.blpop(self.key, timeout)if isinstance(data, tuple):data = data[1]else:data = self.server.lpop(self.key)if data:return self._decode_request(data)# TODO: Deprecate the use of these names. SpiderQueue = FifoQueue SpiderStack = LifoQueue SpiderPriorityQueue = PriorityQueue可以看出,是以 base 為基類,然后被三個隊列的類繼承。然后進行了pop和push的操作。
- FifoQueue: 繼承 Base,重寫了 push 和 pop 方法實現?先進先出?隊列。
- PriorityQueue: 繼承 Base,重寫了 push 和 pop 方法實現?優先級?隊列。
- LifoQueue: 繼承 Base,重寫了 push 和 pop 方法實現?后進先出?隊列。
?
scheduler.py
?
? ? ? ? scrapy 改造了 python 本來的 collection.deque(雙向隊列)形成了自己的 Scrapy queue,但是 Scrapy 多個 spider 不能共享待爬取隊列 Scrapy queue,即 Scrapy 本身不支持爬蟲分布式。
? ? ? ? scrapy-redis 的解決是把這個 Scrapy queue 換成 redis 數據庫(也是指 redis 隊列),從同一個 redis-server 存放要爬取的request,便能讓多個 spider 去同一個數據庫里讀取。
? ? ? ? Scrapy 中跟 “待爬隊列” 直接相關的就是調度器 Scheduler,它負責對新的 request 進行入列操作(加入Scrapy queue),取出下一個要爬取的 request(從Scrapy queue中取出)等操作。它把待爬隊列按照優先級建立了一個字典結構,然后根據 request 中 的優先級,來決定該入哪個隊列,出列時則按優先級較小的優先出列。為了管理這個比較高級的隊列字典,Scheduler 需要提供一系列的方法。但是原來的 Scheduler 已經無法使用,所以使用 Scrapy-redis 的 scheduler 組件。
? ? ? ? scheduler.py?這個文件重寫了 scheduler 類,用來代替 scrapy.core.scheduler 的原有調度器。其實對原有調度器的邏輯沒有很大的改變,主要是使用了redis 作為數據存儲的媒介,以達到各個爬蟲之間的統一調度。?scheduler 負責調度各個 spider 的 request 請求,scheduler 初始化時,通過 settings 文件讀取 queue 和 dupefilters 的類型(一般就用上邊默認的),配置 queue 和 dupefilters 使用的 key(一般就是spider name加上queue或者dupefilters,這樣對于同一種spider 的不同實例,就會使用相同的數據塊了)。每當一個 request 要被調度時,enqueue_request 被調用,scheduler 使用dupefilters 來判斷這個url是否重復,如果不重復,就添加到 queue 的容器中(先進先出,先進后出和優先級都可以,可以在settings中配置)。當調度完成時,next_request 被調用,scheduler 就通過 queue 容器的接口,取出一個 request,把他發送給相應的 spider,讓spider 進行爬取工作。
import importlib import sixfrom scrapy.utils.misc import load_objectfrom . import connection, defaults# TODO: add SCRAPY_JOB support. class Scheduler(object):"""Redis-based schedulerSettings--------SCHEDULER_PERSIST : bool (default: False)Whether to persist or clear redis queue.SCHEDULER_FLUSH_ON_START : bool (default: False)Whether to flush redis queue on start.SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)How many seconds to wait before closing if no message is received.SCHEDULER_QUEUE_KEY : strScheduler redis key.SCHEDULER_QUEUE_CLASS : strScheduler queue class.SCHEDULER_DUPEFILTER_KEY : strScheduler dupefilter redis key.SCHEDULER_DUPEFILTER_CLASS : strScheduler dupefilter class.SCHEDULER_SERIALIZER : strScheduler serializer."""def __init__(self, server,persist=False,flush_on_start=False,queue_key=defaults.SCHEDULER_QUEUE_KEY,queue_cls=defaults.SCHEDULER_QUEUE_CLASS,dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,idle_before_close=0,serializer=None):"""Initialize scheduler.Parameters----------server : RedisThe redis server instance.persist : boolWhether to flush requests when closing. Default is False.flush_on_start : boolWhether to flush requests on start. Default is False.queue_key : strRequests queue key.queue_cls : strImportable path to the queue class.dupefilter_key : strDuplicates filter key.dupefilter_cls : strImportable path to the dupefilter class.idle_before_close : intTimeout before giving up."""if idle_before_close < 0:raise TypeError("idle_before_close cannot be negative")self.server = serverself.persist = persistself.flush_on_start = flush_on_startself.queue_key = queue_keyself.queue_cls = queue_clsself.dupefilter_cls = dupefilter_clsself.dupefilter_key = dupefilter_keyself.idle_before_close = idle_before_closeself.serializer = serializerself.stats = Nonedef __len__(self):return len(self.queue)@classmethoddef from_settings(cls, settings):kwargs = {'persist': settings.getbool('SCHEDULER_PERSIST'),'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),}# If these values are missing, it means we want to use the defaults.optional = {# TODO: Use custom prefixes for this settings to note that are# specific to scrapy-redis.'queue_key': 'SCHEDULER_QUEUE_KEY','queue_cls': 'SCHEDULER_QUEUE_CLASS','dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',# We use the default setting name to keep compatibility.'dupefilter_cls': 'DUPEFILTER_CLASS','serializer': 'SCHEDULER_SERIALIZER',}for name, setting_name in optional.items():val = settings.get(setting_name)if val:kwargs[name] = val# Support serializer as a path to a module.if isinstance(kwargs.get('serializer'), six.string_types):kwargs['serializer'] = importlib.import_module(kwargs['serializer'])server = connection.from_settings(settings)# Ensure the connection is working.server.ping()return cls(server=server, **kwargs)@classmethoddef from_crawler(cls, crawler):instance = cls.from_settings(crawler.settings)# FIXME: for now, stats are only supported from this constructorinstance.stats = crawler.statsreturn instancedef open(self, spider):self.spider = spidertry:self.queue = load_object(self.queue_cls)(server=self.server,spider=spider,key=self.queue_key % {'spider': spider.name},serializer=self.serializer,)except TypeError as e:raise ValueError("Failed to instantiate queue class '%s': %s",self.queue_cls, e)self.df = load_object(self.dupefilter_cls).from_spider(spider)if self.flush_on_start:self.flush()# notice if there are requests already in the queue to resume the crawlif len(self.queue):spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))def close(self, reason):if not self.persist:self.flush()def flush(self):self.df.clear()self.queue.clear()def enqueue_request(self, request):if not request.dont_filter and self.df.request_seen(request):self.df.log(request, self.spider)return Falseif self.stats:self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)self.queue.push(request)return Truedef next_request(self):block_pop_timeout = self.idle_before_closerequest = self.queue.pop(block_pop_timeout)if request and self.stats:self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)return requestdef has_pending_requests(self):return len(self) > 0這個文件下只有一個類 Scheduler,一如既往的通過類方法來實例化來實現 外部可以直接通過調用兩個方法然后從爬蟲中獲取settings 和 crawler,但是有兩個比較特殊的函數是 def open(self, spider) 和 def next_request(self):
?
spider.py
?
如果在 settings.py 里面:REDIS_START_URLS_AS_SET = False 的話,就是列表的形式,存入就是 lpush 或者是 rpush 等操作,如果是?REDIS_START_URLS_AS_SET = True?的話,那么存入就是集合的形式,sadd 等操作。
REDIS_START_URLS_AS_SET = False? # 默認是 False ,即 默認 從列表的格式取數據出來。
如果不設置?REDIS_START_URLS_KEY,則默認?REDIS_START_URLS_KEY = '%(name)s:start_urls'?,這個是存入 redis 里面的 key,可以根據這來取 value,例如:start_urls:baidu
redis 寫入 URL ( 即 添加任務 ):
添加 列表 形式 的 任務:
import ?redisconn = redis.Redis(host='127.0.0.1',port=6379) conn.lpush('start_urls:baidu','http://www.baidu.com')添加 集合 形式 的 任務:
import redisconn = redis.Redis(host='127.0.0.1', port=6379) conn.sadd('start_urls:baidu', 'http://www.baidu.com') # 按照這個格式來存數據的 print(conn.smembers('start_urls:baidu'))?
使用 scrapy-redis 提供的方法添加任務 示例:
首先需要在 setting.py 里面配置:
# REDIS_URL = 'redis://用戶名:密碼@主機IP:端口' REDIS_URL = 'redis://127.0.0.1:6379' REDIS_PARAMS = dict(db=15)添加任務:
import redis from scrapy.utils.project import get_project_settings from scrapy_redis.connection import get_redis_from_settings# 方法 1 # 如果是在 scrapy-redis 工程里面可以使用這個方法 server_1 = get_redis_from_settings(get_project_settings()) server_1.sadd('start_urls:server_1', 'http://www.baidu.com')# 方法 2 server_2 = redis.Redis(host='127.0.0.1', port=6379, db=15) server_2.sadd('start_urls:server_2', 'http://www.baidu.com')# 方法 3 redis_url = 'redis://root:xxxx@47.110.xx.xx:6379'# 加上 decode_responses=True,寫入的鍵值對中的value為str類型,不加這個參數寫入的則為字節類型。 r = redis.Redis.from_url(redis_url, decode_responses=True) r.lpush('test_key', 'wwww')?
spider 的改動也不是很大,主要是通過 connect 接口,給 spider 綁定了 spider_idle 信號,spider 初始化時,通過 setup_redis 函數初始化好 redis 的連接,之后通過 next_requests 函數從 redis 中取出 strat url,使用的 key 是 settings 中REDIS_START_URLS_AS_SET 定義的(注意了這里的初始化 url 池 和 我們上邊的 queue 的 url池 不是一個東西,queue的池是用于調度的,初始化 url池 是存放入口 url 的,他們都存在 redis 中,但是使用不同的 key 來區分,就當成是不同的表吧),spider 使用少量的 start url,可以發展出很多新的 url,這些 url 會進入 scheduler 進行判重和調度。直到 spider 跑到調度池內沒有 url 的時候,會觸發 spider_idle 信號,從而觸發 spider 的 next_requests 函數,再次從 redis 的 start url 池中讀取一些url。
分析:在這個 spider 中通過 connect signals.spider_idle 信號實現對 crawler 狀態的監視。當 idle 時,返回新的make_requests_from_url(url) 給引擎,進而交給調度器調度。
from scrapy import signals from scrapy.exceptions import DontCloseSpider from scrapy.spiders import Spider, CrawlSpiderfrom . import connection, defaults from .utils import bytes_to_strclass RedisMixin(object):"""Mixin class to implement reading urls from a redis queue."""redis_key = Noneredis_batch_size = Noneredis_encoding = None# Redis client placeholder.server = Nonedef start_requests(self):"""Returns a batch of start requests from redis."""return self.next_requests()def setup_redis(self, crawler=None):"""Setup redis connection and idle signal.This should be called after the spider has set its crawler object."""if self.server is not None:returnif crawler is None:# We allow optional crawler argument to keep backwards# compatibility.# XXX: Raise a deprecation warning.crawler = getattr(self, 'crawler', None)if crawler is None:raise ValueError("crawler is required")settings = crawler.settingsif self.redis_key is None:self.redis_key = settings.get('REDIS_START_URLS_KEY', defaults.START_URLS_KEY,)self.redis_key = self.redis_key % {'name': self.name}if not self.redis_key.strip():raise ValueError("redis_key must not be empty")if self.redis_batch_size is None:# TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE).self.redis_batch_size = settings.getint('REDIS_START_URLS_BATCH_SIZE',settings.getint('CONCURRENT_REQUESTS'),)try:self.redis_batch_size = int(self.redis_batch_size)except (TypeError, ValueError):raise ValueError("redis_batch_size must be an integer")if self.redis_encoding is None:self.redis_encoding = settings.get('REDIS_ENCODING', defaults.REDIS_ENCODING)self.logger.info("Reading start URLs from redis key '%(redis_key)s' ""(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s",self.__dict__)self.server = connection.from_settings(crawler.settings)# The idle signal is called when the spider has no requests left,# that's when we will schedule new requests from redis queuecrawler.signals.connect(self.spider_idle, signal=signals.spider_idle)def next_requests(self):"""Returns a request to be scheduled or none."""use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET)fetch_one = self.server.spop if use_set else self.server.lpop# XXX: Do we need to use a timeout here?found = 0# TODO: Use redis pipeline execution.while found < self.redis_batch_size:data = fetch_one(self.redis_key)if not data:# Queue empty.breakreq = self.make_request_from_data(data)if req:yield reqfound += 1else:self.logger.debug("Request not made from data: %r", data)if found:self.logger.debug("Read %s requests from '%s'", found, self.redis_key)def make_request_from_data(self, data):"""Returns a Request instance from data coming from Redis.By default, ``data`` is an encoded URL. You can override this method toprovide your own message decoding.Parameters----------data : bytesMessage from redis."""url = bytes_to_str(data, self.redis_encoding)return self.make_requests_from_url(url)def schedule_next_requests(self):"""Schedules a request if available"""# TODO: While there is capacity, schedule a batch of redis requests.for req in self.next_requests():self.crawler.engine.crawl(req, spider=self)def spider_idle(self):"""Schedules a request if available, otherwise waits."""# XXX: Handle a sentinel to close the spider.self.schedule_next_requests()raise DontCloseSpiderclass RedisSpider(RedisMixin, Spider):"""Spider that reads urls from redis queue when idle.Attributes----------redis_key : str (default: REDIS_START_URLS_KEY)Redis key where to fetch start URLs from..redis_batch_size : int (default: CONCURRENT_REQUESTS)Number of messages to fetch from redis on each attempt.redis_encoding : str (default: REDIS_ENCODING)Encoding to use when decoding messages from redis queue.Settings--------REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")Default Redis key where to fetch start URLs from..REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)Default number of messages to fetch from redis on each attempt.REDIS_START_URLS_AS_SET : bool (default: False)Use SET operations to retrieve messages from the redis queue. If False,the messages are retrieve using the LPOP command.REDIS_ENCODING : str (default: "utf-8")Default encoding to use when decoding messages from redis queue."""@classmethoddef from_crawler(self, crawler, *args, **kwargs):obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs)obj.setup_redis(crawler)return objclass RedisCrawlSpider(RedisMixin, CrawlSpider):"""Spider that reads urls from redis queue when idle.Attributes----------redis_key : str (default: REDIS_START_URLS_KEY)Redis key where to fetch start URLs from..redis_batch_size : int (default: CONCURRENT_REQUESTS)Number of messages to fetch from redis on each attempt.redis_encoding : str (default: REDIS_ENCODING)Encoding to use when decoding messages from redis queue.Settings--------REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls")Default Redis key where to fetch start URLs from..REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS)Default number of messages to fetch from redis on each attempt.REDIS_START_URLS_AS_SET : bool (default: True)Use SET operations to retrieve messages from the redis queue.REDIS_ENCODING : str (default: "utf-8")Default encoding to use when decoding messages from redis queue."""@classmethoddef from_crawler(self, crawler, *args, **kwargs):obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs)obj.setup_redis(crawler)return obj這個 spider 文件有三個類,RedisMixin 是一個基類,剩余兩個是多繼承。
RedisMixin 類:
- 1 setup_redis 主要是獲取 redis 的 server 和獲取爬蟲的 idle signal,當爬蟲沒有請求時,調用空閑信號,那時我們將從 redis隊列安排新請求。
- 2?spider_idle 主要是 idle 信號處理,這里調用 schedule_next_requests 完成從 Redis 調度
- 3 make_request_from_data 主要是從 redis 隊列獲取ur
- 4 next_requests 這個主要是 redis 獲取 url,我從這里看到可以設置 url 隊列在 redis 的存儲的數據格式函數schedule_next_requests從next_requests 獲取 url 包裝為 HttpRequest
RedisSpider(RedisMixin, Spiser) 類:多繼承,用 RedisMixin 調度功能覆蓋 Spider 原生
RedisCrawlSpider(RedisMixin, CrawlSpiser) 類:多繼承,用 RedisMixin 調度功能覆蓋 CrawlSpider 原生
?
當編寫分布式爬蟲時,不在使用 scrapy 原有的 Spider 類,重寫的 RedisSpider 繼承了 Spider 和 RedisMixin 這兩個類,RedisMixin 是用來從 redis 讀取 url 的類。
當我們生成一個 Spider 繼承 RedisSpider 時,調用 setup_redis 函數,這個函數會去連接 redis 數據庫,然后會設置 signals (信號):
- 一個是當 spider 空閑時候的 signal,會調用 spider_idle 函數,這個函數調用?schedule_next_request?函數,保證 spider 是一直活著的狀態,并且拋出 DontCloseSpider 異常。
- 一個是當抓到一個 item 時的 signal,會調用 item_scraped 函數,這個函數會調用?schedule_next_request?函數,獲取下一個 request。
?
scrapy-redis 的 總體思路:
?
?
?
集成 bloomfilter 到 scrapy-redis 中
?
傳送門:bloomfilter算法詳解及實例
算法實現:bloomfilter_imooc
?
dupefilter.py:
import logging import timefrom scrapy.dupefilters import BaseDupeFilter from scrapy.utils.request import request_fingerprintfrom . import defaults from .connection import get_redis_from_settingslogger = logging.getLogger(__name__)# TODO: Rename class to RedisDupeFilter. class RFPDupeFilter(BaseDupeFilter):"""Redis-based request duplicates filter.This class can also be used with default Scrapy's scheduler."""logger = loggerdef __init__(self, server, key, debug=False):"""Initialize the duplicates filter.Parameters----------server : redis.StrictRedisThe redis server instance.key : strRedis key Where to store fingerprints.debug : bool, optionalWhether to log filtered requests."""self.server = serverself.key = keyself.debug = debugself.logdupes = True@classmethoddef from_settings(cls, settings):"""Returns an instance from given settings.This uses by default the key ``dupefilter:<timestamp>``. When using the``scrapy_redis.scheduler.Scheduler`` class, this method is not used asit needs to pass the spider name in the key.Parameters----------settings : scrapy.settings.SettingsReturns-------RFPDupeFilterA RFPDupeFilter instance."""server = get_redis_from_settings(settings)# XXX: This creates one-time key. needed to support to use this# class as standalone dupefilter with scrapy's default scheduler# if scrapy passes spider on open() method this wouldn't be needed# TODO: Use SCRAPY_JOB env as default and fallback to timestamp.key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}debug = settings.getbool('DUPEFILTER_DEBUG')return cls(server, key=key, debug=debug)@classmethoddef from_crawler(cls, crawler):"""Returns instance from crawler.Parameters----------crawler : scrapy.crawler.CrawlerReturns-------RFPDupeFilterInstance of RFPDupeFilter."""return cls.from_settings(crawler.settings)def request_seen(self, request):"""Returns True if request was already seen.Parameters----------request : scrapy.http.RequestReturns-------bool"""fp = self.request_fingerprint(request)# This returns the number of values added, zero if already exists.added = self.server.sadd(self.key, fp)return added == 0def request_fingerprint(self, request):"""Returns a fingerprint for a given request.Parameters----------request : scrapy.http.RequestReturns-------str"""return request_fingerprint(request)@classmethoddef from_spider(cls, spider):settings = spider.settingsserver = get_redis_from_settings(settings)dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY)key = dupefilter_key % {'spider': spider.name}debug = settings.getbool('DUPEFILTER_DEBUG')return cls(server, key=key, debug=debug)def close(self, reason=''):"""Delete data on close. Called by Scrapy's scheduler.Parameters----------reason : str, optional"""self.clear()def clear(self):"""Clears fingerprints data."""self.server.delete(self.key)def log(self, request, spider):"""Logs given request.Parameters----------request : scrapy.http.Requestspider : scrapy.spiders.Spider"""if self.debug:msg = "Filtered duplicate request: %(request)s"self.logger.debug(msg, {'request': request}, extra={'spider': spider})elif self.logdupes:msg = ("Filtered duplicate request %(request)s"" - no more duplicates will be shown"" (see DUPEFILTER_DEBUG to show all duplicates)")self.logger.debug(msg, {'request': request}, extra={'spider': spider})self.logdupes = False?
?
Scrapy-Redis 分布式爬蟲框架使用實例
?
1. 創建項目
scrapy startproject example
example/
├── scrapy.cfg
└── example
? ? ├── __init__.py
? ? ├── items.py
? ? ├── middlewares.py
? ? ├── pipelines.py
? ? ├── settings.py
? ? └── spiders
? ? ? ? ├── __init__.py
? ? ? ? └── my_crawlspider.py/my_redisspider.py/my_rediscrawlspider.py? # 3中類型的Scrap-Redis爬蟲
?
2. 明確目標
vim items.py :定義要爬取的字段
import scrapyclass ExampleItem(scrapy.Item):name = scrapy.Field()description = scrapy.Field()link = scrapy.Field()crawled = scrapy.Field()spider = scrapy.Field()url = scrapy.Field()也可以不寫 item ,直接返回一個 Python 類型 的 dict 對象,因為 item 本身就是一個 dict 類型的對象。
?
3. 編寫自定義 pipeline
vim pipelines.py
from datetime import datetime class ExampPipeline(object):def process_item(self,item,spider):item['crawled'] = datetime.utcnow() # 調用datetime.utcnow()方法獲取爬蟲執行時的UTC時間# 調用spider.name屬性獲取當前爬蟲名(因為可能同時有多個爬蟲在爬取,這樣可以看到誰爬了哪些網頁)item['spider'] = spider.name return item?
4. 注冊自定義 pipeline 及 Scrapy-Redis 分布式爬蟲相關設置
vim settings.py
#-----------Scrapy-Redis分布式爬蟲相關設置如下------------- # 使用Scrapy-Redis的去重組件,不再使用scrapy的去重組件 DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter" # 使用Scrapy-Redis的調度器,不再使用scrapy的調度器 SCHEDULER = "scrapy_redis.scheduler.Scheduler" # 使用Scrapy-Redis的從請求集合中取出請求的方式,三種方式擇其一即可: SCHEDULER_CLASS = "scrapy_redis.queue.SpiderPriorityQueue" # 分別按(1)請求的優先級/(2)隊列FIFO/(3)棧FILO 取出請求 #SCHEDULER_CLASS = "scrapy_redis.queue.SpiderQueue" #SCHEDULER_CLASS = "scrapy_redis.queue.SpiderStack" SCHEDULER_PERSIST = True # 允許暫停,redis請求記錄不會丟失(重啟爬蟲不會重頭爬取已爬過的頁面)REDIS_HOST = "200.200.200.200" # 這兩項是Redis連接設置,如果注釋或不寫會默認將數據存放到本機的Redis中 REDIS_PORT = 6379 # 注意:master端的Redis需要允許遠程連接--配置中注釋掉bind 127.0.0.1#----------注冊RedisPipeline/自定義pipeline------------------ # # 注意:自定義pipeline的優先級需高于Redispipeline,因為RedisPipeline不會返回item, # 所以如果RedisPipeline優先級高于自定義pipeline,那么自定義pipeline無法獲取到item ITEM_PIPELINES = {"example.pipelines.ExampPipeline":300, # 自定義pipeline視情況選擇性注冊(可選)"scrapy_redis.pipelines.RedisPipeline":400 # 將RedisPipeline注冊到pipeline組件中(這樣才能將數據存入Redis) }?
5. 編寫爬蟲
( 三種 Scrapy-Redis 爬蟲:CrawlSpider / RedisSpider / RedisCrawlSpider )
scrapy.Spider 和scrapy.CrawlSpider 區別:http://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/spiders.html
Spider 是最簡單的 spider。每個其他的 spider 必須繼承自該類(包括 Scrapy 自帶的其他 spider 以及您自己編寫的 spider )。 Spider 并沒有提供什么特殊的功能。 其僅僅提供了 start_requests() 的默認實現,讀取并請求 spider 屬性中的 start_urls,并根據返回的結果 (resulting responses) 調用 spider 的 parse 方法。
CrawlSpider 是爬取一般網站常用的 spider。CrawlSpider 定義了一些規則(rule)來提供跟進 link 的方便的機制。 也許該 spider并不是完全適合您的特定網站或項目,但其對很多情況都使用。 因此您可以以其為起點,根據需求修改部分方法。當然您也可以實現自己的spider。
?
類型一:(基于 scrapy 的非分布式爬蟲)
繼承 CrawlSpider類 的 Scrapy 爬蟲
導入?CrawlSpider 類:from scrapy.spiders import CrawlSpider
(1) 生成爬蟲
? ? ? ? scrapy genspider -t crawl my_crawlspider "dmoz.org"
(2) 設置爬蟲
? ? ? ? vim my_crawlspider.py
# -*- coding: utf-8 -*-from scrapy.linkextractors import LinkExtractor from scrapy.spiders import CrawlSpider, Ruleclass TestSpider(CrawlSpider):"""TestSpider 繼承 CrawlSpider 爬蟲類,也可以繼承 scrapy.spider.Spider 類。但是如果繼承 CrawlSpider,則可以定義一些規則(rule)來提供跟進 link 的方便的機制"""name = "test_spider"allowed_domains = ["dmoz.org"]start_urls = ["http://www.dmoz.org/"]links = LinkExtractor(restrict_css=('.top-cat', '.sub-cat', '.cat-item'))rules = [Rule(links, callback='parse_directory', follow=True),]def __init__(self):super(TestSpider, self).__init__()self.temp = Nonedef parse_directory(self, response):self.temp = Nonefor div in response.css('.title-and-desc'):data = {'name': div.css('.site-title::text').extract_first(),'description': div.css('.site-descr::text').extract_first(), 'link': div.css('.a::attr(href)').extract(),}yield dataif __name__ == '__main__':from scrapy import cmdlinecmdline.execute('scrapy crawl test_spider'.split())pass(3) 執行爬蟲方法--- scrapy crawl my_crawlspider ( 與正常 scrapy 一樣,無需 redis_key,比較雞肋并不是真正的多機器爬蟲)
?
示例:爬取 豆瓣電影
# -*- coding: utf-8 -*-from scrapy.spiders import CrawlSpider, Rule from scrapy.linkextractors import LinkExtractorclass DoubanSpider(CrawlSpider):name = 'douban'allowed_domains = ['movie.douban.com']start_urls = ['https://movie.douban.com/top250']custom_settings = {'DEFAULT_REQUEST_HEADERS': {'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,''*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',"Accept-Encoding": "gzip, deflate","Accept-Language": "zh-CN,zh;q=0.9","Connection": "keep-alive","Host": "movie.douban.com","Upgrade-Insecure-Requests": "1","User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'' (KHTML, like Gecko) Chrome/81.0.4044.113 Safari/537.36',},'CONCURRENT_REQUESTS': 10,'DOWNLOAD_DELAY': 0.01,'CONCURRENT_REQUESTS_PER_IP': 0,'CONCURRENT_REQUESTS_PER_DOMAIN': 10000,'FEED_EXPORT_ENCODING': 'utf-8'}# 定義一個爬取規則,從start_urls中的網頁提取LinkExtractor上規定的所有鏈接,callback為對這些鏈接如何處理rules = (Rule(LinkExtractor(allow=(r'https://movie.douban.com/subject/\d+',)),callback='parse_item'),)def parse_item(self, response):print(self.name)data = dict()data['title'] = ''.join(response.xpath('//span[@property="v:itemreviewed"]/text()').extract())print(data)# yield dataif __name__ == '__main__':from scrapy import cmdlinecmdline.execute('scrapy crawl douban'.split())pass?
?
類型二:( 基于?Scrapy-Redis 的 分布式爬蟲?)
基于 RedisSpider類 的 Scrapy-Redis 分布式爬蟲
(1) 生成爬蟲--- scrapy genspider my_redisspider "dmoz.org"
(2) 設置爬蟲--- vim my_redisspider.py? ??
# -*- coding: utf-8 -*-# 變化 1: 從 scrapy_redis.spiders 中引入 RedisSpider from scrapy_redis.spiders import RedisSpiderclass TestSpider(RedisSpider): # 變化 2 : 爬蟲類所繼承的父類變為 RedisSpider類name = 'test_spider'# 變化 3 : 多了一個 redis_key,爬蟲從這個 redis_key 里面取任務redis_key = "start_urls:test_spider"def __init__(self):super(TestSpider, self).__init__()def parse(self, response):# 直接將 name/url 存入 Redis數據庫temp = {'name': response.css('.site-title::text').extract_first(),'url': response.url,}return tempif __name__ == '__main__':from scrapy import cmdlinecmdline.execute('scrapy crawl test_spider'.split())pass(3) 執行爬蟲方法
- <1> 將項目代碼復制到多臺 slave 上 ( 可更改爬蟲名 ) 并啟動爬蟲 --- scrapy runspider my_redisspider.py
- <2> 往?redis 里面添加任務 --- lpush start_urls:test_spider http://www.dmoz.org/
?
類型三:(?Scrapy-Redis 的?RedisCrawlSpider 分布式爬蟲 )
基于 RedisCrawlSpider 類的 Scrapy-Redis 分布式爬蟲
(1) 生成爬蟲 --- scrapy genspider -t crawl my_rediscrawlspider "dmoz.org"
(2) 設置爬蟲 --- vim my_rediscrawlspider.py? ?
(3) 執行爬蟲方法?
- <1> 將項目代碼復制到多臺 slave上(可更改爬蟲名)并啟動爬蟲 --- scrapy runspider my_rediscrawlspider.py
- <2> 往?redis 里面添加任務 --- lpush start_urls:test_spider http://www.dmoz.org/
?
小結:? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
- 如果只想使用 Redis 的去重和保存功能 ,使 用類型一
- 如果寫分布式,根據情況選擇 類型二/類型三
- 如果寫聚焦爬蟲(全站式爬蟲),選擇 類型三
?
?
2.Scrapy-Rdis-project: youyuan??
?
?(Scrapy-Redis 分布式爬蟲框架進階1----有緣網:非分布式基于CrawlSpider的scrapy項目? )
1.創建項目
? ? ? ? scrapy startproject youyuan
youyuan/
├── scrapy.cfg
└── youyuan
? ? ├── __init__.py
? ? ├── items.py
? ? ├── middlewares.py
? ? ├── pipelines.py
? ? ├── settings.py
? ? └── spiders
? ? ? ? ├── __init__.py
? ? ? ? └── yy.py
生成的項目目錄截圖
2.明確目標
? ? ? ? vim items.py
improt scrapy class YouyuanItem(scrapy.Item):username = scrapy.Field() # 用戶名age = scrapy.Field() # 年齡header_url = scrapy.Field() # 頭像地址image_url = scrapy.Field() # 相冊個圖片地址content = scrapy.Field() # 內心獨白place_from = scrapy.Field() # 籍貫education = scrapy.Field() # 教育hobby = scrapy.Field() # 愛好source_url = scrapy.Field() # 個人主頁source = scrapy.Field() # 數據來源網站3.制作爬蟲??
(1)生成爬蟲--- scrapy genspider -t crawl yy "youyuan.com"
(2)設置爬蟲--- vim yy.py
4.編寫 item pipeline
? ? ? ? vim pipelines.py
import json class YouyuanJsonPipeline(obeject):def __init__(self):self.f = open("youyuan.json","w")def process_item(self,item,spider):text = json.dumps(dict(item),ensure_ascii=False) + ",\n"self.f.write(item)def close_spider(self,spider):self.f.close()5.啟動上述 pipeline---? vim settings.py
? ? vim settings.py
? ? ? ? ITEM_PIPELINES = {"youyuan.pipelines.YouyuanJsonPipeline":300}
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
6.執行爬蟲--- scrapy crawl yy
?
?
?
3.Scrapy-Redis-project: youyuan??
?
(Scrapy-Redis分布式爬蟲框架進階2----有緣網:非分布式基于CrawlSpider的scrapy項目的數據存入本機Redis? )
說明:僅僅實在上述scrapy項目的基礎上進行settings.py文件的點滴修改,增加一個RedisPipeline而已?
<----并不屬于Scrapy-Redis分布式
youyaun/
├── scrapy.cfg
└── youyaun
? ? ├── __init__.py
? ? ├── items.py
? ? ├── middlewares.py
? ? ├── pipelines.py
? ? ├── settings.py? ? ?<----僅對settings.py文件做部分添加修改
? ? └── spiders
? ? ? ? ├── __init__.py
? ? ? ? └── yy.py
5.settings.py添加部分信息--- 啟用Scrapy-Redis的去重組件/調度器/取出請求方式,以及注冊RedisPipeline組件(讓數據存入Redis)
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter" # 使用Scrapy-Redis的去重組件,不再使用scrapy的去重組件 SCHEDULER = "scrapy_redis.scheduler.Scheduler" # 使用Scrapy-Redis的調度器,不再使用scrapy的調度器 # 使用Scrapy-Redis的從請求集合中取出請求的方式,三種方式擇其一即可: SCHEDULER_CLASS = "scrapy_redis.queue.SpiderPriorityQueue" # 分別按(1)請求的優先級/(2)隊列FIFO/(3)棧FILO 取出請求 #SCHEDULER_CLASS = "scrapy_redis.queue.SpiderQueue" #SCHEDULER_CLASS = "scrapy_redis.queue.SpiderStack" # 允許暫停,redis請求記錄不會丟失(重啟爬蟲不會重頭爬取已爬過的頁面) SCHEDULER_PERSIST = True # 這兩項是Redis連接設置,注釋或不寫默認將數據存放到本機的Redis中 #REDIS_HOST = "200.200.200.200" #REDIS_PORT = 6379 #----------注冊RedisPipeline/自定義pipeline------------------ #注意:自定義pipeline的優先級需高于Redispipeline,因為RedisPipeline不會返回item, #所以如果RedisPipeline優先級高于自定義pipeline,那么自定義pipeline無法獲取到item ITEM_PIPELINES = {"youyuan.pipelines.YouyuanJsonPipeline":300, # 自定義pipeline視情況選擇性注冊(可選)# 將RedisPipeline注冊到pipeline組件中(這樣才能將數據存入Redis)"scrapy_redis.pipelines.RedisPipeline":400 }6.執行爬蟲--- scrapy crawl yy
# 注意: 在原始scrapy項目的基礎上,在settings.py文件中添加上述幾行設置,就可以將scrapy爬取的數據存放到本機redis中
# 注意: 上述要想成功保存到本機Redis,有兩個前提:本機必須(pip install scrapy-redis) 和本機redis必須啟動(redis-server /etc/redis.conf)
?
?
4.Scrapy-Rdis-project: youyuan??
(Scrapy-Redis分布式爬蟲框架進階3----有緣網:非分布式基于CrawlSpider的scrapy項目 ----> 改寫為:RedisCrawlSpider類的Scrapy-Redis分布式爬蟲項目 )
說明:僅僅實在原始scrapy項目的基礎上對settings.py/yy.py文件進行的點滴修改即可
? ? youyaun/
? ? ├── scrapy.cfg
? ? └── youyaun
? ? ? ? ├── __init__.py
? ? ? ? ├── items.py
? ? ? ? ├── middlewares.py
? ? ? ? ├── pipelines.py
? ? ? ? ├── settings.py? ? ?<----對settings.py文件做部分添加修改
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (使用Scrapy-Redis的去重組件/調度器/取出請求策略
? ? ? ? ? /允許暫停/指明遠程Redis主機,并注冊RedisPipeline組件)
? ? ? ? └── spiders
? ? ? ? ? ? ├── __init__.py
? ? ? ? ? ? └── yy.py? ? ? ?<----對爬蟲文件進行部分修改(引入RedisCrawlSpider爬蟲類/去掉allowed_domain/去掉start_urls/增加redis_key/改寫init方法動態獲取限制域)
1.修改設置文件--- vim settings.py? ?
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter" # 使用Scrapy-Redis的去重組件,不再使用scrapy的去重組件 SCHEDULER = "scrapy_redis.scheduler.Scheduler" # 使用Scrapy-Redis的調度器,不再使用scrapy的調度器 # 使用Scrapy-Redis的從請求集合中取出請求的方式,三種方式擇其一即可: SCHEDULER_CLASS = "scrapy_redis.queue.SpiderPriorityQueue" #SCHEDULER_CLASS = "scrapy_redis.queue.SpiderQueue" # 分別按(1)請求的優先級/(2)隊列FIFO/(3)棧FILO 取出請求 #SCHEDULER_CLASS = "scrapy_redis.queue.SpiderStack" # 允許暫停,redis請求記錄不會丟失(重啟爬蟲不會重頭爬取已爬過的頁面) SCHEDULER_PERSIST = True REDIS_HOST = "200.200.200.200" # 這兩項是Redis連接設置,如果注釋或不寫會默認將數據存放到本機的Redis中 REDIS_PORT = 6379 # 注意:master端的Redis需要允許遠程連接--配置中注釋掉bind 127.0.0.1 #----------注冊RedisPipeline/自定義pipeline------------------ #注意:自定義pipeline的優先級需高于Redispipeline,因為RedisPipeline不會返回item, #所以如果RedisPipeline優先級高于自定義pipeline,那么自定義pipeline無法獲取到item ITEM_PIPELINES = {"youyuan.pipelines.YouyuanJsonPipeline":300, # 自定義pipeline視情況選擇性注冊(可選)"scrapy_redis.pipelines.RedisPipeline":400 # 將RedisPipeline注冊到pipeline組件中(這樣才能將數據存入Redis) }2. 修改爬蟲文件--- vim yy.py
import scrapy from scrapy linkextractor import LinkExtractor from scrapy.Spiders import CrawlSpider,Rule from youyuan.items import YouyuanItem import re from scrapy_redis.Spiders import RedisCrawlSpider # 變化1:從scrapy_redis.Spiders中引入RedisCrawlSpiderclass YySpider(RedisCrawlSpider): # 變化2:爬蟲類所繼承的父類變為RedisCrawlSpider類name = "yy"redis_key = "yyspider:start_urls" # 變化3:多了一個對所有爬蟲發號施令的redis_key,少了allowed_domain和start_urlsdef __init__(self,*args,**kwargs): # 變化4:重寫__init__方法:動態獲取限制域domain = kwargs.pop('domain','')self.allowed_domain = filter(None,domain,split(','))super(YySpider,self).__init__(*args,**kwargs) # 注意super()里面的參數因爬蟲類名不同而不同page_links = LinkExtractor(allow=(r'youyuan.com/find/beijing/mm18-25/advance-0-0-0-0-0-0-0/p\d+/')) person_links = LinkExtractor(allow =(r'youyuan.com/\d+-profile/')) .............. # 后面的代碼都相同3.爬蟲的執行方式改變
? ? ? ?<1>將項目代碼復制到給臺slave上(可更改爬蟲名)并啟動爬蟲--- scrapy runspider yy.py
? ?<2>在master端redis上發號施令--- lpush yyspider:start_urls http://www.youyuan.com/find/beijing/mm18-25/advance-0-0-0-0-0-0-0/p1/? ??
?
?
?
?
5.Scrapy-Redis-project: sina2??
(Scrapy-Redis分布式爬蟲框架----新浪分類資訊:非分布式基于scrapy.Spider的scrapy項目 ----> 改寫為:RedisSpider類的Scrapy-Redis分布式爬蟲項目 )??改寫該項目注意:由于改寫后數據存放到Redis,所以需要去掉"目錄存儲路徑"相關的代碼
1.創建項目--- scrapy startproject sina2
? ? ? ? sina2/
? ? ? ? ├── scrapy.cfg
? ? ? ? └── sina2
? ? ? ? ? ? ├── __init__.py
? ? ? ? ? ? ├── items.py
? ? ? ? ? ? ├── middlewares.py
? ? ? ? ? ? ├── pipelines.py
? ? ? ? ? ? ├── settings.py? ? ?<----對settings.py文件做部分添加修改(使用Scrapy-Redis的去重組件/調度器/取出請求策略/
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?允許暫停/指明遠程Redis主機,并注冊RedisPipeline組件)
? ? ? ? ? ? └── spiders
? ? ? ? ? ? ? ? ├── __init__.py
? ? ? ? ? ? ? ? └── xinlang.py? <----對爬蟲文件進行部分修改(引入RedisCrawlSpider爬蟲類/去掉allowed_domain
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?/去掉start_urls/增加redis_key/改寫init方法動態獲取限制域)
2.明確目標--- vim items.py?
? ? vim items.py
? ? ? ? import scrapy
?
3.制作爬蟲
(1)生成爬蟲--- scrapy genspider xinlang "sina.com.cn"
(2)設置爬蟲--- vim xinlang.py
4.編寫item pipelines
--- vim pipelines.py (忽略)? # (數據存放到Redis,不再需要這塊代碼,不需要自定義pipeline保存數據到本地)
?
5.啟用上述pipeline組件--- vim settings.py
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter" # 使用Scrapy-Redis的去重組件,不再使用scrapy的去重組件 SCHEDULER = "scrapy_redis.scheduler.Scheduler" # 使用Scrapy-Redis的調度器,不再使用scrapy的調度器 # 使用Scrapy-Redis的從請求集合中取出請求的方式,三種方式擇其一即可: SCHEDULER_CLASS = "scrapy_redis.queue.SpiderPriorityQueue" #SCHEDULER_CLASS = "scrapy_redis.queue.SpiderQueue" # 分別按(1)請求的優先級/(2)隊列FIFO/(3)棧FILO 取出請求 #SCHEDULER_CLASS = "scrapy_redis.queue.SpiderStack" SCHEDULER_PERSIST = True # 允許暫停,redis請求記錄不會丟失(重啟爬蟲不會重頭爬取已爬過的頁面)REDIS_HOST = "200.200.200.200" # 這兩項是Redis連接設置,如果注釋或不寫會默認將數據存放到本機的Redis中 REDIS_PORT = 6379 # 注意:master端的Redis需要允許遠程連接--配置中注釋掉bind 127.0.0.1 # (數據存放到Redis,不再需要這塊代碼,不需要自定義pipeline保存數據到本地) ITEM_PIPELINES = { #"sina.pipelines.SinaSavePipeline":300, # 將RedisPipeline注冊到pipeline組件中(這樣才能將數據存入Redis)"scrapy_redis.pipelines.RedisPipeline":400 }6.爬蟲的執行方式改變
?
<1>將項目代碼復制到給臺slave上(可更改爬蟲名)并啟動爬蟲--- scrapy runspider xinlang.py
<2>在master端redis上發號施令--- lpush yyspider:start_urls http://news.sina.com.cn/guide/? ?
?
?
?
6.Scrapy-Rdis-project: youyuan?
?(Scrapy-Redis分布式爬蟲框架----將Redis中數據持久化存儲到MongoDB/MySQL中----> 將有緣網分布式爬取到Redis中的數據轉存到MongoDB/MySQL中)
要將Scrapy-Redis項目爬取到Redis中的數據轉存到Mongodb/MySQL中,只需要在項目一級目錄下創建兩個轉存的腳本文件即可
有緣網Scrapy-Redis項目樹形圖
? ? ? ? youyuan/
? ? ? ? ├── scrapy.cfg
? ? ? ? ├── process_item_for_mongodb.py? ? ?<----項目一級目錄下創建將Redis數據轉存到mongodb的python腳本
? ? ? ? ├── process_item_for_mysql.py? ? ? ?<----項目一級目錄下創建將Redis數據轉存到mysql的python腳本
? ? ? ? └── youyuan
? ? ? ? ? ? ├── __init__.py
? ? ? ? ? ? ├── items.py
? ? ? ? ? ? ├── middlewares.py
? ? ? ? ? ? ├── pipelines.py
? ? ? ? ? ? ├── settings.py?
? ? ? ? ? ? └── spiders
? ? ? ? ? ? ? ? ├── __init__.py
? ? ? ? ? ? ? ? └── yy.py? ?
1.將Redis數據轉存到mongodb中 ----- vim process_item_for_mongodb.py
#!/usr/bin/env python #coding:utf-8import redis import pymongo import json def process_item():rediscli = redis.Redis(host='200.200.200.200',port=6379,db=0) # 創建Redis連接對象mongocli = pymongo.MongoClient(host='200.200.200.202',port=27017) # 創建MongoDB連接對象db_name = mongocli['youyuan'] # 利用MongoDB連接對象在MongoDB中創建名為youyuan的數據庫對象sheet_name = db_name['beijing18-24'] # 利用該數據庫對象在youyuan數據庫中創建名為beijing18-24的表對象count = 0 while True:# 使用循環通過redis連接對象的blpop()方法,不斷取出redis中的數據(blpop即FIFO,rlpop即FILO)source,data = rediscli.blpop("yy:items") data = json.loads(data) # 將取出的json字符串類型的數據轉化為python類型的對象sheet_name.insert(data) # 利用mongodb的表對象的insert()方法,向表中插入(剛才轉化的python對象)count += 1print "已經成功從redis轉移" + str(count) + "條數據到mongodb"if __name__ == "__main__":process_item()注意:MongoDB中可以自動創建鍵,所以直接執行該腳本就可轉移數據
?
2.將Redis數據轉存到mysql中 ----- vim process_item_for_mysql.py
import redis import MySQLdb import json def process_item():rediscli = redis.Redis(host='200.200.200.200',port=6379,db=0) # 創建Redis連接對象# 創建MySQL連接對象mysqlcli = MySQLdb.connect(host='200.200.200.204',port 3306,db='youyuan',user='zhangsan',password='123456') count = 0while True:# 使用循環通過redis連接對象的blpop()方法,不斷取出redis中的數據(blpop即FIFO,rlpop即FILO)source,data = rediscli.blpop('yy:items') data = json.loads(data) # 將取出的json字符串類型的數據轉化為python類型的對象try:# 利用mysql連接對象創建cursor操作游標,并使用該操作游標向Mysql表中插入數據,數據通過python對象獲取其值cursor = mysqlcli.cursor() cursor.execute("insert into beijing18-24 (username,age,spider,crawled)values(%s,%s,%s,%s)",[data['username'],data['age'],data['spider'],data['crawled']])mysqlcli.commit() # 插入完成后需提交事務cursor.close() # 關閉操作游標count += 1print "已經成功從redis轉移" + str(count) + "條數據到mongodb"except:passif __name__ == "__main__":process_item()注意:MySQL中不能自動創建字段,所以在執行該腳本前,需要自行在數據庫中創建好響應的數據庫/表/字段,然后才能執行該腳本,轉移數據到MySQL中
?
?
?
總結
以上是生活随笔為你收集整理的Scrapy-redis 源码分析 及 框架使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SpringBoot 自带工具类~Ref
- 下一篇: 安卓逆向_15( 一 ) --- JNI