日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python 多线程 全站小说_多线程下载小说

發布時間:2025/3/21 python 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python 多线程 全站小说_多线程下载小说 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

引言

不知道說啥子, 直接進入正題,哦,本人實測,一本一萬多章的小說大概四分鐘能爬完,在此想說(小說)網站牛逼,不像學校的教務系統,說多了都是淚

多線程

爬取屬于io密集型的,所以引入多線程

redis

用redis分發任務以及數據暫存,所以想用的得去下個redis,嘻嘻嘻,其實之前還用了mongodb,為了減少麻煩,就縮減為redis了

redis windows安裝 需要開著redis-server,到了啟動redis-server那步即可停止

代理

質量不錯就用,免費的去一邊(自己試了些免費,不咋行)

代碼

'''

安裝redis 開啟server

采用redis 數據太多好像一次取不出來 試了一個一萬多章的,一次只取出了9000多章, 現在分批次取

所以大概需要的值有兩個

1. 所需要爬取小說的目錄頁面的鏈接

2. 第一章的開頭位置, 因為有些可能前六章是最新章節 默認:0

3.有問題再聯系,小說網站總體差不多(網頁排版)應該能適用幾個網站吧吧吧吧

'''

import random

import redis

# import pymongo

import requests

import json

import logging

import threading

from bs4 import BeautifulSoup

from concurrent.futures import ThreadPoolExecutor

# from Proxyer import Proxyer

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")

class BookDownload:

def __init__(self, chapter_link, start_place=0, abbr='http://', proxy=None):

self.chapter_link = chapter_link # 目錄頁面的鏈接

self.start_place = start_place # 第一章在目錄中的位置

self.abbr_link = chapter_link # 前綴 鏈接的前一部分 做了處理 不需要再傳了

self.redis_client = redis.Redis()

self.event = threading.Event()

self.redis_list = 'url_info'

self.redis_failed_list = 'failed_url'

self.redis_cache = 'download'

self._proxy = proxy

# self.mongo_collect = pymongo.MongoClient().chapter_3.test3 # mongodb,自己設置

self.all_chapter = 0

self.successed_download = 0

self.session = requests.session()

self.header = {

'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36'}

@property

def proxy(self):

# 代理質量好就用代理 免費代理不推薦(本機就行) 測試的這個網站不咋封ip

# 要求傳遞list or None

if self._proxy:

choosed_proxy = random.choice(self._proxy)

return {'http': 'http:' + choosed_proxy}

return None

@proxy.setter

def proxy(self, value):

if isinstance(value, list) or value is None:

self._proxy = value

else:

raise ValueError('must be list type or None')

def get_all_chapter(self):

res = self.session.get(self.chapter_link, headers=self.header, timeout=5)

if res.status_code != 200:

raise Exception("can't access the website")

soup = BeautifulSoup(res.content.decode(), 'lxml')

name_list = soup.find('div', attrs={'id': 'list'})

dl_list = name_list.find('dl')

wanted_download = dl_list.find_all('dd')[self.start_place:]

self.all_chapter = len(wanted_download)

for order, value in enumerate(wanted_download):

yield order, value.a.get('href').rsplit('/')[-1], value.a.text

def store_name_in_redis(self):

"""

直接調用get_all_chapter

:return:

"""

for info in self.get_all_chapter():

try:

self.redis_client.rpush(self.redis_list, json.dumps(info))

except Exception as e:

logging.info(e)

def requests_one_link(self, detail_link, timeout):

"""

直接解析了

:param detail_link:

:return: 正文內容

"""

try:

res = self.session.get(detail_link, proxies=self.proxy, headers=self.header, timeout=timeout)

text = res.content.decode()

soup = BeautifulSoup(text, 'lxml')

zhengwen = soup.find('div', attrs={'id': 'content'}).text.replace(r'
', '\n')

return zhengwen

except Exception as e:

# raise e

return None

def _clear_redis(self):

"""

清除redis

:return:

"""

try:

self.redis_client.delete(self.redis_list)

self.redis_client.delete(self.redis_failed_list)

self.redis_client.delete(self.redis_cache)

# self.redis_client.lpop(self.redis_cache)

except Exception as e:

pass

def init_work(self):

self._clear_redis()

def get_url(self, name):

"""

從redis中獲取url信息

:return:

"""

burl_info = self.redis_client.lpop(name)

if burl_info:

url_info = json.loads(burl_info)

order, after_link, name = url_info

return order, after_link, name

return None

def handle(self, order, after_link, name, timeout=2):

"""

成功不管,失敗返回信息扔進failed隊列

:param order:

:param after_link:

:param name:

:return:

"""

content = self.requests_one_link(self.abbr_link + after_link, timeout)

if content:

keys = name + '\n' + content

self.redis_client.zadd(self.redis_cache, {keys: order})

logging.info('sucess download {}'.format(name))

self.successed_download += 1

# self.mongo_collect.insert_one({'order': order, 'name': name, 'content': content})

# logging.info('sucess download {}'.format(name))

# self.successed_download += 1

return None

else:

logging.info('failed to download {}'.format(name))

return order, after_link, name

def _callback(self, futures):

"""

回調函數

:param futures:

:return:

"""

res = futures.result()

if res:

try:

self.redis_client.rpush(self.redis_failed_list, json.dumps(res))

except Exception as e:

logging.info(e)

def start_download(self, Pool: ThreadPoolExecutor):

while True:

info = self.get_url(self.redis_list)

if info:

futures = Pool.submit(self.handle, *info)

futures.add_done_callback(self._callback)

else:

break

self.event.set()

Pool.shutdown()

def failed_download(self):

"""

對第一次失敗的進行下載

最多嘗試三次

:return:

"""

if self.event.wait():

while True:

info = self.get_url(self.redis_failed_list)

if info:

try_times = 3

while try_times:

if not self.handle(*info, timeout=3):

break

try_times -= 1

else:

break

logging.info("=============end download==============")

logging.info("===all chapter {}=== success download {}=====".format(self.all_chapter, self.successed_download))

def start_failed_download(self):

thread = threading.Thread(target=self.failed_download)

thread.start()

thread.join()

def store_txt(self):

txt = 'download.txt'

# count = self.redis_client.zcard(self.redis_cache)

while self.redis_client.zcard(self.redis_cache):

content = ''

for x in self.redis_client.zrange(self.redis_cache, 0, 1000):

content += x.decode() + '\n'

with open(txt, 'a+',encoding='utf8') as f:

f.write(content)

self.redis_client.zremrangebyrank(self.redis_cache, 0, 1000)

if __name__ == '__main__':

Pool = ThreadPoolExecutor(15)

bookdownload = BookDownload('http://www.xbiquge.la/54/54101/', 0)

bookdownload.init_work()

bookdownload.store_name_in_redis()

logging.info('=======================start================================')

bookdownload.start_download(Pool)

bookdownload.start_failed_download()

bookdownload.store_txt()

###測試結果

原文鏈接:https://blog.csdn.net/qq_45667109/article/details/106041255

總結

以上是生活随笔為你收集整理的python 多线程 全站小说_多线程下载小说的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。