
Python: processing large amounts of MySQL data — using multithreading to quickly import bulk MySQL data into Elasticsearch...

Published: 2025/3/15 · 豆豆

This post walks through using Python multithreading to quickly import a large amount of MySQL data into Elasticsearch, and is shared here for reference.

The script uses Python multithreading: the thread count is set at run time, each thread's share of the data is computed automatically, and each thread connects to MySQL, reads rows, shapes them into the required fields, and writes them into Elasticsearch.
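The "compute each thread's share automatically" step amounts to splitting an inclusive id range into roughly equal half-open chunks. A minimal standalone sketch (the function name `split_ranges` is my own, not from the original script):

```python
import math

def split_ranges(start_id, max_id, num_threads):
    """Split the inclusive id range [start_id, max_id] into at most
    num_threads half-open (start, end) chunks of ceil(total/n) ids each."""
    total = max_id - start_id + 1
    size = int(math.ceil(float(total) / num_threads))
    ranges = []
    for i in range(num_threads):
        lo = start_id + i * size
        hi = min(start_id + (i + 1) * size, max_id + 1)
        if lo < hi:
            ranges.append((lo, hi))
    return ranges

print(split_ranges(1, 1947072, 10))
```

Note the `float(...)` cast: under Python 2, `total / num_threads` is integer division, so without the cast `math.ceil` would round nothing and the last few ids would be silently skipped.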

Sample run output:

With 10 threads, this status line refreshes in place so you can watch the id each thread has reached: [root@localhost shw]# python put_album.py

{"0": "2105success ", "1": "196723success ", "2": "392557null", "3": "587819null", "4": "782519null", "5": "977482null", "6": "1172186null", "7": "1366897null", "8": "1561614null", "9": "1754368success "}

The code is as follows:

#!/usr/bin/python
#coding:utf-8
import requests, re, time
import json, MySQLdb, sys, math, threading

reload(sys)
sys.setdefaultencoding('utf-8')

ES_CFG = {"host": "127.0.0.1", "port": "9200", "user": "elastic", "password": "123456",
          "index_name": "es_album", "doc": "zhuanji"}
ES_OBJ = {}
ES_OBJ['album_url'] = "http://" + ES_CFG['host'] + ":" + str(ES_CFG['port']) + "/" + ES_CFG['index_name'] + '/' + ES_CFG['doc']
MSG = {}

# database connection object
def db_obj():
    return MySQLdb.connect('127.0.0.1', 'root', '123456', 'dbname', port=3308, charset="utf8")

# POST one JSON document to Elasticsearch
def api_post(url, data):
    jdata_str = json.dumps(data)
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko',
               "Content-Type": "application/json"}
    res = requests.post(url, data=jdata_str,
                        auth=(ES_CFG['user'], ES_CFG['password']), headers=headers)
    res.close()
    return res

def strip_tags(html):
    if html is None:
        return ''
    try:
        dr = re.compile(r'<[^>]+>', re.S)  # strip HTML tags
        html = dr.sub('', html)
        html = re.sub(r'[\n]+', '', html, flags=re.S)
        return html.strip()
    except Exception as e:
        print e
        print html
        exit(0)

def pretreat(html):
    # strip tags, then drop whitespace and quote characters
    str_txt = strip_tags(html)
    str_txt = re.sub('\n|\t', '', str_txt)
    str_txt = re.sub('"', '', str_txt)
    str_txt = re.sub("'", '', str_txt)
    str_txt = re.sub(' ', '', str_txt)
    str_txt = re.sub("\xa0", '', str_txt)
    str_txt = re.sub("\u3000", '', str_txt)
    return str_txt

# worker: read rows with ids in [start, end) and index them one by one
def create_index(start, end, msg_id):
    db = db_obj()
    url = ES_OBJ['album_url']
    for i in range(start, end):
        time.sleep(0.01)
        cur = db.cursor()
        sql = "select id,name,keywords,desption from shwcms_album where id = %s" % i
        cur.execute(sql)
        result = cur.fetchone()
        if result is not None:
            data = {}
            data['itemid'] = result[0]
            data['album_name'] = pretreat(result[1])
            data['keywords'] = pretreat(result[2])
            data['desption'] = pretreat(result[3])
            res = api_post(url, data)
            if res is not None:
                MSG[msg_id] = str(i) + 'success '
            else:
                MSG[msg_id] = str(i) + 'fail'
        else:
            MSG[msg_id] = str(i) + 'null'
    db.close()

max_threading = 10  # number of threads
max_id = 1947072    # highest id
start_id = 1        # starting id
# float division so math.ceil actually rounds up under Python 2
size = int(math.ceil(float(max_id - start_id + 1) / max_threading))

for i in range(0, max_threading):
    MSG[i] = ""
    # offset by start_id and cap the last chunk at max_id
    threading.Thread(target=create_index,
                     args=(start_id + i * size, min(start_id + (i + 1) * size, max_id + 1), i)).start()

while True:
    sys.stdout.write("\r %s" % json.dumps(MSG))
    sys.stdout.flush()
    time.sleep(1)
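Posting one document per HTTP request, as the script above does, makes the round-trip latency the main bottleneck. Elasticsearch's `_bulk` endpoint accepts many index operations in a single newline-delimited JSON request. A hedged sketch of building such a body by hand — the row tuples and index/type names mirror the script above, but the helper name `bulk_body` is my own:

```python
import json

def bulk_body(rows, index_name, doc_type):
    """Build an NDJSON _bulk body: one action line followed by one source
    line for each (id, name, keywords, desption) row from the albums table."""
    lines = []
    for row in rows:
        lines.append(json.dumps({"index": {"_index": index_name,
                                           "_type": doc_type,
                                           "_id": row[0]}}))
        lines.append(json.dumps({"itemid": row[0],
                                 "album_name": row[1],
                                 "keywords": row[2],
                                 "desption": row[3]}))
    # the _bulk API requires a trailing newline after the last line
    return "\n".join(lines) + "\n"

rows = [(1, "demo album", "kw1,kw2", "some text")]
body = bulk_body(rows, "es_album", "zhuanji")
# POST the body to http://host:9200/_bulk with
# Content-Type: application/x-ndjson (e.g. via requests.post)
```

Batching a few hundred rows per `_bulk` call would let each thread make far fewer requests than the one-document-per-POST loop above.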
