日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

python mysql操作封装库_python封装mysq操作,进行数据库的增删改

發布時間:2024/7/5 数据库 46 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python mysql操作封装库_python封装mysq操作,进行数据库的增删改 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

python操作mysql進行封裝,封裝的好處我就不提了,以下是我做項目時的一個封裝,大家可以根據實際需要進行自己的一個封裝

我封裝的內容:

1.數據庫的配置文件

2.獲取數據配置文件的地址

3.連接數據庫的操作

4.操作mysql的語句

5.調用mysql的語句--執行

封裝1:數據庫配置文件

config中db_conf.ini

里面主要是一些mysql的配置文件

[mysqlconf]

host=127.0.0.1

port=3306

user=root

password=123456

db_name=數據庫名

封裝2:讀取配置文件地址public_data.py

import os

# 整個項目的根目錄絕對路勁

baseDir = os.path.dirname(os.path.dirname(__file__))

# 數據庫配置你文件絕對路徑

config_path = baseDir + "/config/db_config.ini"

print(config_path)

打印目錄地址:

封裝3:數據庫的連接 config_handler.py

# -*- coding:utf-8 -*-

#@Time : 2020/6/14 11:01

#@Author: 張君

#@File : config_handler.py

from configparser import ConfigParser

from config.public_data import config_path

class ConfigParse(object):

def __init__(self):

pass

@classmethod

def get_db_config(cls):

#cls使用的類方法,cls就是指定本身

cls.cfp = ConfigParser()

cls.cfp.read(config_path)

host = cls.cfp.get("mysqlconf", "host")

port = cls.cfp.get("mysqlconf", "port")

user = cls.cfp.get("mysqlconf", "user")

password = cls.cfp.get("mysqlconf", "password")

db = cls.cfp.get("mysqlconf", "db_name")

return {"host":host, "port":port, "user":user, "password":password,"db":db}

if __name__ == '__main__':

cp=ConfigParse()

result=cp.get_db_config()

print(result)

封裝4:sql語句的封裝 db_handle.py

import datetime

import pymysql

from utils.config_handler import ConfigParse

class DB(object):

def __init__(self):

#獲取mysql連接信息

self.db_conf = ConfigParse.get_db_config()

#獲取連接對象

self.conn = pymysql.connect(

host = self.db_conf["host"],

port = int(self.db_conf["port"]),

user = self.db_conf["user"],

password = self.db_conf["password"],

database = self.db_conf["db"],

charset = "utf8"

)

#獲取數據的游標

self.cur = self.conn.cursor()

def close_connect(self):

# 關閉數據連接

#提交,物理存儲

self.conn.commit()

#游標關閉

self.cur.close()

#連接對象關閉

self.conn.close()

def get_api_list(self):

"""獲取所有的對象數據"""

sqlStr = "select * from interface_api where status=1"

self.cur.execute(sqlStr)

data = self.cur.fetchall()

#轉成一個list

apiList = list(data)

return apiList

def get_api_case(self, api_id):

"""獲取某一條數據"""

sqlStr = "select * from interface_test_case where api_id=%s and status=1" %api_id

self.cur.execute(sqlStr)

api_case_list = list(self.cur.fetchall())

return api_case_list

def get_rely_data(self, api_id, case_id):

"""獲取所有數據庫中的某一條"""

sqlStr = "select data_store from interface_data_store where api_id=%s and case_id=%s" %(api_id, case_id)

self.cur.execute(sqlStr)

rely_data = eval(self.cur.fetchall()[0][0])

return rely_data

def write_check_result(self, case_id, errorInfo, res_data):

"""更新數據庫表"""

sqlStr = "update interface_test_case set error_info=\"%s\", res_data=\"%s\" where id=%s" %(errorInfo, res_data, case_id)

self.cur.execute(sqlStr)

self.conn.commit()

def insert_dab(self):

sqlStr="INSERT INTO `interface_api` VALUES (4, '修改博文', 'http://39.106.41.11:8080/getBlogContent/', 'get', 'url','0', '2018-07-27 22:13:30')"

self.cur.execute(sqlStr)

def get_api_case(self, api_id):

"""獲取表中id"""

sqlStr = "select * from interface_test_case where api_id=%s and status=1" % api_id

self.cur.execute(sqlStr)

api_case_list = list(self.cur.fetchall())

return api_case_list

def get_api_id(self, api_name):

"""獲取表中id"""

sqlStr = "select api_id from interface_api where api_name='%s'" % api_name

self.cur.execute(sqlStr)

api_id = self.cur.fetchall()[0][0]

return api_id

def update_store_data(self, api_id, case_id, store_data):

sqlStr = "select data_store from interface_data_store where api_id=%s and case_id=%s" % (api_id, case_id)

self.cur.execute(sqlStr)

if self.cur.fetchall():

sqlStr = "update interface_data_store set data_store=\"%s\" where api_id=%s and case_id=%s" % (

store_data, api_id, case_id)

print(sqlStr)

self.cur.execute(sqlStr)

self.conn.commit()

else:

sqlStr = "insert into interface_data_store values(%s, %s, \"%s\", '%s')" % (

api_id, case_id, store_data, datetime.now())

self.cur.execute(sqlStr)

self.conn.commit()

if __name__ == '__main__':

db=DB()

print(db.get_api_list())

print(db.get_api_case(1))

# print(db.get_rely_data(1,1))

#print(db.insert_dab())

使用ing,我新建了一個test.py文件

from utils.db_handler import DB

from action.get_rely import GetRely

from utils.HttpClient import HttpClient

def main():

#連接數據庫,獲取連接實例對象

db=DB()

#從數據庫中獲取需要執行的api執行集合

api_list=db.get_api_list()

print(api_list)

main()

至此一整套封裝,與使用就完成了,為什么要分為幾個文件夾,我遵循的一個原則, 使用、配置、公共方法是用不的文件夾整理,其實最終也就是你使用怎么方便,維護比較好,就怎么用

總結

以上是生活随笔為你收集整理的python mysql操作封装库_python封装mysq操作,进行数据库的增删改的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。