日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

python如何批量发布数据并如何定时更换token

發布時間:2025/3/20 python 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python如何批量发布数据并如何定时更换token 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

有時我們會遇到一種情況,就是用同一個接口去post不同的數據,即大量的數據。

而開發在收據和發數據采用的方法是分頁。

對于這種情況,

  • 測試模擬分頁的方式就是用多線程去post數據。
  • 而定時更換令牌,采用的是定時任務的方法,將獲取的令牌存入文檔里,然后定時更新文檔里的令牌,post數據時,從文檔里獲取最新的令牌即可。

多線程post大量數據

代碼如下:

import json,requests,os,socket from threading import Thread,Timer import datetime#防止請求接口超時返回錯誤 socket.setdefaulttimeout(100) #文件基礎文件夾目錄,使用時修改 bathpath = r'C:xxxxxxxx' #API 的url,使用時修改 url = "xxxxx" #使用時修改,尾數文件都會被分配到最后一個線程 threadNum = 10# 獲取文件夾里面的所有的文本 filenames=os.listdir(bathpath)#統計用的 filecount=0 sucfilecount=0 failfilecount=0 updatetimes = 0 #目標函數 def postSDAP(filename):global filecount,sucfilecount,failfilecount,updatetimesfilecount += 1filepath = os.path.join(bathpath, filename)# 讀取tokenwith open('token.txt', 'r', encoding='utf-8') as f:lines = f.readlines()# 讀第一行,token,文件只有一行token = lines[0]with open(filepath, 'r', encoding='utf-8') as load_f:load_dict = json.load(load_f)# 將dict數據轉化成json數據,以便去postload_json = json.dumps(load_dict, ensure_ascii=False, indent=4).encode('utf-8')headers = {'Content-Type': 'text/plain; charset =UTF-8',"Authorization": token,"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36","keep-alive" : "False","Sec - Fetch - Mode": "cors"}print('正在發送文件數據--------------' + filename + '-----------------')#不打印requests提示信息requests.packages.urllib3.disable_warnings()requests.adapters.DEFAULT_RETRIES = 5try:r = requests.post(url, data=load_json, headers=headers,verify=False,timeout=300)nowTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f') # 現在 含微秒的日期時間,來源 比特量化print('當前時間是:'+str(nowTime)+'------文件' + filename + '-------返回code-------' + str(r))print('文件' + filename + '-------返回content-----------' + str(r.text))print('文件' + filename + '-------響應時間-----------' + str(r.elapsed.total_seconds()))print('文件' + filename + '-------返回header---------' + str(r.headers))print('文件' + filename + '-------request size ---------' + str(len(load_json)))sucfilecount += 1r.close()except requests.ConnectionError as e:failfilecount += 1print('文件' + filename +'----連接出錯'+str(e))except requests.HTTPError as e:failfilecount += 1print('文件' + filename +'----HTTP出錯啦'+str(e))except requests.ConnectTimeout as e:failfilecount += 1print('文件' + filename + '----連接遠程服務器超時' + str(e))except requests.ReadTimeout as e:failfilecount += 1print('文件' + filename + '----讀取超時,遠程服務器無返回' + str(e))print('------------已發送文件個數為'+str(filecount))print('------------發送成功文件個數為' + str(sucfilecount))print('------------發送失敗文件個數為' + str(failfilecount)) #處理多個dir,將作為線程的target函數 def mulitPost(filenames):for i in filenames:postSDAP(i)threadsPool = [] #文件角標起始位置 begin = 0#每個線程處理的文件個數(根據總文件個數去計算 n/10 取整,把尾數文件分攤放在最后一個線程 num = len(filenames)//threadNum print(num,type(num))# 靈活配置 文件處理個數,開啟 指定個數的線程。通過range控制 for i in range(threadNum):if i+1 == threadNum:# 處理尾數的文件,'最后一個線程的起',begin# 注意args 后面的逗號threadsPool.append(Thread(target=mulitPost, args=(filenames[begin:], )))else:threadsPool.append(Thread(target=mulitPost, args=(filenames[begin:begin+num], )))begin = begin+numfor thread in threadsPool:thread.start()for thread in threadsPool:thread.join()

