日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

脚本检测CDN节点资源是否与源站资源一致

發布時間:2024/9/5 编程问答 47 豆豆
生活随笔 收集整理的這篇文章主要介紹了 脚本检测CDN节点资源是否与源站资源一致 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

需求:

  1、所有要檢測的資源url放到一個單獨文件中

  2、檢測cdn節點資源大小與源站文件大小是否一致

  3、隨機抽查幾個資源,檢查md5sum是否一致

  4、使用多線程,可配置線程數

?

代碼目錄:

hexm:Hexm hexm$ tree ./checkcdn ./checkcdn ├── README.TXT ├── check.py # 主程序 ├── conf │ └── url.txt # 配置文件 ├── lib │ ├── __init__.py │ ├── common.py │ └── threadpool.py # 線程池 └── tmp├── cdn # 存放從CDN節點系在的資源└── origin # 存放從源站下載的資源

?

README.TXT

依賴:requests 兼容性:兼容Python3以及Python2.7使用方法:usage: check.py [-h] [-t THREADS] [-c COUNTS]optional arguments:-h, --help show this help message and exit-t THREADS, --threads THREADS開啟多少線程,默認5個-c COUNTS, --counts COUNTS檢測多少個包的md5值,默認3個

conf/url.txt

http://xxx_1020101.apk
http://xxx_1020102.apk
http://xxx_1020103.apk
http://xxx_1020104.apk

check.py

#!/usr/bin/env python # -*- coding:utf-8 -*- # File Name : check.py # Author : hexm # Mail : xiaoming.unix@gmail.com # Created Time : 2017-03-24 10:03import os import sys import random import argparse import requestsBASE_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(BASE_DIR)# 代理IP PROXIES = {"http": "http://183.136.135.191:80", } # 配置文件 CONFIG = BASE_DIR + '/conf/url.txt' # 保存CDN節點文件臨時目錄 CDNTEMPDIR = BASE_DIR + '/tmp/cdn/' # 保存源站文件臨時目錄 ORIGINTEMPDIR = BASE_DIR + '/tmp/origin/'from lib.threadpool import ThreadPool from lib.common import isdir, download, getfilemd5def callback(status, result):"""回調函數,如果函數有返回值得話用得到:param status: 狀態 True or None:param result: 函數返回值"""passdef checkstatus(url):"""通過head方法查看源站與當前CDN節點資源大小是否一致:param url: url:return: None"""r1 = requests.head(url, proxies=PROXIES)r2 = requests.head(url)if r1.status_code == 200 and r2.status_code == 200:if r1.headers['Content-Length'] == r2.headers['Content-Length']:print("%s 源站和CDN節點資源\033[0;32m一致\033[0m, 源站文件大小為%s,CDN節點文件大小為%s"% (url,r1.headers['Content-Length'],r2.headers['Content-Length']))else:print("%s 源站和CDN節點資源\033[0;31m不一致\033[0m, 源站文件大小為%s,CDN節點文件大小為%s"% (url,r1.headers['Content-Length'],r2.headers['Content-Length']))else:print("%s 源站和CDN節點狀態碼\033[0;31m異常\033[0m,源站狀態碼為%s,CDN節點狀態碼為%s"% (url,r1.status_code,r2.status_code))def checkmd5(url, cdnTempDir, originTempDir):"""檢查源站與當前cdn節點資源是否一致,下載超時300s:param url: url:param cdnTempDir: 保存從cdn節點下載的臨時文件目錄:param originTempDir: 保存從源站下載的臨時文件目錄:return: None"""filename = url.split('/')[-1]tempCdnFile = cdnTempDir + filenametempOriginFile = originTempDir + filenamestatus1 = download(url, tempOriginFile, proxies=PROXIES)if status1 is not None:if status1 == 200:status2 = download(url, tempCdnFile)else:print("%s \033[0;31m狀態碼異常\033[0m校驗失敗" % url)if status1 == 200 and status2 == 200:if getfilemd5(tempCdnFile) == getfilemd5(tempOriginFile):print("%s 源站和cdn節點資源md5值\033[0;32m一致\033[0m," % url)else:print("%s 源站和cdn節點資源md5值\033[0;31m不一致\033[0m" % url)elif status1 is None or status2 is None:print("%s \033[0;31m下載失敗\033[0m" % url)# 檢查后刪除下載的文件try:os.remove(tempOriginFile)os.remove(tempCdnFile)except Exception as e:passdef parse_args():"""解析命令行參數:return: args"""parser = argparse.ArgumentParser()help = '開啟多少線程,默認5個'parser.add_argument('-t', '--threads', type=int, help=help, default='5')help = '檢測多少個包的md5值,默認3個'parser.add_argument('-c', '--counts', type=int, help=help, default=3)args = parser.parse_args()return argsif __name__ == "__main__":if not isdir(CDNTEMPDIR): os.makedirs(CDNTEMPDIR)if not isdir(ORIGINTEMPDIR): os.makedirs(ORIGINTEMPDIR)# 從文件中獲取所有urlurls = [line.strip() for line in open(CONFIG, mode='r').readlines()]args = parse_args()# 檢查包大小pool = ThreadPool(args.threads) # 最多創建5個線程for url in urls:pool.run(checkstatus, (url,), callback=None)# 隨機抽查3個,檢查md5for randurl in random.sample(urls, args.counts):pool.run(checkmd5, (randurl, CDNTEMPDIR, ORIGINTEMPDIR,), callback=None)pool.close() check.py

