python多线程详解_Python多线程详解-Python-火龙果软件
編輯推薦:
本文來自于博客,本文詳細介紹了如何使用Python操作MySQL、使用Python操作Redis及項目實戰。
操作MySQL
1)Windows中安裝python和pycharm
2)ubuntu中安裝python和pycharm
安裝步驟不做贅述,pycharm運行腳本
#!/usr/bin/env python
import MySQLdb
#get connection
try:
con = MySQLdb.connect(
host='localhost',
user='root',
passwd='12346',
port=3308,
db='sakila',
charset='utf8'
)
except MySQLdb.Error as e:
print('error:%s'% e)
cursor = con.cursor()
cursor.execute('SELECT * FROM `store`')
rest = cursor.fetchone()
print(rest)
#close connection
con.close()
3)查詢數據庫
#!/usr/bin/env python
import MySQLdb
class MysqlQuery(object):
def __init__(self):
self.get_conn()
def get_conn(self):
# get connection
try:
self.conn = MySQLdb.connect(
host='localhost',
user='root',
passwd='123456',
port=3308,
db='sakila',
charset='utf8'
)
except MySQLdb.Error as e:
print('error:%s' % e)
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM `store`')
rest = cursor.fetchone()
print(rest)
def close_conn(self):
try:
if self.conn:
# close connection
self.conn.close()
except MySQLdb.Error as e:
print('Error:%s'%e)
def get_one(self):
#prepare SQL
/*雖然定義類型為int,可使用string*/
sql='SELECT * FROM `store` where `store_id` =%s'
#get cursor
cursor=self.conn.cursor()
cursor.execute(sql,('1',))
print(cursor.rowcount)
rest=dict(zip([k[0] for k in cursor.description],cursor.fetchone()))
print(rest)
print(rest['store_id'])
self.close_conn()
return rest
def main():
obj = MysqlQuery()
rest = obj.get_one();
print(rest['store_id'])
if __name__ == '__main__':
main()
/*取走所有數據,形成數組*/
rest = [dict(zip([k[0] for k in cursor.description],row))for row in cursor.fetchall()]
zip([iterable, …])
Python的一個內建函數,它接受一系列可迭代的對象作為參數,將對象中對應的元素打包成一個個tuple(元組),然后返回由這些tuples組成的list(列表)。若傳入參數的長度不等,則返回list的長度和參數中長度最短的對象相同。利用*號操作符,可以將list unzip(解壓)。
dict()作用:dict() 函數用于創建一個字典。返回一個字典。
class dict(**kwarg)
class dict(mapping, **kwarg)
class dict(iterable, **kwarg)
/*
kwargs -- 關鍵字
mapping -- 元素的容器。
iterable -- 可迭代對象
*/
4)更新數據
def add_one(self):
row_count=0
try:
sql = ("insert into `film`(`title`,`description`,`language_id`) value" "(%s,%s,%s);")
cursor = self.conn.cursor()
cursor.execute(sql, ('chia', 'ashajhsjah','1'))
self.conn.commit()
except:
print('error')
self.conn.rollback()
row_count=cursor.rowcount
cursor.close()
self.close_conn()
return row_count
5)ORM:SQLAlChemy
pip install SQLAlchemy
import sqlalchemy
declarative_base() 創建了一個 BaseModel 類,這個類的子類可以自動與一個表關聯。
增刪改查
#!/usr/bin/python
#coding=utf-8
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String, DateTime, Boolean
import datetime
#engine=create_engine('數據庫類型://用戶名:密碼@ip:端口/數據庫名')
engine=create_engine('mysql://root:123456@localhost:3308/sakila')
Base=declarative_base()
Session = sessionmaker(bind=engine)
class Store(Base):
__tablename__='country'#數據庫表名
country_id = Column(Integer, primary_key=True)
country = Column(String(200), nullable=False)
last_update = Column(String(2000), nullable=False)
class MysqlOrmTest(object):
def __init__(self):
self.session=Session()
def add_one(self):
new_obj=Store(
country_id='130',
country='hhhsahsa',
last_update=datetime.datetime.now()#此處需要import datetime
)
self.session.add(new_obj)
self.session.commit()
return new_obj
def get_one(self):
return self.session.query(Store).get(1)
def update_date(self):
obj=self.session.query(Store).get(1)
obj.manager_staff_id=1
self.session.add(obj)
self.session.commit()
return obj
def delete_data(self):
data=self.session.query(Store).get(3)
self.session.delete(data)
self.session.commit()
def main():
# rest = obj.add_one()
# print(dir(rest))
# print(obj.get_one().title)
# print(obj.get_more().count())
# for row in obj.get_more():
# print(row.title)
# print(obj.update_data())
if __name__ == '__main__':
main()
6)項目實戰
使用pycharm專業版,選擇flask框架,代碼如下:
from flask import Flask
app = Flask(__name__)
@app.route('/hello')
def hello_world():
return 'Hello World!hello'
if __name__ == '__main__':
app.run(debug=True)
##flask支持熱部署
簡單搭建flask架構網站
本人使用pycharm開發flask項目,可以利用工具導入工具包:
##引入相關包
from flask_sqlalchemy import SQLAlchemy
from flask import Flask, render_template, flash, redirect, url_for, abort, request
app = Flask(__name__)
##配置數據庫
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:123456@localhost:3308/flaskdb'
db=SQLAlchemy(app)
##完成ORM映射
class News(db.Model):
__tablename__='news'
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(200), nullable=False)
img_url = db.Column(db.String(2000), nullable=False)
content = db.Column(db.String(2000), nullable=True)
is_valid = db.Column(db.String(2000), nullable=True)
created_at = db.Column(db.String(2000), nullable=True)
updated_at = db.Column(db.String(2000), nullable=True)
news_type = db.Column(db.String(2000), nullable=True)
def __repr__(self):
return '' % self.title
##python生成HTML效果不好
@app.route('/hello')
def hello_world():
return 'Hello World!hello'
##使用渲染引擎Jinja2
@app.route('/')
def index():
news_list=News.query.all()
return render_template("index.html",news_list=news_list)
@app.route('/cat//')
def cat(name):
news_list=News.query.filter(News.news_type==name)
return render_template('cat.html',new_list=news_list)
@app.route('/detail//')
def detail(pk):
new_obj=News.query.get(pk)
return render_template('detail.html',new_obj=new_obj)
@app.route('/admin/')
@app.route('/admin//')
def admin(page=None):
return render_template("admin/index.html")
@app.route('/admin/add/', methods=['GET', 'POST'])
def add():
return render_template("admin/add.html")
@app.route('/admin/update//', methods=['GET', 'POST'])
def update(pk):
return render_template("admin/update.html")
@app.route('/admin/delete//', methods=['POST'])
def delete(pk):
return 'no'
if __name__ == '__main__':
app.run(debug=True)
操作Redis
1) Redis安裝
sudo apt-get update
sudo apt-get install redis-server
##啟動Redis服務器
redis-server
##查看 redis 是否啟動?
redis-cli
2)Redis命令
Set animal 'cat'
get animal
##添加value
append animal 'dog'
mset user1 'chu' user2 'yao'
mget user1 user2
set num 9
incr/decr num /*增加減少1*/
set user:chuyao;age:45 'asasasasa'
列表(list)相關操作
lpush/rpush q1 'chu' 'yao' 'Amy'/*從左、右插入數據*/
lrange/*獲取指定長度的數據*/
ltrim/*截取一定長度的數據*/
lpop/rpop/*移除最左、右的元素并返回*/
lpushx/rpushx --key/* key存在時候才插入數據,不存在時不做任何處理*/
集合(Set)相關操作
sadd/srem /*添加、刪除元素*/
sismember /*判斷是否為set的一個元素*/
smembers /*返回該集合的所有成員*/
sdiff /*返回一個集合與其他集合的差異*/
sinter/*返回幾個集合的交集*/
sunion/*返回幾個集合的并集*/
散列(hash)相關操作
3)redis-py連接
import redis
r=redis.StrictRedis(host='120.95.132.174',port=6379,db=0)
user1=r.get('user1')
print(user1)
注意,如果是遠程連接數據庫,需要修改Redis配置文件。
1)注釋掉bind 127.0.0.1可以使所有的ip訪問redis。
2)修改辦法:protected-mode no
4)Python 操作String類型
import redis
class TestString(object):
def __init__(self):
self.r=redis.StrictRedis(host='120.95.132.174',port=6379,db=0)
def test_set(self):
rest=self.r.set('user2','Amy');
print(rest)
def test_get(self):
rest=self.r.get('user1')
print rest
return rest
def test_mset(self):
d={
'user3':'Bob',
'user4':'BobX'
}
rest=self.r.mset(d)
print(rest)
return rest
def test_mget(self):
l=['user1','user2']
rest=self.r.mget(l)
print(rest)
return rest
def test_del(self):
rest=self.r.delete('user1')
print (rest)
def main():
str_obj=TestString();
# str_obj.test_set();
str_obj.test_get();
# str_obj.test_mset();
# str_obj.test_mget();
# str_obj.test_del();
if __name__=='__main__':
main()
5)項目實戰
新聞數據,Hash
新聞ID,String
分頁數據,List
排序,Sorted Set
總結
以上是生活随笔為你收集整理的python多线程详解_Python多线程详解-Python-火龙果软件的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 共话新科技新商业,2017全球虚拟现实产
- 下一篇: 三维点云数据处理软件供技术原理说明_三维