需要注意的是: 元組如果只有一個元素的時候,元素后面一定要加逗號,來完成元組的聲明。如果不添加逗號,那么變量的類型是字符串。

上面的代碼只解決了post大批量數據的問題,但是沒有解決token問題。

實際情況中,我們經常會遇到token從另一個api獲得。

代幣獲取

代碼如下:

''' 遇到問題沒人解答?小編創建了一個Python學習交流QQ群:857662006 尋找有志同道合的小伙伴,互幫互助,群里還有不錯的視頻學習教程和PDF電子書! ''' def getoken():#get token的url url = "xxxx"payload="xxxx"headers = {'Content-Type': "application/x-www-form-urlencoded","User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36"}requests.packages.urllib3.disable_warnings()requests.adapters.DEFAULT_RETRIES = 5response = requests.request("POST", url, data=payload, headers=headers,verify=False,timeout=300)d = json.loads(response.text)token = d['access_token']token = 'Bearer '+ tokenresponse.close()# 將token存入txt文件里with open('uat_token.txt', 'w', encoding='utf-8') as f:nowTime = datetime.datetime.now()f.write(token)print('token更新啦。。。當前時間是',nowTime)return token#執行一次gettoken 好讓token文檔里有初始的token gettoken()

上述方法解決了,從api直接獲取最新的令牌,但是沒有解決定時更新。

這樣采用了線程在里面的計時器方法。使用變量變量去控制獲取令牌方法的次數,即token.txt文件里的令牌會被更新對應的次數。

定時獲取令牌

代碼更新如下:

count = 0 # 實時獲取最新的token def getoken():global countcount += 1#使用timer 做定時任務,過1800秒后執行 (遞歸),定時執行10次,用count 控制if count < 10:global timertimer = Timer(1800, getoken)timer.start()#get token的url url = "xxxx"payload="xxxx"headers = {'Content-Type': "application/x-www-form-urlencoded","User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36"}requests.packages.urllib3.disable_warnings()requests.adapters.DEFAULT_RETRIES = 5response = requests.request("POST", url, data=payload, headers=headers,verify=False,timeout=300)d = json.loads(response.text)token = d['xxxx'] # token在json里面名字token = 'Bearer '+ tokenresponse.close()# 將token存入txt文件里with open('token.txt', 'w', encoding='utf-8') as f:nowTime = datetime.datetime.now()f.write(token)print('token更新啦。。。當前時間是',nowTime)return token#執行一次gettoken 好讓token文檔里有初始的token gettoken()

但是這個方法會導致,post大量數據都結束了,整個程序一直都是執行狀態,沒有正常退出。這是因為gettoken這個方法,里面的定時器定時任務,設置的執行次數和間隔時間比較長。

但是又不能確定post方法到底執行多久,因為這個是根據數據量的大小去確定的。

最后終于想到一個方法,當post方法執行完成,即線程結束時,結束Timer任務,采用的方法是Timer里面的cancle方法。

最后的代碼

代碼更新如下:

