python压测接口_python的一个接口压测脚本
1 importrequests2 import queue #Queue模塊中提供了同步的、線程安全的隊(duì)列類,包括
3 #FIFO(先入先出)隊(duì)列Queue,LIFO(后入先出)隊(duì)列
4 #LifoQueue,和優(yōu)先級(jí)隊(duì)列PriorityQueue。這些隊(duì)列都
5 #實(shí)現(xiàn)了鎖原語(yǔ),可在多線程通信中直接使用。
6 importthreading7 importtime8
9 status_code_list =[]10 exec_time =011 classMyThreadPool:12 def __init__(self, maxsize): #定義隊(duì)列時(shí)有一個(gè)默認(rèn)的參數(shù)
13 #maxsize, 如果不指定隊(duì)列的長(zhǎng)度,即manxsize=0,那么隊(duì)列的長(zhǎng)
14 #度為無(wú)限長(zhǎng),如果定義了大于0的值,那么隊(duì)列的長(zhǎng)度就是maxsize。
15 self.maxsize =maxsize16 self._pool =queue.Queue(maxsize)17 #maxsize設(shè)置隊(duì)列的大小為pool的大小
18 for _ in range(maxsize): #為什么用一個(gè)下劃線,因?yàn)閷?shí)際上這
19 #里沒(méi)用到這個(gè)變量,所以用一個(gè)符號(hào)就可以了。
20 self._pool.put(threading.Thread) #往pool里放線程數(shù)
21
22 defget_thread(self):23 returnself._pool.get()24
25 defadd_thread(self):26 self._pool.put(threading.Thread)27
28 defrequest_time(func):29 def inner(*args, **kwargs):30 globalexec_time31 start_time =time.time()32 func(*args, **kwargs)33 end_time =time.time()34 exec_time = end_time-start_time35
36 returninner37
38
39 defget_url(url):40 globalx,status_code_list41 headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.146 Safari/537.36',42 }43 response = requests.get(url,headers=headers)44 code =response.status_code45 status_code_list.append(code)46 print(code)47 returncode48
49
50 def get_count(_url='http://news.baidu.com/sports',_count=100): #:param count: 每個(gè)線程請(qǐng)求的數(shù)量
51 globalstatus_code_list,url,count52 for i inrange(count):53 get_url(url)54
55 defrequest_status():56 count_num =len(status_code_list)57 set_code_list =set(status_code_list)58 status_dict ={}59 for i inset_code_list:60 status_dict[i] =str(status_code_list).count(str(i))61 echo_str(count_num, set_code_list, status_dict)62
63 defecho_str(count_num,set_code_list,status_dict):64 print('=======================================')65 print('請(qǐng)求總次數(shù):%s'%count_num)66 print('請(qǐng)求時(shí)長(zhǎng):%s秒'%int(exec_time))67 second_request = count_num/int(exec_time)68 print('每秒請(qǐng)求約:%s次'%int(second_request))69 print('狀態(tài)碼 | 次數(shù)')70
71 for k,v instatus_dict.items():72 print(str(k)+'|'+str(v))73 print('=======================================')74
75
76 @request_time77 def run(url,thread_num=10,thread_pool=10):78 '''
79 :param thread_num: 總共執(zhí)行的線程數(shù)(總的請(qǐng)求數(shù)=總共執(zhí)行的線程數(shù)*每個(gè)線程循環(huán)請(qǐng)求的數(shù)量)80 :param thread_pool: 線程池?cái)?shù)量81 :param url: 請(qǐng)求的域名地址82 '''
83 globalx,status_code_list84 pool =MyThreadPool(thread_pool)85 for i inrange(thread_num):86 t =pool.get_thread()87 obj = t(target=get_count)88 obj.start()89 obj.join()90
91
92 if __name__ == '__main__':93 count = 10 #單個(gè)線程的請(qǐng)求數(shù)
94 url = 'http://baidu.com'
95 run(url,100,100)96 request_status()
總結(jié)
以上是生活随笔為你收集整理的python压测接口_python的一个接口压测脚本的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: windows10安装python环境_
- 下一篇: python datetime格式转换_