[RabbitMQ]RabbitMQ原理与相关操作(一)
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
RabbitMQ原理與相關(guān)操作(一)
小編是菜鳥一枚,最近想試試MQ相關(guān)的技術(shù),所以自己看了下RabbitMQ官網(wǎng),試著寫下自己的理解與操作的過程。
剛開始的第一篇,原理只介紹 生產(chǎn)者、消費(fèi)者、隊(duì)列,至于其他的內(nèi)容,會(huì)在后續(xù)中陸續(xù)補(bǔ)齊。
引入MQ話題
什么時(shí)候會(huì)用到MQ
可能很多人有疑惑:MQ到底是什么?哪些場(chǎng)景下要使用MQ?
前段時(shí)間安裝了RabbitMQ,現(xiàn)在就記錄下自己的學(xué)習(xí)心得吧。
首先看段程序:
View Code
僅僅從代碼上看,沒有覺得任何問題對(duì)吧?編譯也是通過的,但是執(zhí)行時(shí),出現(xiàn)一個(gè)問題:
當(dāng)然,這僅僅是一個(gè)小的案例,類似這種多線程寫文件造成的問題, 就應(yīng)該使用MQ了。
MQ的使用場(chǎng)景大概包括解耦,提高峰值處理能力,送達(dá)和排序保證,緩沖等。
MQ概述
消息隊(duì)列技術(shù)是分布式應(yīng)用間交換信息的一種技術(shù)。
消息隊(duì)列可駐留在內(nèi)存或磁盤上,隊(duì)列存儲(chǔ)消息直到它們被應(yīng)用程序讀走。
通過消息隊(duì)列,應(yīng)用程序可獨(dú)立地執(zhí)行--它們不需要知道彼此的位置、或在繼續(xù)執(zhí)行前不需要等待接收程序接收此消息。
MQ主要作用是接受和轉(zhuǎn)發(fā)消息。你可以想想在生活中的一種場(chǎng)景:當(dāng)你把信件的投進(jìn)郵筒,郵遞員肯定最終會(huì)將信件送給收件人。我們可以把MQ比作 郵局和郵遞員。
MQ和郵局的主要區(qū)別是,它不處理消息,但是,它會(huì)接受數(shù)據(jù)、存儲(chǔ)消息數(shù)據(jù)、轉(zhuǎn)發(fā)消息。
RabbitMQ術(shù)語
生產(chǎn)者:
消息發(fā)送者,在MQ中被稱為生產(chǎn)者(producer),一個(gè)發(fā)送消息的應(yīng)用也被叫做生產(chǎn)者,用P表示
消費(fèi)者:
生產(chǎn)者“生產(chǎn)”出消息后,最終由誰消費(fèi)呢?等待接受消息的應(yīng)用程序,我們稱之為消費(fèi)者(Consuming ),用C表示
隊(duì)列:
消息只能存儲(chǔ)在隊(duì)列(queue )中。盡管消息在rabbitMQ和應(yīng)用程序間流通,但是隊(duì)列卻是存在于RabbitMQ內(nèi)部。
一個(gè)隊(duì)列不受任何限制,它可以存儲(chǔ)你想要存儲(chǔ)的消息量,它本質(zhì)上是一個(gè)無限的緩沖區(qū)。
多個(gè)生產(chǎn)者可以向同一個(gè)隊(duì)列發(fā)送消息,多個(gè)消費(fèi)者可以嘗試從同一個(gè)消息隊(duì)列中接收數(shù)據(jù)。
一個(gè)隊(duì)列像下面這樣(上面是它的隊(duì)列名稱)
注意:
生產(chǎn)者、消費(fèi)者、中間件不必在一臺(tái)機(jī)器上,實(shí)際應(yīng)用中也是絕大多數(shù)不在一起的。我們可以用一張圖表示RabbitMQ的構(gòu)造:
?
注:此圖片摘自于百度百科RabbitMQ。
使用RabbitMQ解決多線程寫入文件問題
分析
多線程寫入,產(chǎn)生消息的也就是一個(gè)程序(一個(gè)生產(chǎn)者P),消費(fèi)消息的也是一個(gè)消息,它的模型應(yīng)該是:
編寫代碼
引入RabbitMQ client DLL
程序包管理控制臺(tái)命令:
PM> Install-Package RabbitMQ.Client生產(chǎn)者
首先,創(chuàng)建一個(gè) connection 通過socket連接 去和服務(wù)器連接起來(需要傳目的服務(wù)器的IP、用戶名、密碼等)。
接著 創(chuàng)建一個(gè) channel ,這是大部分的要做的事情所在。
要發(fā)送消息,我們必須聲明一個(gè)隊(duì)列,,然后我們可以向隊(duì)列發(fā)布消息。
執(zhí)行一次BasicPublish方法,推送一個(gè)消息。
class Program{static void Main(string[] args){new Thread(Write).Start();new Thread(Write).Start();new Thread(Write).Start();}public static void Write(){var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", };using (var connection = factory.CreateConnection())using (var channel = connection.CreateModel()){channel.QueueDeclare(queue: "writeLog", durable: false, exclusive: false, autoDelete: false, arguments: null);for (int i = 0; i < 8000; i++){string message = i.ToString();var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "", routingKey: "writeLog", basicProperties: null, body: body);Console.WriteLine("Program Sent {0}", message);}}}}View Code
聲明的隊(duì)列,在服務(wù)器中如果不存在了,會(huì)自動(dòng)創(chuàng)建。而消息的內(nèi)容是字節(jié)數(shù)組,在使用時(shí),注意編碼問題。
消費(fèi)者
當(dāng)隊(duì)列里有消息時(shí),消費(fèi)者要隨時(shí)能夠從隊(duì)列里獲取消息,所以我需要一直運(yùn)行它,讓它監(jiān)聽消息。
就像我們打籃球進(jìn)行傳球,需要事先確認(rèn)要傳給的那個(gè)隊(duì)友位置一樣,生產(chǎn)者要發(fā)送消息,一定要事先知道消費(fèi)消息的程序的對(duì)列是哪個(gè)。所以,在運(yùn)行生產(chǎn)者程序前,需要先啟動(dòng)消費(fèi)者程序。
由此,聲明對(duì)列,就應(yīng)該在消費(fèi)者程序中完成。
class Program{public static void Main(){var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", VirtualHost ="/"};using (var connection = factory.CreateConnection())using (var channel = connection.CreateModel()){channel.QueueDeclare(queue: "writeLog",durable: false,exclusive: false,autoDelete: false,arguments: null);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body;var message = Encoding.UTF8.GetString(body);ExcuateWriteFile(message);Console.WriteLine(" Receiver Received {0}", message);};channel.BasicConsume(queue: "writeLog",noAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();}}public static void ExcuateWriteFile(string i){using (FileStream fs = new FileStream(@"d:\\test.txt", FileMode.Append)){using (StreamWriter sw = new StreamWriter(fs, Encoding.Unicode)){sw.Write(i);}}}}View Code
執(zhí)行程序
先執(zhí)行 消費(fèi)者程序,讓它一直保持監(jiān)聽。
錯(cuò)誤解決
執(zhí)行時(shí)VS報(bào)錯(cuò):
“RabbitMQ.Client.Exceptions.BrokerUnreachableException”類型的未經(jīng)處理的異常在 RabbitMQ.Client.dll 中發(fā)生 其他信息: None of the specified endpoints were reachable。
進(jìn)入查看詳細(xì)的內(nèi)部異常:
innerEception:{"The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=530, text=\"NOT_ALLOWED - access to vhost '/' refused for user 'eric'\", classId=10, methodId=40, cause="}
此時(shí),我們打開在http://localhost:15672/#/users 可以看到eric 下 的Can access virtual hosts 為 NoAccess
解決辦法:
rabbitmqctl控制臺(tái)輸入
rabbitmqctl set_permissions -p / userName "." "." ".*"再次執(zhí)行時(shí),可以看到:
?
然后運(yùn)行 生產(chǎn)者程序。
我們先開著 Receive ,當(dāng)生產(chǎn)者運(yùn)行時(shí)
?
消費(fèi)者的自動(dòng)觸發(fā)執(zhí)行 :
直到所有的 指定的 queue 里面的消息完全消費(fèi)完為止。(此時(shí)消費(fèi)者程序仍然在監(jiān)聽中)
?
對(duì)于需要安裝和設(shè)置用戶的同學(xué),請(qǐng)參考 windows下 安裝 rabbitMQ 及操作常用命令
?
本文參考:
rabbitMq外文網(wǎng)站
百度百科
轉(zhuǎn)載于:https://my.oschina.net/morpheusWB/blog/1529641
總結(jié)
以上是生活随笔為你收集整理的[RabbitMQ]RabbitMQ原理与相关操作(一)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python [SSL: CERTIFI
- 下一篇: python pytest测试框架介绍四