日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

如何构建一个自己的代理ip池

發布時間:2023/12/10 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 如何构建一个自己的代理ip池 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、默認自動切換IP

登錄線程IP池客戶端時,默認情況下會自動切換IP。 如果不想自動切換IP,或者還沒有準備開始使用,請在客戶端右側將“在IP過期前幾秒自動申請切換”設置為“0”。 0無效。

二.默認情況下不需要授權

默認情況下,線程IP池可以使用代理IP,而無需驗證。 如果需要在特定情況下允許使用,請選中“訪問代理需要驗證”。 用戶名和密碼是用于登錄線程IP池客戶端的“TID”和“密碼”,帳戶驗證允許使用。

三.退出軟件前取消代理

許多用戶退出線程IP池客戶端后,發現瀏覽器無法訪問網站。 這是因為,以前在軟件中一鍵設置瀏覽器代理IP,軟件結束后,無法繼續傳輸,因此無法訪問網站。

在退出線程IP池軟件之前,最好取消代理。 如果忘記,請選擇瀏覽器右上角的“工具”——“互聯網選項”——“連接”——“局域網設置”(雖然因瀏覽器而異,但方法大致相同)、“為局域網使用代理服務器”

1、爬取免費代理IP,搭建動態IP池

市面上有不少免費的代理IP服務,使用免費代理搭建動態IP池的方法非常常見,也是比較多人使用的一種。因為它是免費的,也就意味著無需成本,所以大多數人都因為“免費”二字而趨之若鶩。但是這種方法操作相對復雜一些,網上也有不少的教程分享,如果您感興趣的話可以上網查找相關的項目,這邊就不詳細敘述如何爬取搭建了。

2、購買撥號服務器,搭建動態IP池

購買撥號服務器來搭建動態IP池也是很多朋友的選擇。購買一定數量的服務器,然后花費一些時間來編寫代碼,或者結合網上的現成的軟件,就可以將代理動態IP的池搭建起來了。這種方法搭建出來的IP池資源都是一個人獨享的,工作起來效果還是不錯的。只不過長期下來的話,服務器的維護成本較高,并且需要定時的維護,消耗大量的時間,如果是個人的話,搭建起來后期維護的成本太高了,如果您不是高端玩家的話,不建議使用這種方式搭建IP池。

3、購買代理IP,搭建動態IP池

選擇購買代理IP服務的朋友很多,因為代理IP服務省時省力并且效果比較好。相對于免費的代理IP來說,收費代理IP雖然需要付出一定的成本,但是IP資源都是真實IP,并且高匿性,穩定性也好。

相比前兩種搭建IP池的方法來說,付費代理IP更能滿足用戶的需求,但對于有些特殊要求的朋友來說,他們想一次提取很多個或者多次提取很多個,存放在本地建立的IP池里,這種方法在一定的程度上優化了方案。其中,像Lum Proxy這樣的動態IP代理服務就深受大家的歡迎。Lum Proxy是國內領先的動態IP服務商,有著全球每一個國家和城市的9000多萬IP。在每個國家每個大城市基本都涵蓋,而且會不定時更換一些IP段。100%匿名高速代理,能夠有效地幫助大家更好地工作。

代理IP池管理代理的集合及其相關IP地址。IP池中的代理類型決定了代理IP池的類型。一般情況下,根據代理ip池包含的代理類型,代理ip池有5種類型。 1、免費代理IP池。該免費代理IP池也稱為公共IP池,它是管理免費代理的列表。IP一般是數據中心IP,但是在一些公共IP池中,你可以使用混合的住宅IP。可靠度,公共IP是最不可靠的代理池。 由于很容易檢測到公共代理服務器(也稱為免費代理服務器),一些網站會默認阻止它,并在你不知情的情況下泄露你的真實IP地址。它們通常不需要任何形式的認證,直到被使用。 在Internet上,如果使用免費代理列表站點,則很容易建立公共IP池。為了創建一個公共IP池,您需要在Internet上使用一個刮板為您獲取免費代理,一個代理檢查器來確保該池只包含有效的代理,以及IP循環系統等等。然而,你需要知道,使用公共IP池來進行任何合理的在線活動都不是個好主意——你會受到壞鄰居、垃圾郵件和它們不可靠的特性的影響。2、數據中心代理IP池。數據中心代理IP池是只包含數據中心IP的代理池。DataCenterIP是數據中心擁有和管理的IP。DataCenterBroker池具有與數據中心代理相關的所有優點,但也具有與數據中心代理相關的所有缺點。在數據中心代理池中的代理存在許多障礙,因為它們易于檢測。