lib/common.py

#!/usr/bin/env python # -*- coding:utf-8 -*- # File Name : common.py # Author : hexm # Mail : xiaoming.unix@gmail.com # Created Time : 2017-03-24 10:03import os import hashlib import requestsdef getfilesize(path):"""獲取文件大小:param path: 文件路徑:return: 返回文件大小"""return os.path.getsize(path)def isfile(path):"""判斷是否是文件:param path: 文件路徑:return: 如果是返回True,否則返回None"""if os.path.isfile(path): return Truedef isdir(path):"""判斷是否是目錄:param path: 路徑:return: True or None"""if os.path.isdir(path): return Truedef getstatus(url, proxies=None):"""返回狀態碼:param url: url:return: 狀態碼"""return requests.head(url, proxies).status_codedef download(url, path, proxies=None):"""下載文件,并返回狀態碼:param url: 下載的url:param path: 保存文件的路徑:param proxies: 使用代理的地址:return: 返回狀態碼"""try:response = requests.get(url, proxies=proxies, stream=True, timeout=60)status = response.status_codetotal_size = int(response.headers['Content-Length'])# print(response.headers)if status == 200:with open(path, 'wb') as f:for chunk in response.iter_content(chunk_size=8192):if chunk: f.write(chunk)if total_size == getfilesize(path): # 下載文件大小與頭部Content-Length大小一致,則下載成功return status# 狀態碼非200,返回狀態碼else: return statusexcept Exception as e:return Nonedef getfilemd5(path):"""返回文件的md5sum:param path: 文件路徑:return: 返回校驗和,否則返回None"""if isfile(path):md5obj = hashlib.md5()maxbuf = 8192f = open(path, 'rb')while True:buf = f.read(maxbuf)if not buf:breakmd5obj.update(buf)f.close()hash = md5obj.hexdigest()return hashreturn Noneif __name__ == "__main__":pass View Code

lib/threadpool.py