import json,requests,os,socket from threading import Thread,Timer import datetime#防止請求接口超時返回錯誤 socket.setdefaulttimeout(100) #文件基礎文件夾目錄,使用時修改 bathpath = r'C:xxxxxxxx' #API 的url,使用時修改 url = "xxxxx" #使用時修改,尾數文件都會被分配到最后一個線程 threadNum = 10# 獲取文件夾里面的所有的文本 filenames=os.listdir(bathpath)count = 0 # 實時獲取最新的token def getoken():global countcount += 1#使用timer 做定時任務,過1800秒后執行 (遞歸),定時執行10次,用count 控制if count < 10:global timertimer = Timer(1800, getoken)timer.start()#get token的url url = "xxxx"payload="xxxx"headers = {'Content-Type': "application/x-www-form-urlencoded","User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36"}requests.packages.urllib3.disable_warnings()requests.adapters.DEFAULT_RETRIES = 5response = requests.request("POST", url, data=payload, headers=headers,verify=False,timeout=300)d = json.loads(response.text)token = d['xxxx'] # token在json里面名字token = 'Bearer '+ tokenresponse.close()# 將token存入txt文件里with open('token.txt', 'w', encoding='utf-8') as f:nowTime = datetime.datetime.now()f.write(token)print('token更新啦。。。當前時間是',nowTime)return token#執行一次gettoken 好讓token文檔里有初始的token gettoken()#統計用的 filecount=0 sucfilecount=0 failfilecount=0 updatetimes = 0 #目標函數 def postSDAP(filename):global filecount,sucfilecount,failfilecount,updatetimesfilecount += 1filepath = os.path.join(bathpath, filename)# 讀取tokenwith open('token.txt', 'r', encoding='utf-8') as f:lines = f.readlines()# 讀第一行,token,文件只有一行token = lines[0]with open(filepath, 'r', encoding='utf-8') as load_f:load_dict = json.load(load_f)# 將dict數據轉化成json數據,以便去postload_json = json.dumps(load_dict, ensure_ascii=False, indent=4).encode('utf-8')headers = {'Content-Type': 'text/plain; charset =UTF-8',"Authorization": token,"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36","keep-alive" : "False","Sec - Fetch - Mode": "cors"}print('正在發送文件數據--------------' + filename + '-----------------')#不打印requests提示信息requests.packages.urllib3.disable_warnings()requests.adapters.DEFAULT_RETRIES = 5try:r = requests.post(url, data=load_json, headers=headers,verify=False,timeout=300)nowTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f') # 現在 含微秒的日期時間,來源 比特量化print('當前時間是:'+str(nowTime)+'------文件' + filename + '-------返回code-------' + str(r))print('文件' + filename + '-------返回content-----------' + str(r.text))print('文件' + filename + '-------響應時間-----------' + str(r.elapsed.total_seconds()))print('文件' + filename + '-------返回header---------' + str(r.headers))print('文件' + filename + '-------request size ---------' + str(len(load_json)))sucfilecount += 1r.close()except requests.ConnectionError as e:failfilecount += 1print('文件' + filename +'----連接出錯'+str(e))except requests.HTTPError as e:failfilecount += 1print('文件' + filename +'----HTTP出錯啦'+str(e))except requests.ConnectTimeout as e:failfilecount += 1print('文件' + filename + '----連接遠程服務器超時' + str(e))except requests.ReadTimeout as e:failfilecount += 1print('文件' + filename + '----讀取超時,遠程服務器無返回' + str(e))print('------------已發送文件個數為'+str(filecount))print('------------發送成功文件個數為' + str(sucfilecount))print('------------發送失敗文件個數為' + str(failfilecount)) #處理多個dir,將作為線程的target函數 def mulitPost(filenames):for i in filenames:postSDAP(i)threadsPool = [] #文件角標起始位置 begin = 0#每個線程處理的文件個數(根據總文件個數去計算 n/10 取整,把尾數文件分攤放在最后一個線程 num = len(filenames)//threadNum print(num,type(num))# 靈活配置 文件處理個數,開啟 指定個數的線程。通過range控制 for i in range(threadNum):if i+1 == threadNum:# 處理尾數的文件,'最后一個線程的起',begin# 注意args 后面的逗號threadsPool.append(Thread(target=mulitPost, args=(filenames[begin:], )))else:threadsPool.append(Thread(target=mulitPost, args=(filenames[begin:begin+num], )))begin = begin+numfor thread in threadsPool:thread.start()for thread in threadsPool:thread.join() # 當主任務結束時,直接結束定時任務,避免post都結束了,但是整個程序還沒有結束 timer.cancel()

需要注意的是,將timer定義成變量,才能在方法外調用。

總結

以上是生活随笔為你收集整理的python如何批量发布数据并如何定时更换token的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。