對于爬蟲來說,當你的訪問頻率達到了目標網站的預警值時,就可能觸發目標網站的反爬機制。而封禁訪問者ip就是很常見的一個反爬機制。

當ip被封禁后,從此ip發出的請求將不能得到正確的響應。這種時候,我們就需要一個代理ip池。

什么是代理ip池?

通俗地比喻一下,它就是一個池子,里面裝了很多代理ip。它有如下的行為特征:

1.池子里的ip是有生命周期的,它們將被定期驗證,其中失效的將被從池子里面剔除。
2.池子里的ip是有補充渠道的,會有新的代理ip不斷被加入池子中。
3.池子中的代理ip是可以被隨機取出的。
這樣,代理池中始終有多個不斷更換的、有效的代理ip,且我們可以隨機從池子中取出代理ip,然后讓爬蟲程序使用代理ip訪問目標網站,就可以避免爬蟲被ban的情況。

今天,我們就來說一下如何構建自己的代理ip池。而且,我們要做一個比較靈活的代理池,它提供兩種代理方式:

1.每次都通過http接口提取一個隨機代理ip,然后在爬蟲中使用此代理ip(大部分代理ip池服務都是這種形式)
2.使用squid3代理做請求轉發,爬蟲設置好squid3代理的地址,每次請求將由squid3自動轉發給代理池中的代理
項目已經放到了github上,不想看原理、只想應用的可以直接移步github:open_proxy_pool

地址:https://github.com/AaronJny/open_proxy_pool

原理請往下看。

轉載請注明出處:https://blog.csdn.net/aaronjny/article/details/87865942

代理池結構
代理池的組件可以大致描述如下:

1.代理IP的獲取/補充渠道,定期把獲取到的代理ip加入到代理池中
2.代理ip的驗證機制,定期驗證代理池中ip的有效性,并刪除掉所有失效的ip
3.一個web服務,用以提供獲取一個隨機代理的api
4.squid3的維持腳本,它定期獲取代理池中的可用ip,更新squid中的可轉發代理列表
5.一個調度器,程序的入口,用來協調各組件的運行
如果不是很理解,沒關系,請往下看,我會細說。

環境說明
為了實現代理IP池,我們如下的軟件環境(列舉主要部分):

1.redis服務器,用以存放代理池相關數據
2.flask,用以實現提取單個隨機代理的api
3.squid3,用以實現代理轉發
組件1-獲取代理ip的渠道
我們有很多種渠道獲取代理ip。籠統一點來說,可以分為兩類,免費代理和收費代理。

免費代理,顧名思義嘛,最大的優點就是免費,不需要什么成本,網上搜一下就能找到。缺點也很明顯,免費代理畢竟是免費的,所以質量根本不能保證,大部分無法使用,能用的多數也速度奇慢。

收費代理的質量相對來說就好多了,不同平臺的代理質量和價格上都有些出入,可以自行比較。

個人學習的話,如果真的資金優先,可以考慮采集免費代理;如果資金相對充裕,可以花錢買一天或一周的代理使用,價格也不貴。我是比較推薦收費代理的,因為免費代理的質量真的不敢恭維。

企業商用的話,優先考慮收費代理吧,會穩定很多。

