
Scrapy extension middleware: committing batched items to MySQL, synchronously and asynchronously

Published: 2025/3/20

0. References

https://doc.scrapy.org/en/latest/topics/item-pipeline.html?highlight=mongo#write-items-to-mongodb

Added 2018-07-21: asynchronous version

https://twistedmatrix.com/documents/15.3.0/core/howto/rdbms.html

https://twistedmatrix.com/documents/18.7.0/api/twisted.python.failure.Failure.html

https://twistedmatrix.com/documents/12.1.0/core/howto/time.html

1. Key features

(1) Automatically reconnect to the MySQL server after a connection timeout

(2) Collect items in item_list and commit them to the database in a batch once a size threshold is reached

(3) On an insert error, parse the exception message, automatically drop the offending rows, and resubmit item_list

(4) Commit the pending item_list in close_spider() before shutdown

(5) Added 2018-07-21: asynchronous version
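Feature (3) hinges on parsing pymysql's error message to locate the offending row or column. A minimal stand-alone sketch of that parsing, using the same regular expressions as the pipelines below (the sample messages are copied from the pymysql errors quoted later in this post):

```python
import re

# Sample pymysql err.args[1] strings, as quoted in the pipeline comments below.
DATA_ERR = "Out of range value for column 'position_id' at row 2"
INTEGRITY_ERR = "Column 'name' cannot be null"

def locate_bad_row(message):
    """Return the 1-based row index named in the error message, or None."""
    m = re.search(r'at\s+row\s+(\d+)$', message)
    return int(m.group(1)) if m else None

def locate_bad_column(message):
    """Return the column name quoted at the start of the error message, or None."""
    m = re.search(r"Column\s'(.+)'", message)
    return m.group(1) if m else None

print(locate_bad_row(DATA_ERR))          # 2
print(locate_bad_column(INTEGRITY_ERR))  # name
```

Note that `locate_bad_column` only matches a capitalized "Column", so it ignores the lowercase "column 'position_id'" inside the row-level message; the pipelines rely on checking the row match first.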

2. Synchronous version

Save as /site-packages/my_pipelines.py:

```python
from socket import gethostname
import time
import re

import pymysql
pymysql.install_as_MySQLdb()
from pymysql import OperationalError, InterfaceError, DataError, IntegrityError


class MyMySQLPipeline(object):
    hostname = gethostname()

    def __init__(self, settings):
        self.mysql_host = settings.get('MYSQL_HOST', '127.0.0.1')
        self.mysql_port = settings.get('MYSQL_PORT', 3306)
        self.mysql_user = settings.get('MYSQL_USER', 'username')
        self.mysql_passwd = settings.get('MYSQL_PASSWD', 'password')
        self.mysql_reconnect_wait = settings.get('MYSQL_RECONNECT_WAIT', 60)
        self.mysql_db = settings.get('MYSQL_DB')
        self.mysql_charset = settings.get('MYSQL_CHARSET', 'utf8')  # utf8mb4
        self.mysql_item_list_limit = settings.get('MYSQL_ITEM_LIST_LIMIT', 30)
        self.item_list = []

    @classmethod
    def from_crawler(cls, crawler):
        return cls(settings=crawler.settings)

    def open_spider(self, spider):
        try:
            self.conn = pymysql.connect(
                host=self.mysql_host,
                port=self.mysql_port,
                user=self.mysql_user,
                passwd=self.mysql_passwd,
                db=self.mysql_db,
                charset=self.mysql_charset,
            )
        except Exception as err:
            spider.logger.warn('MySQL: FAIL to connect {} {}'.format(err.__class__, err))
            time.sleep(self.mysql_reconnect_wait)
            self.open_spider(spider)
        else:
            spider.logger.info('MySQL: connected')
            self.curs = self.conn.cursor(pymysql.cursors.DictCursor)
            spider.curs = self.curs  # expose the cursor to the spider as well

    def close_spider(self, spider):
        self.insert_item_list(spider)
        self.conn.close()
        spider.logger.info('MySQL: closed')

    def process_item(self, item, spider):
        self.item_list.append(item)
        if len(self.item_list) >= self.mysql_item_list_limit:
            self.insert_item_list(spider)
        return item

    def sql(self):
        raise NotImplementedError('Subclass of MyMySQLPipeline must implement the sql() method')

    def insert_item_list(self, spider):
        spider.logger.info('insert_item_list: {}'.format(len(self.item_list)))
        try:
            self.sql()
        except (OperationalError, InterfaceError) as err:
            # <class 'pymysql.err.OperationalError'>
            # (2013, 'Lost connection to MySQL server during query ([Errno 110] Connection timed out)')
            spider.logger.info('MySQL: exception {} err {}'.format(err.__class__, err))
            self.open_spider(spider)
            self.insert_item_list(spider)
        except Exception as err:
            if len(err.args) == 2 and isinstance(err.args[1], str):
                # <class 'pymysql.err.DataError'>
                # (1264, "Out of range value for column 'position_id' at row 2")
                # <class 'pymysql.err.InternalError'>
                # (1292, "Incorrect date value: '1977-06-31' for column 'release_day' at row 26")
                m_row = re.search(r'at\s+row\s+(\d+)$', err.args[1])
                # <class 'pymysql.err.IntegrityError'>
                # (1048, "Column 'name' cannot be null")
                m_column = re.search(r"Column\s'(.+)'", err.args[1])
                if m_row:
                    # Drop the single offending row and resubmit the rest.
                    row = m_row.group(1)
                    item = self.item_list.pop(int(row) - 1)
                    spider.logger.warn('MySQL: {} {} exception from item {}'.format(err.__class__, err, item))
                    self.insert_item_list(spider)
                elif m_column:
                    # Drop every row with NULL in the named column, then resubmit.
                    column = m_column.group(1)
                    bad_items = [item for item in self.item_list if item[column] is None]
                    for item in bad_items:
                        self.item_list.remove(item)
                        spider.logger.warn('MySQL: {} {} exception from item {}'.format(err.__class__, err, item))
                    self.insert_item_list(spider)
                else:
                    spider.logger.error('MySQL: {} {} unhandled exception from item_list: \n{}'.format(
                        err.__class__, err, self.item_list))
            else:
                spider.logger.error('MySQL: {} {} unhandled exception from item_list: \n{}'.format(
                    err.__class__, err, self.item_list))
        finally:
            self.item_list.clear()
```

3. How to use

Scrapy project: project_name

MySQL database: database_name, table: table_name

(1) Add to the project's pipelines.py:
```python
from my_pipelines import MyMySQLPipeline


class MySQLPipeline(MyMySQLPipeline):

    def sql(self):
        self.curs.executemany("""
            INSERT INTO table_name (position_id, crawl_time)
            VALUES (%(position_id)s, %(crawl_time)s)
            ON DUPLICATE KEY UPDATE
                crawl_time=VALUES(crawl_time)
        """, self.item_list)
        self.conn.commit()
```
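pymysql's executemany() accepts one dict per row with %(name)s-style placeholders. The same dict-per-row batching can be sketched against the stdlib sqlite3 module (which uses :name placeholders instead; the table and column names here simply mirror the example above) to see how item dicts map to inserted rows:

```python
import sqlite3

# Stand-in for the MySQL table: same columns as the pipeline example above.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE table_name (position_id INTEGER PRIMARY KEY, crawl_time TEXT)')

item_list = [
    {'position_id': 103, 'crawl_time': '2018-07-18 12:35:52'},
    {'position_id': 104, 'crawl_time': '2018-07-18 12:35:53'},
]
# One dict per row; sqlite3 uses :name where pymysql uses %(name)s.
conn.executemany(
    'INSERT INTO table_name (position_id, crawl_time) VALUES (:position_id, :crawl_time)',
    item_list)
conn.commit()
print(conn.execute('SELECT COUNT(*) FROM table_name').fetchone()[0])  # 2
```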
(2) Add to the project's settings.py:
```python
# Configure item pipelines
# See https://doc.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
    # 'project_name.pipelines.ProxyPipeline': 300,
    'project_name.pipelines.MySQLPipeline': 301,
}

MYSQL_HOST = '127.0.0.1'
MYSQL_PORT = 3306
MYSQL_USER = 'username'
MYSQL_PASSWD = 'password'
MYSQL_RECONNECT_WAIT = 60
MYSQL_DB = 'database_name'
MYSQL_CHARSET = 'utf8'  # utf8mb4
MYSQL_ITEM_LIST_LIMIT = 3  # 100
```
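For reference, a minimal schema for table_name consistent with the INSERT above. This is hypothetical (column types are assumptions, not from the original post); the relevant point is that position_id must be a PRIMARY KEY or UNIQUE key, since that is what makes a duplicate item take the ON DUPLICATE KEY UPDATE branch instead of raising an error:

```sql
-- Hypothetical schema; types are illustrative assumptions.
CREATE TABLE table_name (
    position_id BIGINT NOT NULL,
    crawl_time  DATETIME,
    PRIMARY KEY (position_id)
) DEFAULT CHARSET=utf8;
```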

4. Results

Rows with bad data are removed automatically and item_list is resubmitted:

```
2018-07-18 12:35:52 [scrapy.core.scraper] DEBUG: Scraped from <200 http://httpbin.org/>
{'position_id': 103, 'crawl_time': '2018-07-18 12:35:52'}
2018-07-18 12:35:52 [scrapy.core.scraper] DEBUG: Scraped from <200 http://httpbin.org/>
{'position_id': None, 'crawl_time': '2018-07-18 12:35:52'}
2018-07-18 12:35:52 [scrapy.core.scraper] DEBUG: Scraped from <200 http://httpbin.org/>
{'position_id': 104, 'crawl_time': '2018-02-31 17:51:47'}
2018-07-18 12:35:55 [scrapy.core.engine] DEBUG: Crawled (200) <GET http://httpbin.org/> (referer: http://httpbin.org/)
2018-07-18 12:35:55 [test] INFO: insert_item_list: 3
2018-07-18 12:35:55 [test] WARNING: MySQL: <class 'pymysql.err.IntegrityError'> (1048, "Column 'position_id' cannot be null") exception from item {'position_id': None, 'crawl_time': '2018-07-18 12:35:52'}
2018-07-18 12:35:55 [test] INFO: insert_item_list: 2
2018-07-18 12:35:55 [test] WARNING: MySQL: <class 'pymysql.err.InternalError'> (1292, "Incorrect datetime value: '2018-02-31 17:51:47' for column 'crawl_time' at row 1") exception from item {'position_id': 104, 'crawl_time': '2018-02-31 17:51:47'}
2018-07-18 12:35:55 [test] INFO: insert_item_list: 1
2018-07-18 12:35:55 [scrapy.core.scraper] DEBUG: Scraped from <200 http://httpbin.org/>
```
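The retry cascade in this log (3 items, drop the NULL position_id, 2 items, drop the bad datetime, 1 item committed) can be reproduced in plain Python. Here fake_sql is a hypothetical stand-in for the real cursor call, raising pymysql-style (code, message) argument tuples, and insert_item_list is an iterative rendering of the pipeline's recursive error handling:

```python
import re

# Hypothetical stand-in for self.sql(): raises pymysql-style (code, message)
# errors for rows the real MySQL server would reject.
def fake_sql(rows):
    for i, row in enumerate(rows, start=1):
        if row['position_id'] is None:
            raise Exception(1048, "Column 'position_id' cannot be null")
        if row['crawl_time'].startswith('2018-02-31'):
            raise Exception(1292,
                "Incorrect datetime value: '%s' for column 'crawl_time' at row %d"
                % (row['crawl_time'], i))

def insert_item_list(rows):
    while rows:
        try:
            fake_sql(rows)
            return rows                           # batch accepted
        except Exception as err:
            msg = err.args[1]
            m_row = re.search(r'at\s+row\s+(\d+)$', msg)
            m_column = re.search(r"Column\s'(.+)'", msg)
            if m_row:                             # drop the single offending row
                rows.pop(int(m_row.group(1)) - 1)
            elif m_column:                        # drop rows with NULL in that column
                column = m_column.group(1)
                rows = [r for r in rows if r[column] is not None]
            else:
                raise
    return rows

items = [
    {'position_id': 103,  'crawl_time': '2018-07-18 12:35:52'},
    {'position_id': None, 'crawl_time': '2018-07-18 12:35:52'},
    {'position_id': 104,  'crawl_time': '2018-02-31 17:51:47'},
]
result = insert_item_list(items)
print(result)  # only item 103 survives
```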

Commit result:

After self.item_list.append(item), add spider.logger.info('process_item: {}'.format(len(self.item_list))) to log the number of elements in item_list after each append.

Five items are yielded in a row, and every three accumulated items trigger an insert. In the synchronous version, the insert (red box in the screenshot) blocks the subsequent yields (green box):

5. Asynchronous version

(1) Save as /site-packages/my_pipelines.py:
```python
# -*- coding: utf-8 -*-
from socket import gethostname
import re

# https://twistedmatrix.com/documents/15.3.0/core/howto/rdbms.html
# twisted.enterprise.adbapi: Twisted RDBMS support
from twisted.enterprise import adbapi
import pymysql
from pymysql import OperationalError, InterfaceError, DataError, InternalError, IntegrityError


class MyMySQLPipeline(object):
    hostname = gethostname()

    def __init__(self, spider, settings):
        self.spider = spider
        self.dbpool = adbapi.ConnectionPool('pymysql',
            host = settings.get('MYSQL_HOST', '127.0.0.1'),
            port = settings.get('MYSQL_PORT', 3306),
            user = settings.get('MYSQL_USER', 'username'),
            passwd = settings.get('MYSQL_PASSWD', 'password'),
            db = settings.get('MYSQL_DB', 'test'),
            charset = settings.get('MYSQL_CHARSET', 'utf8'),  # utf8mb4
            cursorclass = pymysql.cursors.DictCursor
        )
        self.mysql_reconnect_wait = settings.get('MYSQL_RECONNECT_WAIT', 60)
        self.mysql_item_list_limit = settings.get('MYSQL_ITEM_LIST_LIMIT', 30)
        self.item_list = []

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            spider = crawler.spider,
            settings = crawler.settings
        )

    def close_spider(self, spider):
        self._sql(list(self.item_list))

    def process_item(self, item, spider):
        self.item_list.append(item)
        if len(self.item_list) >= self.mysql_item_list_limit:
            spider.log('item_list: %s' % len(self.item_list))
            self._sql(list(self.item_list))
            self.item_list.clear()
        return item

    def sql(self, txn, item_list):
        raise NotImplementedError('Subclass of MyMySQLPipeline must implement the sql() method')

    def _sql(self, item_list, retrying=False):
        d = self.dbpool.runInteraction(self.sql, item_list)
        d.addCallback(self.handle_result, item_list)
        d.addErrback(self.handle_error, item_list, retrying)

    def handle_result(self, result, item_list):
        self.spider.logger.info('{} items inserted with retcode {}'.format(len(item_list), result))

    def handle_error(self, failure, item_list, retrying):
        # https://twistedmatrix.com/documents/18.7.0/api/twisted.python.failure.Failure.html
        # r = failure.trap(pymysql.err.InternalError)
        args = failure.value.args
        # <class 'pymysql.err.OperationalError'> (1045, "Access denied for user 'username'@'localhost' (using password: YES)")
        # <class 'pymysql.err.OperationalError'> (2013, 'Lost connection to MySQL server during query ([Errno 110] Connection timed out)')
        # <class 'pymysql.err.OperationalError'> (2003, "Can't connect to MySQL server on '127.0.0.1' ([WinError 10061] No connection could be made because the target machine actively refused it.)")
        # <class 'pymysql.err.InterfaceError'> (0, '')  # after crawl started: sudo service mysqld stop
        if failure.type in [OperationalError, InterfaceError]:
            if not retrying:
                self.spider.logger.info('MySQL: exception {} {} \n{}'.format(failure.type, args, item_list))
                self.spider.logger.debug('MySQL: Trying to recommit in %s sec' % self.mysql_reconnect_wait)
                # https://twistedmatrix.com/documents/12.1.0/core/howto/time.html
                from twisted.internet import task
                from twisted.internet import reactor
                task.deferLater(reactor, self.mysql_reconnect_wait, self._sql, item_list, True)
            else:
                self.spider.logger.warn('MySQL: exception {} {} \n{}'.format(failure.type, args, item_list))
            return
        # <class 'pymysql.err.DataError'> (1264, "Out of range value for column 'position_id' at row 2")
        # <class 'pymysql.err.InternalError'> (1292, "Incorrect date value: '1977-06-31' for column 'release_day' at row 26")
        elif failure.type in [DataError, InternalError]:
            m_row = re.search(r'at\s+row\s+(\d+)$', args[1])
            row = m_row.group(1)
            item = item_list.pop(int(row) - 1)
            self.spider.logger.warn('MySQL: {} {} exception from item {}'.format(failure.type, args, item))
            self._sql(item_list)
            return
        # <class 'pymysql.err.IntegrityError'> (1048, "Column 'name' cannot be null")
        elif failure.type in [IntegrityError]:
            m_column = re.search(r"Column\s'(.+)'", args[1])
            column = m_column.group(1)
            some_items = [item for item in item_list if item[column] is None]
            self.spider.logger.warn('MySQL: {} {} exception from some items: \n{}'.format(failure.type, args, some_items))
            self._sql([item for item in item_list if item[column] is not None])
            return
        else:
            self.spider.logger.error('MySQL: {} {} unhandled exception from item_list: \n{}'.format(failure.type, args, item_list))
            return
```
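The deferLater branch above schedules a single delayed retry without blocking the reactor. As a hypothetical analogy using the stdlib (asyncio instead of Twisted; all names here are invented for illustration), the same retry-once-after-a-wait shape looks like:

```python
import asyncio

# Hypothetical asyncio analogy of the OperationalError branch: the first
# commit fails, a retry is scheduled after a wait, and the retry is final.
async def commit(rows, attempts):
    attempts.append(len(rows))
    if len(attempts) == 1:                  # simulate one lost connection
        raise ConnectionError('Lost connection to MySQL server')
    return 'ok'

async def commit_with_retry(rows, wait, attempts):
    try:
        return await commit(rows, attempts)
    except ConnectionError:
        # analogous to task.deferLater(reactor, wait, self._sql, item_list, True)
        await asyncio.sleep(wait)
        return await commit(rows, attempts)

attempts = []
result = asyncio.run(commit_with_retry([1, 2, 3], 0.01, attempts))
print(result, attempts)  # ok [3, 3]
```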
(2) Add to the project's pipelines.py. Note that dbpool.runInteraction runs the interaction in a transaction that is committed automatically, so there is no explicit commit() here:
```python
from my_pipelines import MyMySQLPipeline


class MySQLPipeline(MyMySQLPipeline):

    def sql(self, txn, item_list):
        return txn.executemany("""
            INSERT INTO table_name (position_id, crawl_time)
            VALUES (%(position_id)s, %(crawl_time)s)
            ON DUPLICATE KEY UPDATE
                crawl_time=VALUES(crawl_time)
        """, item_list)
```
(3) Project settings.py

See section 3 (2) of the synchronous version above.

(4) Results

After self.item_list.append(item), add spider.logger.info('process_item: {}'.format(len(self.item_list))) to log the number of elements in item_list after each append.

Five items are yielded in a row, and every three accumulated items trigger an insert. In the asynchronous version, the insert (red box in the screenshot) does not block the subsequent yields (green box):

You can also see that the connection pool is in use.
