日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

python链接mysql系统结构设计_第11章:使用Python打造MySQL专家系统

發布時間:2025/3/15 数据库 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python链接mysql系统结构设计_第11章:使用Python打造MySQL专家系统 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.Python語言高級特性

1).深入淺出Python生成器

1).生成器函數:與普通函數定義類似,使用yield語句而不是return語句返回結果。yield語句一次返回一個結果,在每個結果中間掛起函數的狀態,以便下次從它離開的地方繼續執行2).生成器表達式:類似于列表推導,但是,生成器返回按需產生結果的一個對象,而不是一次構建一個結果列表

使用生成器的例子,使用生成器返回自然數的平方:defgensquares(N):for i inrange(N):yield i ** 2

defmain():for item in gensquares(234):print(item)if __name__ == '__main__':

main()

使用普通函數實現defgensquares(N):

res=[]for i inrange(N):

res.append(i*i)returnresdefmain():for item in gensquares(234):print(item)if __name__ == '__main__':

main()

2).深入淺出Python裝飾器

Python中函數可以賦值給另外一個變量名,函數可以嵌套,以及函數對象可以作為另外一個函數的參數等1、函數對象2、嵌套函數3、裝飾器原型defbread(f):def wrapper(*args, **kwargs):print("begin")

f()print("end")returnwrapper

@breaddefsay_hi():print("Hi")defmain():

say_hi()if __name__ == '__main__':

main()

3).Python上下文管理器

1、with語句形式化定義2、上下文管理器的應用場景3、上下文管理器協議

2.MySQL數據庫

1).Python連接數據庫

importosif os.getenv('DB', 'MySQL') == 'MySQL':importpymysql as dbelse:importsqlite3 as dbdef get_conn(**kwargs):if os.getenv('DB', 'MySQL') == 'MySQL':return db.connect(host=kwargs.get('host', 'localhost'),

user=kwargs.get('user'),

passwd=kwargs.get('passwd'),

port=kwargs.get('port', 3306),

db=kwargs.get('db'))else:return db.connect(database=kwargs.get('db'))defexecute_sql(conn, sql):

with conn as cur:

cur.execute(sql)defcreate_table(conn):

sql_drop_table= "DROP TABLE IF EXISTS student"sql_create_table= """create table student (sno int(11) not null,sname varchar(20) default null,sage int(11) default null,primary key (sno)) engine=InnoDB default charset=utf8"""

for sql in[sql_drop_table, sql_create_table]:

execute_sql(conn, sql)definsert_data(conn, sno, sname, sage):

insert_format= "insert into student(sno, sname, sage) values ({0}, '{1}', {2})"sql=insert_format.format(sno, sname, sage)

execute_sql(conn, sql)defmain():

conn= get_conn(host='127.0.0.1',

user='root',

passwd='msds007',

port=3306,

db='test')try:

create_table(conn)

insert_data(conn,1, 'zhangsan', 20)

insert_data(conn,2, 'lisi', 21)

with conn as cur:

cur.execute("select * from student")

rows=cur.fetchall()for row inrows:print(row)finally:ifconn:

conn.close()if __name__ == '__main__':

main()

2).使用上下文管理器對數據庫連接進行管理

importosfrom contextlib importcontextmanagerif os.getenv('DB', 'MySQL') == 'MySQL':importpymysql as dbelse:importsqlite3 as db

@contextmanagerdef get_conn(**kwargs):if os.getenv('DB', 'MySQL') == 'MySQL':

conn= db.connect(host=kwargs.get('host', 'localhost'),

user=kwargs.get('user'),

passwd=kwargs.get('passwd'),

port=kwargs.get('port', 3306),

db=kwargs.get('db'))try:yieldconnfinally:ifconn:

conn.close()defexecute_sql(conn, sql):

with conn as cur:

cur.execute(sql)defcreate_table(conn):

sql_drop_table= "DROP TABLE IF EXISTS student"sql_create_table= """create table student (sno int(11) not null,sname varchar(20) default null,sage int(11) default null,primary key (sno)) engine=InnoDB default charset=utf8"""

for sql in[sql_drop_table, sql_create_table]:

execute_sql(conn, sql)definsert_data(conn, sno, sname, sage):

insert_format= "insert into student(sno, sname, sage) values ({0}, '{1}', {2})"sql=insert_format.format(sno, sname, sage)