我選擇的代理服務商是站大爺(http://ip.zdaye.com/),聲明一下,我真的沒收廣告費啊= =。坦言說,站大爺的代理質量只能算一般,不過也夠用。有幾家的質量比它要好一些,不過好的有限。讓我選擇站大爺的最大原因是,它支持賬號密碼訪問的模式。

沒用過收費代理的朋友可能不清楚,使用收費代理平臺的接口,從平臺上批量提取代理ip或使用代理時,一般都是要綁定你的機器ip的。比如,你的機器ip是123.123.123.123,你就需要事先在平臺上把ip綁定為123.123.123.123,這樣,你只能通過IP為123.123.123.123的機器從平臺提取ip,提取出的ip也只能由ip為123.123.123.123的機器使用,其他ip的機器都不行。當我們有多臺機器的時候,就會非常尷尬了,畢竟不能給每臺機器都買一次代理吧,很不劃算。

在站大爺上面,除了綁定ip這個方法外,還可以選擇使用賬號+密碼提取/使用代理,選擇這個方法的話就不再收到IP地址的限制。講道理,有點舒服啊= =

我先面的編碼以站大爺為例,使用其他代理服務的可自行編寫相關腳本,原理和邏輯都是相通的,部分細節上針對處理即可。

購買的細節我也不說了,如果需要購買的話,直接去官網購入短效優質代理即可。

先放出這部分的完整代碼,附有注釋。

-- coding: utf-8 --

@File : get_ip.py

@Author: AaronJny

@Date : 18-12-14 上午10:44

@Desc : 從指定網站上獲取代理ip,

我目前在使用站大爺,就以站大爺為例

import requests
import time
import utils
import settings
from gevent.pool import Pool
from gevent import monkey
monkey.patch_all()
class ZdyIpGetter:
“”" 從站大爺上提取代理ip的腳本,使用其他代理服務的可自行編寫相關腳本, 原理和邏輯都是相通的,部分細節上需要針對處理 “”"
def init(self):
# 購買服務時,網站給出的提取ip的api,替換成自己的
self.api_url = ‘http://xxxxxxxxxxxxxxxxxxxxxxxxxx’
self.logger = utils.get_logger(getattr(self.class, ‘name’))
self.proxy_list = []
self.good_proxy_list = []
self.pool = Pool(5)
self.server = utils.get_redis_client()
def check_proxy(self, proxy):
“”" 檢查代理是否可用, 并將可用代理加入到指定列表中 :param proxy: :return: “”"
if settings.USE_PASSWORD:
tmp_proxy = ‘{}:{}@{}’.format(settings.USERNAME, settings.PASSWORD, proxy)
else:
tmp_proxy = ‘{}’.format(proxy)
proxies = {
‘http’: ‘http://’ + tmp_proxy,
‘https’: ‘https://’ + tmp_proxy,
}
try:
# 驗證代理是否可用時,訪問的是ip138的服務
resp = requests.get(‘http://2019.ip138.com/ic.asp’, proxies=proxies, timeout=10)
# self.logger.info(resp.content.decode(‘gb2312’))
# 判斷是否成功使用代理ip進行訪問
assert proxy.split(‘:’)[0] in resp.content.decode(‘gb2312’)
self.logger.info(‘[GOOD] - {}’.format(proxy))
self.good_proxy_list.append(proxy)
except Exception as e:
self.logger.info(‘[BAD] - {} , {}’.format(proxy, e.args))
def get_proxy_list(self):
“”" 提取一批ip,篩選出可用的部分 注:當可用ip小于兩個時,則保留全部ip(不論測試成功與否) :return: “”"
while True:
try:
res = requests.get(self.api_url, timeout=10).content.decode(‘utf8’)
break
except Exception as e:
self.logger.error(‘獲取代理列表失敗!重試!{}’.format(e))
time.sleep(1)
if len(res) == 0:
self.logger.error(‘未獲取到數據!’)
elif ‘bad’ in res:
self.logger.error(‘請求失敗!’)
# 檢測未考慮到的異常情況
elif res.count(‘.’) != 15:
self.logger.error(res)
else:
self.logger.info(‘開始讀取代理列表!’)
for line in res.split():
if ‘:’ in line:
self.proxy_list.append(line.strip())
self.pool.map(self.check_proxy, self.proxy_list)
self.pool.join()
# 當本次檢測可用代理數量小于2個時,則認為檢測失敗,代理全部可用
if len(self.good_proxy_list) < 2:
self.good_proxy_list = self.proxy_list.copy()
self.logger.info(‘>>>> 完成! <<<<’)
def save_to_redis(self):
“”" 將提取到的有效ip保存到redis中, 供其他組件訪問 :return: “”"
for proxy in self.good_proxy_list:
self.server.zadd(settings.IP_POOL_KEY, int(time.time()) + settings.PROXY_IP_TTL, proxy)
def fetch_new_ip(self):
“”" 獲取一次新ip的整體流程控制 :return: “”"
self.proxy_list.clear()
self.good_proxy_list.clear()
self.get_proxy_list()
self.save_to_redis()
def main(self):
“”" 周期獲取新ip :return: “”"
start = time.time()
while True:
# 每 settings.FETCH_INTERVAL 秒獲取一批新IP
if time.time() - start >= settings.FETCH_INTERVAL:
self.fetch_new_ip()
start = time.time()
time.sleep(2)
if name == ‘main’:
ZdyIpGetter().main()
說一下這里面的關鍵部分:

1.如何保存代理池相關數據?

從平臺上獲取的ip是有生命周期的,一般幾分鐘后就會失效,所以我們需要用類似于字典的形式保存代理IP和它的過期時間
為了更好地容錯,我們將從平臺上提取到的ip的生命周期統一設置為settings.PROXY_IP_TTL,而代理的可用時間一般是大于settings.PROXY_IP_TTL(我默認設置的是60s)。
為了保證處理效率,實際使用的redis數據結構并非散列表(類似于python中的字典),而是zset(有序集合,可以為集合里面的每個元素設置一個分數,并能夠分數來篩選區間內的元素)。這里,代理ip是zset中的元素,過期時間是元素的分數,參考上面的save_to_redis(self)代碼。
2.如何驗證提取到的ip是否可用?

提取到的ip有些可能是不能用的,所以我先進行了驗證,再將有效的加入到代理池中
使用ip138的接口驗證代理是否生效
校驗之后,如果可用的ip非常少或全部失敗,我傾向于認為是檢驗手段出了問題,并認為此批ip均為正常的,加入到代理池中
組件2-檢驗并清理過期ip
因為我給每個加入代理池的ip都設置了過期時間,所以檢查代理ip是否有效這個操作,也并非真的去檢驗ip本身,而是檢查它的過期時間。

我們需要清除掉過期時間<當前時間的ip,而zset可以快速實現此操作。

-- coding: utf-8 --

@File : delele_ip.py

@Author: AaronJny

@Date : 18-12-14 上午11:15

@Desc : 過期ip清理器

import utils
import settings
import time
class ExpireIpCleaner:
def init(self):
self.logger = utils.get_logger(getattr(self.class, ‘name’))
self.server = utils.get_redis_client()
def clean(self):
“”" 清理代理池中的過期ip :return: “”"
self.logger.info(‘開始清理過期ip’)
# 計算清理前代理池的大小
total_before = int(self.server.zcard(settings.IP_POOL_KEY))
# 清理
self.server.zremrangebyscore(settings.IP_POOL_KEY, 0, int(time.time()))
# 計算清理后代理池的大小
total_after = int(self.server.zcard(settings.IP_POOL_KEY))
self.logger.info(‘完畢!清理前可用ip {},清理后可用ip {}’.format(total_before, total_after))
def main(self):
“”" 周期性的清理過期ip :return: “”"
while True:
self.clean()
self.logger.info(‘*’ * 40)
time.sleep(settings.CLEAN_INTERVAL)
if name == ‘main’:
ExpireIpCleaner().main()
定期進行檢測和清理,很簡單,沒有什么需要說的。

組件3-獲取隨機ip的web接口
不得不說,使用flask開發簡單的接口真的是太舒服了,簡潔而快速。這個web服務提供兩個小功能:

1.獲取一個隨機的可用代理ip
2.查看當前代理池中可用的代理ip的數量

– coding: utf-8 –
@File : web_api.py
@Author: AaronJny
@Date : 18-12-14 上午11:22
@Desc : 提供http接口的web程序
import utils
import settings
import flask
import random
import time
redis_client = utils.get_redis_client()
ip_pool_key = settings.IP_POOL_KEY
app = flask.Flask(name)
@app.route(‘/random/’)
def random_ip():
“”" 獲取一個隨機ip :return: “”"
# 獲取redis中仍可用的全部ip
proxy_ips = redis_client.zrangebyscore(ip_pool_key, int(time.time()),
int(time.time()) + settings.PROXY_IP_TTL * 10)
if proxy_ips:
ip = random.choice(proxy_ips)
# 如果ip需要密碼訪問,則添加
if settings.USE_PASSWORD:
ip = ‘{}:{}@{}’.format(settings.USERNAME, settings.PASSWORD, ip.decode(‘utf8’))
return ip
else:
return ‘’
@app.route(‘/total/’)
def total_ip():
“”" 統計池中可用代理的數量 :return: “”"
total = redis_client.zcard(ip_pool_key)
if total:
return str(total)
else:
return ‘0’
def main():
“”" 程序運行入口 :return: “”"
app.run(‘0.0.0.0’, port=settings.API_WEB_PORT)
if name == ‘main’:
app.run(‘0.0.0.0’, port=settings.API_WEB_PORT)
都很簡單,就不細說了。

組件4-squid的維持、更新腳本
處理http的接口外,我們還可以使用squid做代理轉發,這樣,在爬蟲程序中就不需要再頻繁地更換代理IP地址,直接填上squid的地址,它會自動幫你轉發給其他代理ip。

這個腳本提供如下功能:

1.從代理池中讀取所有可用代理ip,作為可轉發的代理列表寫入到squid的配置文件中,并通過命令使squid重新加載配置文件。這樣,squid一直使用最新可用的那些代理ip。
2.當squid服務異常時,通過命令殺死所有squid進程,并重新開啟,保證服務正常運行
下面的代碼中使用了名為squid.conf的文件,此文件在github上,是關于squid的一些配置。如果需要對squid進行深度定制,需要自行修改這個文件。

-- coding: utf-8 --

@File : squid_keeper.py

@Author: AaronJny

@Date : 18-12-14 上午11:27

@Desc : 維持squid3使用可用ip的腳本

import utils
import settings
import time
import os
import subprocess
class SquidKeeper:
def init(self):
self.logger = utils.get_logger(getattr(self.class, ‘name’))
self.server = utils.get_redis_client()
self.ip_pool_key = settings.IP_POOL_KEY
# 區別對待使用密碼和不使用密碼的配置模板
if settings.USE_PASSWORD:
self.peer_conf = “cache_peer %s parent %s 0 no-query proxy-only login={}:{} never_direct allow all round-robin weight=1 connect-fail-limit=2 allow-miss max-conn=5\n”.format(
settings.USERNAME, settings.PASSWORD)
else:
self.peer_conf = “cache_peer %s parent %s 0 no-query proxy-only never_direct allow all round-robin weight=1 connect-fail-limit=2 allow-miss max-conn=5\n”
def read_new_ip(self):
“”" 從redis中讀取全部有效ip :return: “”"
self.logger.info(‘讀取代理池中可用ip’)
proxy_ips = self.server.zrangebyscore(settings.IP_POOL_KEY, int(time.time()),
int(time.time()) + settings.PROXY_IP_TTL * 10)
return proxy_ips
def update_conf(self, proxy_list):
“”" 根據讀取到的代理ip,和現有配置文件模板, 生成新的squid配置文件并重新加載,讓squid使用最新的ip。 :param proxy_list: :return: “”"
self.logger.info(‘準備加載到squid中’)
with open(‘squid.conf’, ‘r’) as f:
squid_conf = f.readlines()
squid_conf.append(‘\n# Cache peer config\n’)
for proxy in proxy_list:
ip, port = proxy.decode(‘utf8’).split(‘:’)
squid_conf.append(self.peer_conf % (ip, port))
with open(‘/etc/squid/squid.conf’, ‘w’) as f:
f.writelines(squid_conf)
failed = os.system(‘squid -k reconfigure’)
# 這是一個容錯措施
# 當重新加載配置文件失敗時,會殺死全部相關進行并重試
if failed:
self.logger.info(‘squid進程出現問題,查找當前啟動的squid相關進程…’)
p = subprocess.Popen(“ps -ef | grep squid | grep -v grep | awk ‘{print $2}’”, shell=True,
stdout=subprocess.PIPE, universal_newlines=True)
p.wait()
result_lines = [int(x.strip()) for x in p.stdout.readlines()]
self.logger.info(‘找到如下進程:{}’.format(result_lines))
if len(result_lines):
for proc_id in result_lines:
self.logger.info(‘開始殺死進程 {}…’.format(proc_id))
os.system(‘kill -s 9 {}’.format(proc_id))
self.logger.info(‘全部squid已被殺死,開啟新squid進程…’)
os.system(‘service squid restart’)
time.sleep(3)
self.logger.info(‘重新加載ip…’)
os.system(‘squid -k reconfigure’)
self.logger.info(‘當前可用IP數量 {}’.format(len(proxy_list)))
def main(self):
“”" 周期性地更新squid的配置文件, 使其使用最新的代理ip :return: “”"
while True:
proxy_list = self.read_new_ip()
self.update_conf(proxy_list)
self.logger.info(‘*’ * 40)
time.sleep(settings.SQUID_KEEPER_INTERVAL)
if name == ‘main’:
SquidKeeper().main()
組件5-調度器
調度器是程序的入口,也是對以上各個組件的控制和整合。

它的主要功能是:

1.使用子進程分別開啟各個組件
2.在某個組件異常退出后,重啟它
3.接收到終止信號時,關閉所有存活的組件進程后再退出

– coding: utf-8 –
@File : scheduler.py
@Author: AaronJny
@Date : 18-12-14 上午11:41
@Desc : 調度中心,所有組件在這里被統一啟動和調度
import utils
import settings
from get_ip import ZdyIpGetter
from delele_ip import ExpireIpCleaner
from web_api import app
from squid_keeper import SquidKeeper
from multiprocessing import Process
import time

class Scheduler:
logger = utils.get_logger(‘Scheduler’)
@staticmethod
def fetch_ip():
“”"
獲取新ip的進程
:return:
“”"
while True:
try:
ZdyIpGetter().main()
except Exception as e:
print(e.args)
@staticmethod
def clean_ip():
“”"
定期清理過期ip的進程
:return:
“”"
while True:
try:
ExpireIpCleaner().main()
except Exception as e:
print(e.args)
@staticmethod
def squid_keep():
“”"
維持squid使用最新ip的進程
:return:
“”"
while True:
try:
SquidKeeper().main()
except Exception as e:
print(e.args)
@staticmethod
def api():
“”"
提供web接口的進程
:return:
“”"
app.run(‘0.0.0.0’, settings.API_WEB_PORT)
def run(self):
process_list = []
try:
# 只啟動打開了開關的組件
if settings.IP_GETTER_OPENED:
# 創建進程對象
fetch_ip_process = Process(target=Scheduler.fetch_ip)
# 并將組件進程加入到列表中,方便在手動退出的時候殺死
process_list.append(fetch_ip_process)
# 開啟進程
fetch_ip_process.start()
if settings.EXPIRE_IP_CLEANER_OPENED:
clean_ip_process = Process(target=Scheduler.clean_ip)
process_list.append(clean_ip_process)
clean_ip_process.start()
if settings.SQUID_KEEPER_OPENED:
squid_keep_process = Process(target=Scheduler.squid_keep)
process_list.append(squid_keep_process)
squid_keep_process.start()
if settings.WEB_API_OPENED:
api_process = Process(target=Scheduler.api)
process_list.append(api_process)
api_process.start()
# 一直執行,直到收到終止信號
while True:
time.sleep(1)
except KeyboardInterrupt:
# 收到終止信號時,關閉所有進程后再退出
self.logger.info(‘收到終止信號,正在關閉所有進程…’)
for process in process_list:
if process.is_alive():
process.terminate()
self.logger.info(‘關閉完成!結束程序!’)
if name == ‘main’:
Scheduler().run()
公用方法和配置
將各組件公用的方法和配置抽取出來,做了集中。

-- coding: utf-8 --

@File : utils.py

@Author: AaronJny

@Date : 18-12-14 上午11:07

@Desc :

from redis import StrictRedis, ConnectionPool
import settings
import logging
def get_redis_client():
“”" 獲取一個redis連接 :return: “”"
server_url = settings.REDIS_SERVER_URL
return StrictRedis(connection_pool=ConnectionPool.from_url(server_url))
def get_logger(name=name):
“”" 獲取一個logger,用以格式化輸出信息 :param name: :return: “”"
logger = logging.getLogger(name)
logger.handlers.clear()
logger.setLevel(logging.INFO)
formatter = logging.Formatter(
‘%(asctime)s - %(name)s - %(levelname)s: - %(message)s’,
datefmt=‘%Y-%m-%d %H:%M:%S’)
# 使用StreamHandler輸出到屏幕
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger
涉及到的所有配置,可以根據情況進行修改:

-- coding: utf-8 --

@File : settings.py

@Author: AaronJny

@Date : 18-12-14 上午11:13

@Desc :

代理池redis鍵名

IP_POOL_KEY = ‘open_proxy_pool’

redis連接,根據實際情況進行配置

REDIS_SERVER_URL = ‘redis://:your_password@your_host:port/db_name’

api對外端口

API_WEB_PORT = 9102

代理是否需要通過密碼訪問,當此項為False時可無視USERNAME和PASSWORD的配置

USE_PASSWORD = True

用戶名

注意:用戶名密碼是指代理服務方提供給你,用以驗證訪問授權的憑證。

無密碼限制時可無視此項,并將USE_PASSWORD改為False

USERNAME = ‘your_username’

密碼

PASSWORD = ‘your_password’

功能組件開關*

打開web api功能,不使用web api的話可以關閉

WEB_API_OPENED = True

打開squid代理轉發服務的維持腳本,不使用squid的話可以關閉

SQUID_KEEPER_OPENED = True

打開清理過期ip的腳本,如果池內的代理ip永遠不會失效的話可以關閉

EXPIRE_IP_CLEANER_OPENED = True

打開定時獲取ip并檢查的腳本,如果不需要獲取新ip的話可以關閉

IP_GETTER_OPENED = True

***********************************

清理代理ip的頻率,如下配置代表每兩次之間間隔6秒

CLEAN_INTERVAL = 6

獲取代理ip的頻率,根據api的請求頻率限制進行設置

比如站大爺的頻率限制是10秒一次,我就設置成了12秒

FETCH_INTERVAL = 12

squid從redis中加載新ip的頻率

SQUID_KEEPER_INTERVAL = 12

代理ip的生命周期,即一個新ip在多久后將被刪除,單位:秒

PROXY_IP_TTL = 60
運行
到這里,編碼就完成了。打開終端,切換到項目根目錄,輸入python3 scheduler.py運行即可。建議使用screen后臺運行。

給出一個運行的截圖(有機器在調用接口,我把ip隱藏了):

總結

以上是生活随笔為你收集整理的如何构建一个自己的代理ip池的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。