#!/usr/bin/env python # -*- coding:utf-8 -*- # File Name : threadpool.py # Author : hexm # Mail : xiaoming.unix@gmail.com # Created Time : 2017-03-23 20:03import sys if sys.version > '3':import queue else:import Queue as queue import threading import contextlib import timeStopEvent = object() # 終止線程信號class ThreadPool(object):"""1、解決線程重用問題,當前線程執行完任務后,不殺掉,放到空閑線程列表,繼續執行下個任務2、根據任務量開啟線程,如果設置10個線程,只有2個任務,最多只會開啟兩個線程3、如果有500個任務,任務執行非常快,2個線程就能完成,如果設置開啟10個線程,只會開啟兩個線程"""def __init__(self, max_num, max_task_num = None):if max_task_num:self.q = queue.Queue(max_task_num) # 指定任務最大數,默認為None,不限定else:self.q = queue.Queue()self.max_num = max_num # 最多多少線程self.cancel = False # 執行完所有任務,終止線程信號self.terminal = False # 無論執行完畢與否,都終止所有線程self.generate_list = [] # 已創建多少線程self.free_list = [] # 空閑多少線程def run(self, func, args, callback=None):"""線程池執行一個任務:param func: 任務函數:param args: 任務函數所需參數:param callback: 任務執行失敗或成功后執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;2、任務函數返回值:return: 如果線程池已經終止,則返回True否則None"""if self.cancel:return# 沒有空閑線程 并且已創建線程小于最大線程數才創建線程,if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:self.generate_thread() # 滿足則創建線程,并將任務放進隊列w = (func, args, callback,)# 函數,元組,函數 ,將這三個參數放在元組里面,當成一個整體放到隊列里面self.q.put(w) # 滿足條件則創建線程,并把任務放隊列里面def generate_thread(self):"""創建一個線程"""t = threading.Thread(target=self.call) # 每一個線程被創建,執行call方法 t.start()def call(self):"""循環去獲取任務函數并執行任務函數"""current_thread = threading.currentThread()self.generate_list.append(current_thread) # 每創建一個線程,將當前線程名加進已創建的線程列表 event = self.q.get() # 在隊列中取任務, 沒任務線程就阻塞,等待取到任務,線程繼續向下執行while event != StopEvent: # 是否滿足終止線程 func, arguments, callback = event # 取出隊列中一個任務try:result = func(*arguments) # 執行函數,并將參數傳進去success = Trueexcept Exception as e:success = Falseresult = Noneif callback is not None:try:callback(success, result)except Exception as e:passwith self.worker_state(self.free_list, current_thread): # 當前線程執行完任務,將當前線程置于空閑狀態,#這個線程等待隊列中下一個任務到來,如果沒來,一直處于空閑, 如果到來,去任務if self.terminal:event = StopEventelse:event = self.q.get() # 將當前任務加入到空閑列表后,如果有任務,取到,沒有阻塞 取到后,移除當前線程else: # 滿足終止線程,在創建的線程列表中移除當前線程 self.generate_list.remove(current_thread)def close(self):"""執行完所有的任務后,殺掉所有線程"""self.cancel = True # 標志設置為Truefull_size = len(self.generate_list) + 1 # 已生成線程個數, +1 針對python2.7while full_size:self.q.put(StopEvent) # full_size -= 1def terminate(self):"""無論是否還有任務,終止線程"""self.terminal = Truewhile self.generate_list:self.q.put(StopEvent)self.q.queue.clear()@contextlib.contextmanagerdef worker_state(self, state_list, worker_thread):"""用于記錄線程中正在等待的線程數"""state_list.append(worker_thread) # 將當前空閑線程加入空閑列表try:yieldfinally:state_list.remove(worker_thread) # 取到任務后,將當前空閑線程從空閑線程里移除,# 使用例子 if __name__ == "__main__":pool = ThreadPool(5) # 創建pool對象,最多創建5個線程def callback(status, result):passdef action(i):time.sleep(1)print(i)for i in range(30): # 共30個任務ret = pool.run(action, (i,), callback=None) # 將action函數,及action的參數,callback函數傳給run()方法pool.close() View Code

?

例子:

?

轉載于:https://www.cnblogs.com/xiaoming279/p/6626768.html

總結

以上是生活随笔為你收集整理的脚本检测CDN节点资源是否与源站资源一致的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。