python123平台作业答案第十二周_python周报第十二周
0.本周知識(shí)點(diǎn)預(yù)覽
Contextlib
Redis發(fā)布訂閱
RabbitMQ
pymysql
SQLAchemy
1.Contextlib模塊
contextlib模塊的contextmanager 可以實(shí)現(xiàn)用with來管理上下文,類似于 with open('test.txt','r') as f,這樣打開文件操作后就可以自動(dòng)關(guān)閉文件。
1.范例一(自定義函數(shù)):
###contextlib (實(shí)現(xiàn)with上下文管理)
importcontextlib
list1= [1,2,3]
str1= "lk"###此處必須是這個(gè)裝飾器
@contextlib.contextmanagerdeffunc(l, s):
l.append(s)try:###執(zhí)行到y(tǒng)ield時(shí),中斷跳出函數(shù)
yield
finally:print(l)
with func(list1, str1):print(123)print(456)
執(zhí)行結(jié)果如下:
123
456[1, 2, 3, 'lk']
代碼解析:以上代碼的執(zhí)行順序?yàn)?#xff1a;
1.加載list1,str1,contextlib.contextmanager裝飾器
2.執(zhí)行with func(list1, str1)
3.執(zhí)行def func(l, s), l.append(s)
4.try,yield,跳出函數(shù)回到with func(list1, str1) 執(zhí)行print
5.執(zhí)行完print后回到def func中的yield處,繼續(xù)往下執(zhí)行
2.范例二(socket):
###利用上下文可以處理文件那樣處理類似socket,自動(dòng)關(guān)閉連接
importcontextlibimportsocket
@contextlib.contextmanagerdefbase_socket(host, port):
sk=socket.socket()
sk.bind((host, port))
sk.listen(5)print(123)try:yieldskfinally:print(789)
sk.close()
with base_socket("127.0.0.1", 8888) as sock:print(456)
執(zhí)行結(jié)果如下:
123
456
789
代碼解析:這個(gè)是context對(duì)socket的應(yīng)用,在項(xiàng)目中就可以這么寫。執(zhí)行順序和上個(gè)例子相同。
2.Redis 發(fā)布訂閱
自定義redis基礎(chǔ)類:
## redis 發(fā)布訂閱
importredisclassRedisHelper:def __init__(self):###創(chuàng)建redis連接對(duì)象
self.__conn = redis.Redis(host="127.0.0.1", port=6379)defpublic(self, msg, chan):###publish 方法,把信息發(fā)布到頻道上,返回消息被傳遞的訂閱者的數(shù)量
self.__conn.publish(chan, msg)returnTruedefsubscribe(self, chan):###pubsub 方法的意思是,返回一個(gè)發(fā)布或者訂閱的對(duì)象,用這個(gè)對(duì)象,你就能訂閱這個(gè)頻道,監(jiān)聽給發(fā)給這些頻道的消息
pub = self.__conn.pubsub()###subscribe 方法: 訂閱頻道
pub.subscribe(chan)###parse_response 方法:解析從發(fā)布者/訂閱者命令的響應(yīng)
pub.parse_response()return pub
發(fā)布者代碼:
importtest
fabu=test.RedisHelper()whileTrue:
inp= input(">>>")if inp == "exit":break
else:
fabu.public("%s" % inp, "998")
訂閱者代碼:
importtest
dingyue=test.RedisHelper()whileTrue:###subscribe 方法 -> 訂閱頻道
data = dingyue.subscribe("998")###parse_response 方法:解析從發(fā)布者/訂閱者命令的響應(yīng)
print(data.parse_response())
執(zhí)行結(jié)果:
1、先執(zhí)行訂閱者代碼,這時(shí)會(huì)卡在這里,等待接收消息。
2、后執(zhí)行發(fā)布者代碼,這時(shí)當(dāng)發(fā)布消息時(shí),訂閱者就會(huì)收到消息。
3.RabbitMQ
1.未利用exchange
生產(chǎn)者代碼:
importpika###第一件事,我們要做的就是與rabbitmq-server建立連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1"))
channel=connection.channel()###創(chuàng)建一個(gè)隊(duì)列,假如隊(duì)列不存在,但是為了確保隊(duì)列成功創(chuàng)建,C/P兩端最好都創(chuàng)建隊(duì)列###durable=True 消息持久化
channel.queue_declare("hello_lk3", durable=True)###在Rabbitmq中,一個(gè)消息不能直接發(fā)送給queue, 需要經(jīng)過一個(gè)exchange,后續(xù)會(huì)講到 ,現(xiàn)在我們只需將exchange設(shè)置為空字符串
channel.basic_publish(exchange='', routing_key="hello_lk1", body="fuck", properties=pika.BasicProperties(delivery_mode=2,))print("[x] sent 'fuck'")
connection.close()
消費(fèi)者代碼:
importpika###第一件事,我們要做的就是與rabbitmq-server建立連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1"))###創(chuàng)建一個(gè)頻道
channel =connection.channel()###創(chuàng)建一個(gè)隊(duì)列,假如隊(duì)列不存在,但是為了確保隊(duì)列成功創(chuàng)建,C/P兩端最好都創(chuàng)建隊(duì)列###durable=True 消息持久化
channel.queue_declare("hello_lk3", durable=True)###函數(shù)名不必須叫callback,callback函數(shù)就是將接收到的消息打印在屏幕上
defcallback(ch, mechod, properties, body):print("[%s] is received" %body)###無限循環(huán)監(jiān)聽,調(diào)用callback,隊(duì)列名,no_ack的含義為,當(dāng)時(shí)True時(shí),只要訂閱到消息,立刻返回ack,這是TCP層面的,并不能確保消息成功消費(fèi)###假如no_ack為False時(shí),訂閱到消息后要處理成功后才返回ack,這是業(yè)務(wù)邏輯層面的,確保消費(fèi)者成功消費(fèi)消息
channel.basic_consume(callback, queue="hello_lk1", no_ack=True)print("現(xiàn)在開始消費(fèi)消息...")
channel.start_consuming()
代碼執(zhí)行結(jié)果:
生產(chǎn)者:
[x] sent 'fuck'Process finished with exit code 0
消費(fèi)者:
現(xiàn)在開始消費(fèi)消息...
[b'fuck'] isreceived
[b'fuck'] isreceived
[b'fuck'] isreceived
[b'fuck'] is received
代碼解析:
1.先執(zhí)行消費(fèi)者代碼,在執(zhí)行生產(chǎn)者代碼,可以看到如上圖所示結(jié)果。
2.執(zhí)行兩次消費(fèi)者代碼,會(huì)發(fā)現(xiàn)生產(chǎn)者每生產(chǎn)個(gè)消息,消費(fèi)者會(huì)輪訓(xùn)的來消費(fèi)。
3.在步驟2中,假如不想讓消費(fèi)者輪訓(xùn)消費(fèi)而是先來先得的消費(fèi),則需要在消費(fèi)者代碼中加入一行:channel.basic_qos(prefetch_count=1) 表示誰來誰取,不再按照奇偶數(shù)排列。
2.使用exchange發(fā)布訂閱(常用)
1.fanout exchange
這是處理邏輯最簡單的exchange類型,實(shí)際上它沒有任何邏輯,它把進(jìn)入該exchange的消息全部轉(zhuǎn)發(fā)給每一個(gè)綁定的隊(duì)列中,如果這個(gè)exchange沒有隊(duì)列與之綁定,消息會(huì)被丟棄。然后通過exchange發(fā)送消息,routing key可以隨便填寫,因?yàn)槭莊anout類型的exchange,routing key不起作用。
生產(chǎn)者代碼:
importpika###第一件事,我們要做的就是與rabbitmq-server建立連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1"))
channel=connection.channel()###創(chuàng)建一個(gè)exchange,生產(chǎn)者直接向exchange發(fā)消息,而不是隊(duì)列.###type: fanout類型:把進(jìn)入該exchange的消息全部轉(zhuǎn)發(fā)給每一個(gè)綁定的隊(duì)列中,如果這個(gè)exchange沒有隊(duì)列與之綁定,消息會(huì)被丟棄
channel.exchange_declare(exchange="lk", type="fanout")
message= 'hello'channel.basic_publish(exchange='lk', routing_key='', body=message)print("send MSG: %s" %message)
connection.close()
消費(fèi)者1代碼:
##exchange 版
importpika###第一件事,我們要做的就是與rabbitmq-server建立連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1"))###創(chuàng)建一個(gè)頻道
channel =connection.channel()###創(chuàng)建一個(gè)exchange,假如不存在,但是為了確保其成功創(chuàng)建,C/P兩端最好都創(chuàng)建exchange###lk 為exchange名###type: fanout類型:把進(jìn)入該exchange的消息全部轉(zhuǎn)發(fā)給每一個(gè)綁定的隊(duì)列中,如果這個(gè)exchange沒有隊(duì)列與之綁定,消息會(huì)被丟棄
channel.exchange_declare(exchange="lk", type='fanout')##隨機(jī)創(chuàng)建隊(duì)列#result = channel.queue_declare(exclusive=True)#queue_name = result.method.queue###指定創(chuàng)建隊(duì)列
channel.queue_declare("hello_lk5")##綁定隊(duì)列到exchange
channel.queue_bind(exchange="lk", queue="hello_lk5")###函數(shù)名不必須叫callback,callback函數(shù)就是將接收到的消息打印在屏幕上
defcallback1(ch, method, propreties, body):print("[x] 收到 %s" %body)
channel.basic_consume(callback1, queue="hello_lk5", no_ack=True)
channel.start_consuming()
消費(fèi)者2代碼:
##exchange 版
importpika###第一件事,我們要做的就是與rabbitmq-server建立連接
connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1"))###創(chuàng)建一個(gè)頻道
channel =connection.channel()###創(chuàng)建一個(gè)exchange,假如不存在,但是為了確保其成功創(chuàng)建,C/P兩端最好都創(chuàng)建exchange###lk 為exchange名###type: fanout類型:把進(jìn)入該exchange的消息全部轉(zhuǎn)發(fā)給每一個(gè)綁定的隊(duì)列中,如果這個(gè)exchange沒有隊(duì)列與之綁定,消息會(huì)被丟棄
channel.exchange_declare(exchange="lk", type='fanout')##隨機(jī)創(chuàng)建隊(duì)列#result = channel.queue_declare(exclusive=True)#queue_name = result.method.queue###指定創(chuàng)建隊(duì)列
channel.queue_declare("hello_lk6")##綁定隊(duì)列到exchange
channel.queue_bind(exchange="lk", queue="hello_lk6")###函數(shù)名必須叫callback,callback函數(shù)就是將接收到的消息打印在屏幕上
defcallback1(ch, method, propreties, body):print("[x] 收到 %s" %body)
channel.basic_consume(callback1, queue="hello_lk6", no_ack=True)
channel.start_consuming()
執(zhí)行結(jié)果:
生產(chǎn)者:
send MSG: hello
Process finished with exit code 0
消費(fèi)者1:
[x] 收到 b'hello'
消費(fèi)者2:
[x] 收到 b'hello'
代碼解析:生產(chǎn)者直接向exchange發(fā)消息,這時(shí),消費(fèi)者創(chuàng)建并綁定隊(duì)列到exchange上,生產(chǎn)者一旦發(fā)布,所有隊(duì)列都會(huì)收到消息。
2.direct exchange
這種類型的交換機(jī)Fancout 類型的交換機(jī)智能一些,它會(huì)根據(jù)routing key來決定把消息具體扔到哪個(gè)消息隊(duì)列中。通過exchange發(fā)消息的時(shí)候會(huì)指定一個(gè)routing key,只有當(dāng)routing key和與隊(duì)列綁定的routing key一樣的時(shí)候,消息才對(duì)發(fā)送到對(duì)應(yīng)的消息隊(duì)列。即,如果與某個(gè)隊(duì)列綁定的routing key叫hello.world,則通過exchange發(fā)送的routing key必須也是hello.world,該隊(duì)列才能接收到消息(可按上述步驟進(jìn)行驗(yàn)證)。這種情況下,隊(duì)列之間是互斥關(guān)系,一個(gè)消息最多只能進(jìn)入一個(gè)隊(duì)列。
生產(chǎn)者代碼:
importpika###第一件事,我們要做的就是與rabbitmq-server建立連接
connection =pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1'))
channel=connection.channel()###創(chuàng)建一個(gè)exchange, 類型是direct,
channel.exchange_declare(exchange='direct_logs',
type='direct')###定義關(guān)鍵字severity, 每次發(fā)消息時(shí)會(huì)指定關(guān)鍵字.
severity = "error"
###message 是要發(fā)送的消息
message = "123"
###通過定義好的exchange發(fā)送消息,關(guān)鍵字也是定好的,發(fā)送指定的消息
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)print("[x] Sent %r:%r" %(severity, message))
connection.close()
消費(fèi)者1代碼:
importpika#import sys
###第一件事,我們要做的就是與rabbitmq-server建立連接
connection =pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1'))###創(chuàng)建一個(gè)頻道
channel =connection.channel()###創(chuàng)建一個(gè)exchange, 類型是direct,
channel.exchange_declare(exchange='direct_logs',
type='direct')###隨機(jī)創(chuàng)建一個(gè)隊(duì)列
result = channel.queue_declare(exclusive=True)
queue_name=result.method.queue###關(guān)聯(lián)關(guān)鍵字,只要生產(chǎn)者發(fā)布的消息關(guān)聯(lián)了以下關(guān)鍵字,訂閱者便能在綁定的隊(duì)列中收到消息
severities = ["info", "waring", "error"]###綁定關(guān)鍵字,隊(duì)列到exchange.
for severity inseverities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)print('[*] Waiting for logs. To exit press CTRL+C')###打印訂閱到的消息
defcallback(ch, method, properties, body):print("[x] %r:%r" %(method.routing_key, body))###循環(huán)監(jiān)聽隊(duì)列
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
消費(fèi)者2代碼:
importpika#import sys
###第一件事,我們要做的就是與rabbitmq-server建立連接
connection =pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1'))###創(chuàng)建一個(gè)頻道
channel =connection.channel()###創(chuàng)建一個(gè)exchange, 類型是direct,
channel.exchange_declare(exchange='direct_logs',
type='direct')###隨機(jī)創(chuàng)建一個(gè)隊(duì)列
result = channel.queue_declare(exclusive=True)
queue_name=result.method.queue###關(guān)聯(lián)關(guān)鍵字,只要生產(chǎn)者發(fā)布的消息關(guān)聯(lián)了以下關(guān)鍵字,訂閱者便能在綁定的隊(duì)列中收到消息
severities = ["error"]###綁定關(guān)鍵字,隊(duì)列到exchange.
for severity inseverities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)print('[*] Waiting for logs. To exit press CTRL+C')###打印訂閱到的消息
defcallback(ch, method, properties, body):print("[x] %r:%r" %(method.routing_key, body))###循環(huán)監(jiān)聽隊(duì)列
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
執(zhí)行結(jié)果:
1.首先執(zhí)行消費(fèi)者1和消費(fèi)者2的代碼
2.然后執(zhí)行生產(chǎn)者的代碼
3.可以看到消費(fèi)者1、2都接收到了生產(chǎn)者的消息。
4.假如生產(chǎn)者發(fā)送帶有非"error"關(guān)鍵字的消息,則只有消費(fèi)者1才能收到。
消費(fèi)者1執(zhí)行結(jié)果:
[*] Waiting for logs. To exit press CTRL+C
[x]'error':b'123'[x]'info':b'123'
消費(fèi)者2執(zhí)行結(jié)果:
[*] Waiting for logs. To exit press CTRL+C
[x]'error':b'123'
3.Topic exchange
Topic exchange是最靈活的exchange,它會(huì)把exchange的routing key與綁定隊(duì)列的routing key進(jìn)行模式匹配。Routing key中可以包含 和#兩種符號(hào),#號(hào)可以用來匹配一個(gè)或者多個(gè)單詞,*用來匹配正好一個(gè)單詞。
生產(chǎn)者代碼:
importpika###第一件事,我們要做的就是與rabbitmq-server建立連接
connection =pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1'))
channel=connection.channel()###創(chuàng)建一個(gè)exchange, 類型是topic
channel.exchange_declare(exchange='topic_logs',
type='topic')###發(fā)布者綁定exchange的關(guān)鍵字,訂閱者根據(jù)模糊匹配來訂閱
routing_key = "lk.haha.python"message= "xxoo"channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)print("[x] Sent %r:%r" %(routing_key, message))
connection.close()
消費(fèi)者代碼:
importpika###第一件事,我們要做的就是與rabbitmq-server建立連接
connection =pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1'))
channel=connection.channel()###創(chuàng)建一個(gè)exchange, 類型是topic
channel.exchange_declare(exchange='topic_logs',
type='topic')###隨機(jī)創(chuàng)建一個(gè)隊(duì)列
result = channel.queue_declare(exclusive=True)
queue_name=result.method.queue###訂閱者綁定在exchange以及隊(duì)列的關(guān)鍵字模糊匹配,這里#代表0個(gè)或多個(gè)單詞,* 代表一個(gè)單詞,假如只寫一個(gè)#,代表全部匹配.
binding_keys = ["lk.#"]###根據(jù)多種匹配來綁定
for binding_key inbinding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)print('[*] Waiting for logs. To exit press CTRL+C')defcallback(ch, method, properties, body):print("[x] %r:%r" %(method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
執(zhí)行結(jié)果:
1.先執(zhí)行消費(fèi)者代碼,后執(zhí)行生產(chǎn)者代碼。
2.因?yàn)樯a(chǎn)者發(fā)送的消息帶有關(guān)鍵字lk.haha.python,符合訂閱者的綁定邏輯lk.#,所以這個(gè)消費(fèi)者能收到消息。
3.這個(gè)用法很方便,可以通過匹配來進(jìn)行消息的選擇接收。
4.Python的SQLAchemy框架
1.MySQL 基礎(chǔ) ----> 請(qǐng)自行百度
2.SQLAchemy
1.pymysql
importpymysql###創(chuàng)建一個(gè)MySQL 連接對(duì)象
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='s13')###創(chuàng)建一個(gè)可以操作MySQL的游標(biāo),默認(rèn)獲取結(jié)果是元組,當(dāng)設(shè)置cursor=pymysql.cursors.DictCursor,后獲取結(jié)果為字典
cursor =conn.cursor()#cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
###執(zhí)行SQL語句
cursor.execute("select * from t10")#獲取第一行數(shù)據(jù)#row_1 = cursor.fetchone()
#獲取前n行數(shù)據(jù)#row_2 = cursor.fetchmany(3)#獲取所有數(shù)據(jù)
row_3 =cursor.fetchall()print(row_3)###mode='relative',獲取結(jié)果相對(duì)位置移動(dòng) mode= 'absolute',獲取結(jié)果絕對(duì)位置移動(dòng)#cursor.scroll(-2,mode='relative')#row_3 = cursor.fetchall()#print(row_3)
###提交操作,當(dāng)執(zhí)行如insert update alter delete drop 等操作后要提交才能生效
conn.commit()###關(guān)閉游標(biāo)
cursor.close()###關(guān)閉數(shù)據(jù)庫連接
conn.close()
執(zhí)行結(jié)果:就是數(shù)據(jù)庫的操作結(jié)果,不過,庫和表都是事先從終端創(chuàng)建好的。
2.SQLAchemy基本操作
from sqlalchemy.ext.declarative importdeclarative_basefrom sqlalchemy importColumn, Integer, String, ForeignKey, UniqueConstraint, Indexfrom sqlalchemy.orm importsessionmaker, relationshipfrom sqlalchemy importcreate_engine###創(chuàng)建一個(gè)數(shù)據(jù)庫連接,連接池為5個(gè)
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s13", max_overflow=5)###創(chuàng)建對(duì)象的基類,默認(rèn)就這么寫.
Base =declarative_base()#定義個(gè)User子類
classUsers(Base):###要?jiǎng)?chuàng)建的表名
__tablename__ = 'users'
###表的結(jié)構(gòu)
id = Column(Integer, primary_key=True)
name= Column(String(32))
extra= Column(String(16))__table_args__ =(
UniqueConstraint('id', 'name', name='uix_id_name'),
Index('ix_id_name', 'name', 'extra'),
)#一對(duì)多
classFavor(Base):__tablename__ = 'favor'nid= Column(Integer, primary_key=True)
caption= Column(String(50), default='red', unique=True)classPerson(Base):__tablename__ = 'person'nid= Column(Integer, primary_key=True)
name= Column(String(32), index=True, nullable=True)
favor_id= Column(Integer, ForeignKey("favor.nid"))#多對(duì)多
classServerToGroup(Base):__tablename__ = 'servertogroup'nid= Column(Integer, primary_key=True, autoincrement=True)
server_id= Column(Integer, ForeignKey('server.id'))
group_id= Column(Integer, ForeignKey('group.id'))classGroup(Base):__tablename__ = 'group'id= Column(Integer, primary_key=True)
name= Column(String(64), unique=True, nullable=False)classServer(Base):__tablename__ = 'server'id= Column(Integer, primary_key=True, autoincrement=True)
hostname= Column(String(64), unique=True, nullable=False)
port= Column(Integer, default=22)###執(zhí)行建表操作(create_all),刪表操作(drop_all)
Base.metadata.create_all(engine)#Base.metadata.drop_all(engine)
執(zhí)行結(jié)果如下:
mysql>show tables;+---------------+
| Tables_in_s13 |
+---------------+
| favor |
| group |
| person |
| server |
| servertogroup |
| t10 |
| users |
+---------------+
7 rows in set (0.01 sec)
3.SQLAchemy 增刪改查
1.利用數(shù)據(jù)庫連接直接SQL語句執(zhí)行
fromsqlalchemy import create_engine
###創(chuàng)建一個(gè)數(shù)據(jù)庫連接,連接池為5個(gè)
engine= create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s13", max_overflow=5)
engine.execute(
"INSERT INTO users (id, name, extra) VALUES (1, 'lk', 'haha')"
)
result= engine.execute('select * from users')print(result.fetchall())
執(zhí)行結(jié)果:
[(1, 'lk', 'haha')]
2.利用SQLAchemy內(nèi)部組件操作
在利用SQLAchemy的子類繼承模式創(chuàng)建表之后,創(chuàng)建對(duì)象,利用對(duì)象執(zhí)行特定語句。
增:
obj = Users(name="alex0", extra='sb')
session.add(obj)
session.add_all([Users(name="alex1", extra='sb'),
Users(name="alex2", extra='sb'),])
session.commit()
刪:
session.query(Users).filter(Users.id > 2).delete()
session.commit()
改:
session.query(Users).filter(Users.id > 2).update({"name" : "099"})
session.query(Users).filter(Users.id> 2).update({Users.name: Users.name + "099"}, synchronize_session=False)
session.query(Users).filter(Users.id> 2).update({"num": Users.num + 1}, synchronize_session="evaluate")
session.commit()
查:
ret = session.query(Users).all()
ret= session.query(Users.name, Users.extra).all()
ret= session.query(Users).filter_by(name='alex').all()
ret= session.query(Users).filter_by(name='alex').first()
其他:
# 條件
ret= session.query(Users).filter_by(name='alex').all()
ret= session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret= session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
ret= session.query(Users).filter(Users.id.in_([1,3,4])).all()
ret= session.query(Users).filter(~Users.id.in_([1,3,4])).all()
ret= session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()fromsqlalchemy import and_, or_
ret= session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret= session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret=session.query(Users).filter(
or_(
Users.id< 2,
and_(Users.name== 'eric', Users.id > 3),
Users.extra!=""
)).all()
# 通配符
ret= session.query(Users).filter(Users.name.like('e%')).all()
ret= session.query(Users).filter(~Users.name.like('e%')).all()
# 限制
ret= session.query(Users)[1:2]# 排序
ret= session.query(Users).order_by(Users.name.desc()).all()
ret= session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
# 分組fromsqlalchemy.sql import func
ret= session.query(Users).group_by(Users.extra).all()
ret=session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).all()
ret=session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
# 連表
ret= session.query(Users, Favor).filter(Users.id == Favor.nid).all()
ret= session.query(Person).join(Favor).all()
ret= session.query(Person).join(Favor, isouter=True).all()
# 組合
q1= session.query(Users.name).filter(Users.id > 2)
q2= session.query(Favor.caption).filter(Favor.nid < 2)
ret= q1.union(q2).all()
q1= session.query(Users.name).filter(Users.id > 2)
q2= session.query(Favor.caption).filter(Favor.nid < 2)
ret= q1.union_all(q2).all()
總結(jié)
以上是生活随笔為你收集整理的python123平台作业答案第十二周_python周报第十二周的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 腾讯地图利用绘图工具测量距离
- 下一篇: python库读取cif文件_技术专栏: