day12 Python操作rabbitmq及pymsql
一、rabbitmq介紹
RabbitMQ簡介
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用于組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
下面將重點介紹RabbitMQ中的一些基礎概念,了解了這些概念,是使用好RabbitMQ的基礎。
ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。Connection是RabbitMQ的socket鏈接,它封裝了socket協議相關部分邏輯。ConnectionFactory為Connection的制造工廠。
Channel是我們與RabbitMQ打交道的最重要的一個接口,我們大部分的業務操作是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發布消息等。
?
Connection: 就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。以后我們可以看到,程序的起始處就是建立這個TCP連接。
Channels: 虛擬連接。它建立在上述的TCP連接中。數據流動都是在Channel中進行的。也就是說,一般情況是程序起始建立TCP連接,第二步就是建立這個Channel。
那么,為什么使用Channel,而不是直接使用TCP連接?
??? 對于OS來說,建立和關閉TCP連接是有代價的,頻繁的建立關閉TCP連接對于系統的性能有很大的影響,而且TCP的連接數也有限制,這也限制了系統處理高并發的能力。但是,在TCP連接中建立Channel是沒有上述代價的。對于Producer或者Consumer來說,可以并發的使用多個Channel進行Publish或者Receive。有實驗表明,1s的數據可以Publish10K的數據包。當然對于不同的硬件環境,不同的數據包大小這個數據肯定不一樣,但是我只想說明,對于普通的Consumer或者Producer來說,這已經足夠了。如果不夠用,你考慮的應該是如何細化split你的設計。
?
對于一個數據從Producer到Consumer的正確傳遞,還有三個概念需要明確:exchanges, queues and bindings。
Queue: Queue(隊列)是RabbitMQ的內部對象,用于存儲消息。
RabbitMQ中的消息都只能存儲在Queue中,生產者(下圖中的P)生產消息并最終投遞到Queue中,消費者(下圖中的C)可以從Queue中獲取消息并消費。
多個消費者可以訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的消息并處理。
Exchange: 在上面的queue中我們看到生產者將消息投遞到Queue中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的情況是,生產者將消息發送到Exchange(交換器,下圖中的X),由Exchange將消息路由到一個或多個Queue中(或者丟棄)。
Binding:RabbitMQ中通過Binding將Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。
routing key:
生產者在將消息發送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規則,而這個routing key需要與Exchange Type及binding key聯合使用才能最終生效。
在Exchange Type與binding key固定的情況下(在正常使用時一般這些內容都是固定配置好的),我們的生產者就可以在發送消息給Exchange時,通過指定routing key來決定消息流向哪里。
RabbitMQ為routing key設定的長度限制為255 bytes。
Binding key:
在綁定(Binding)Exchange與Queue的同時,一般會指定一個binding key;消費者將消息發送給Exchange時,一般會指定一個routing key;當binding key與routing key相匹配時,消息將會被路由到對應的Queue中。這個將在
RabbitMQ常用的Exchange Type有fanout、direct、topic、headers這四種:
fanout
fanout類型的Exchange路由規則非常簡單,它會把所有發送到該Exchange的消息路由到所有與它綁定的Queue中。
上圖中,生產者(P)發送到Exchange(X)的所有消息都會路由到圖中的兩個Queue,并最終被兩個消費者(C1與C2)消費。
direct
direct類型的Exchange路由規則也很簡單,它會把消息路由到那些binding key與routing key完全匹配的Queue中。
以上圖的配置為例,我們以routingKey=”error”發送消息到Exchange,則消息會路由到Queue1(amqp.gen-S9b…,這是由RabbitMQ自動生成的Queue名稱)和Queue2(amqp.gen-Agl…);如果我們以routingKey=”info”或routingKey=”warning”來發送消息,則消息只會路由到Queue2。如果我們以其他routingKey發送消息,則消息不會路由到這兩個Queue中。
topic
前面講到direct類型的Exchange路由規則是完全匹配binding key與routing key,但這種嚴格的匹配方式在很多情況下不能滿足實際業務需求。topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage相似,也是將消息路由到binding key與routing key相匹配的Queue中,但這里的匹配規則有些不同,它約定:
- routing key為一個句點號“. ”分隔的字符串(我們將被句點號“. ”分隔開的每一段獨立的字符串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
- binding key與routing key一樣也是句點號“. ”分隔的字符串
- binding key中可以存在兩種特殊字符“*”與“#”,用于做模糊匹配,其中“*”用于匹配一個單詞,“#”用于匹配多個單詞(可以是零個)
以上圖中的配置為例,routingKey=”quick.orange.rabbit”的消息會同時路由到Q1與Q2,routingKey=”lazy.orange.fox”的消息會路由到Q1,routingKey=”lazy.brown.fox”的消息會路由到Q2,routingKey=”lazy.pink.rabbit”的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息將會被丟棄,因為它們沒有匹配任何bindingKey。
headers
headers類型的Exchange不依賴于routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否完全匹配Queue與Exchange綁定時指定的鍵值對;如果完全匹配則消息會路由到該Queue,否則不會路由到該Queue。
該類型的Exchange沒有用到過(不過也應該很有用武之地),所以不做介紹。
?
關于代碼示例,給大家推薦一個很棒的博客:
http://blog.csdn.net/chenjiebin/article/details/8253433
這篇博客里代碼及注釋很清晰的描述了以上的技術內容。
?
以上內容借鑒博客:
http://blog.csdn.net/whycold/article/details/41119807
http://blog.csdn.net/column/details/rabbitmq.html
感謝大神們的技術分享
?
二、pymsql
1、執行SQL#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysql# 創建連接 conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1') # 創建游標 cursor = conn.cursor()# 執行SQL,并返回收影響行數 effect_row = cursor.execute("update hosts set host = '1.1.1.2'")# 執行SQL,并返回受影響行數 #effect_row = cursor.execute("update hosts set host = '1.1.1.2' where nid > %s", (1,))# 執行SQL,并返回受影響行數 #effect_row = cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)])# 提交,不然無法保存新建或者修改的數據 conn.commit()# 關閉游標 cursor.close() # 關閉連接 conn.close()2、獲取新創建數據自增ID#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1') cursor = conn.cursor() cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)]) conn.commit() cursor.close() conn.close()# 獲取最新自增ID new_id = cursor.lastrowid3、獲取查詢數據#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1') cursor = conn.cursor() cursor.execute("select * from hosts")# 獲取第一行數據 row_1 = cursor.fetchone()# 獲取前n行數據 # row_2 = cursor.fetchmany(3) # 獲取所有數據 # row_3 = cursor.fetchall() conn.commit() cursor.close() conn.close() 注:在fetch數據時按照順序進行,可以使用cursor.scroll(num,mode)來移動游標位置,如:cursor.scroll(1,mode='relative') # 相對當前位置移動 cursor.scroll(2,mode='absolute') # 相對絕對位置移動 4、fetch數據類型關于默認獲取的數據是元祖類型,如果想要或者字典類型的數據,即:#!/usr/bin/env python # -*- coding:utf-8 -*- import pymysqlconn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')# 游標設置為字典類型 cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) r = cursor.execute("call p1()")result = cursor.fetchone()conn.commit() cursor.close() conn.close()轉載于:https://www.cnblogs.com/xuanouba/p/5718371.html
總結
以上是生活随笔為你收集整理的day12 Python操作rabbitmq及pymsql的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Android Canvas 绘图
- 下一篇: 枚举,二分