execute_sql(conn, sql)defmain():

conn_args= dict(host='127.0.0.1',user='root',passwd='msds007',port=3306,db='test')

with get_conn(**conn_args) as conn:

create_table(conn)

insert_data(conn,1, 'zhangsan', 20)

insert_data(conn,2, 'lisi', 21)

with conn as cur:

cur.execute("select * from student")

rows=cur.fetchall()for row inrows:print(row)if __name__ == '__main__':

main()

3).案例:從csv文件導入數據到MySQL

importosimportcsvfrom collections importnamedtuplefrom contextlib importcontextmanagerif os.getenv('DB', 'MySQL') == 'MySQL':importpymysql as dbelse:importsqlite3 as db

@contextmanagerdef get_conn(**kwargs):if os.getenv('DB', 'MySQL') == 'MySQL':

conn= db.connect(host=kwargs.get('host', 'localhost'),

user=kwargs.get('user'),

passwd=kwargs.get('passwd'),

port=kwargs.get('port', 3306),

db=kwargs.get('db'))try:yieldconnfinally:ifconn:

conn.close()defexecute_sql(conn, sql):

with conn as cur:

cur.execute(sql)defget_data(file_name):

with open(file_name) as f:

f_csv=csv.reader(f)

headings=next(f_csv)

Row= namedtuple('Row', headings)for r inf_csv:yield Row(*r)defmain():

conn_args= dict(host='127.0.0.1',user='root',passwd='msds007',port=3306,db='test')

with get_conn(**conn_args) as conn:

SQL_FORMAT= """insert into student(sno,sname,sage) values({0},'{1}',{2})"""

for t in get_data('data.csv'):

sql=SQL_FORMAT.format(t.sno, t.sname, t.sage)

execute_sql(conn, sql)if __name__ == '__main__':

main()

3.Python并發編程

1).Python中的多線程

Python默認的解釋器,由于全局解釋器鎖的存在,確實在任意時刻都只有一個線程在執行Python代碼,致使多線程不能充分利用機器多核的特性

Python由于GIL(Global Interpreter Lock)鎖的原因,并沒有真正的并發

Python標準庫提供了兩個與線程相關的模塊,分別是thread和threading

thread是低級模塊,threading是高級模塊,threading對thread進行了封裝1、創建線程2、如何給線程傳遞參數3、線程的常用方法4、通過繼承創建線程

2).線程同步與互斥鎖

在Python標準庫的threading模塊中有一個名為Lock的工廠函數,會返回一個thread.LockType對象

該對象的acquire方法用來獲取鎖,release方法用來釋放鎖try:

lock.acquire()#do something

finally:

lock.release()

使用上下文管理器:

with lock:#do something

使用互斥鎖例子:importthreading

lock=threading.Lock()

num=0defincre(count):globalnumwhile count >0:

with lock:

num+= 1count-= 1

defmain():

threads=[]for i in range(10):

thread= threading.Thread(target=incre, args=(100000,))

thread.start()

threads.append(thread)for thread inthreads:

thread.join()print("expected value is", 10 * 100000, ", real value is", num)if __name__ == '__main__':

main()

3).線程安全隊列Queue

隊列是線程間最常用的交換數據的形式,Queue模塊實現了線程安全的隊列,尤其適合多線程編程

簡單例子:importQueue

q=Queue.Queue()for i in range(3):

q.put(i)while notq.empty():print(q.get())

Python官方給出的多線程模型:defworker():whileTrue:

item=q.get()

do_work(item)

q.task_done()

q=Queue()for i inrange(num_worker_threads):

t= Thread(target=worker)

t.daemon=True

t.start()for item insource():

q.put(item)

q.join()#block until all tasks are done

4).案例:使用Python打造一個MySQL壓測工具

importstringimportargparseimportrandomimportthreadingimporttimeimportdatetimeimportpymysqlfrom contextlib importcontextmanager

DB_NAME= 'test_insert_data_db'TABLE_NAME= 'test_insert_data_table'CREATE_TABLE_STATEMENT= """create table {0} (id int(11) not null auto_increment, name varchar(255) not null, birthday datetime not null, primary key (id))""".format(TABLE_NAME)#_argparse的唯一作用就是使用標準庫的argparse模塊解析民工行參數并生成幫助信息

def_argparse():

parser= argparse.ArgumentParser(description='benchmark tool for MySQL database')

parser.add_argument('--host', action='store', dest='host', required=True, help='connect to host')

parser.add_argument('--user', action='store', dest='user', required=True, help='user for login')

parser.add_argument('--password', action='store', dest='password', required=True, help='password to use when connecting to server')

parser.add_argument('--port', action='store', dest='port', default=3306, type=int, help='port number to use for connection or 3306 for default')

parser.add_argument('--thread_size', action='store', dest='thread_size', default=5, type=int, help='how much connection for database usage')

parser.add_argument('--row_size', action='store', dest='row_size', default=5000, type=int, help='how mucch rows')

parser.add_argument('-v', '--version', action='version', version='%(prog)s 0.1')returnparser.parse_args()

@contextmanagerdef get_conn(**kwargs):

conn= pymysql.connect(**kwargs)try:yieldconnfinally:

conn.close()defcreate_db_and_table(conn):

with conn as cur:for sql in ["drop database if exists {0}".format(DB_NAME), "create database {0}".format(DB_NAME), "use {0}".format(DB_NAME), CREATE_TABLE_STATEMENT]:print(sql)

cur.execute(sql)def random_string(length=10):

s= string.letters +string.digitsreturn "".join(random.sample(s, length))defadd_row(cursor):

SQL_FORMAT= "INSERT INTO {0}(name, birthday) values ('{1}','{2}')"sql= SQL_FORMAT.format(TABLE_NAME, random_string(), datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

cursor.execute(sql)definsert_data(conn_args, row_size):

with get_conn(**conn_args) as conn:

with conn as c:

c.execute('use {0}'.format(DB_NAME))

with conn as c:for i inrange(row_size):

add_row(c)

conn.commit()defmain():

parser=_argparse()

conn_args= dict(host=parser.host, user=parser.user, password=parser.password, port=parser.port)

with get_conn(**conn_args) as conn:

create_db_and_table(conn)

threads=[]for i inrange(parser.thread_size):

t= threading.Thread(target=insert_data, args=(conn_args, parser.row_size))

threads.append(t)

t.start()for t inthreads:

t.join()if __name__ == '__main__':

main()

4.專家系統設計

專家系統檢查內容

1)服務器相關:包括cpu,io,內存,磁盤,網絡等方面的檢查

2)數據庫相關:包括數據庫的參數配置,主從復制性能等

3)業務相關:表結構,索引和SQL語句

索引檢查:

主鍵索引檢查,無效索引檢查,冗余索引檢查,索引區分度檢查

容量規劃:

cpu利用率檢查,io能力檢查,網絡帶寬檢查,存儲空間檢查,內存占用檢查

用戶訪問:

死鎖統計,慢日志統計

安全檢查:

弱密碼檢查,網絡檢查,權限檢查

參數檢查:

內存參數檢查,重做日志配置檢查,二進制日志檢查,連接數配置檢查

主從復制:

復制性能檢查,數據安全檢查

5.MySQL專家系統整體架構

1).作為平臺服務的MySQL數據庫健康檢查系統

2).作為數據庫工具的MySQL數據庫健康檢查系統

專家系統文件組織

# tree health_checker

health_checker

├── client

│ ├── action

│ │ ├── check_binary_logs.py

│ │ ├── check_connections.py

│ │ ├── check_redo_log.py

│ │ ├── check_safe_replication.py

│ │ └── __init__.py

│ ├── client.py

│ ├── database

│ │ ├── connection_pool.py

│ │ ├── __init__.py

│ │ └── mysql.py

│ ├── env.py

│ ├── handler.py

│ ├── __init__.py

│ ├── response.py

│ └── util.py

├── __init__.py

├── main.py

├── server

│ ├── health_checker_server.py

│ ├── __init__.py

│ ├── util.py

│ └── worker

│ ├── advise.py

│ ├── check_binary_logs.py

│ ├── check_connections.py

│ ├── check_redo_log.py

│ ├── check_safe_replication.py

│ ├── generic_worker.py

│ ├── health_checker_item.py

│ └── __init__.py

└── test.py

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的python链接mysql系统结构设计_第11章:使用Python打造MySQL专家系统的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。