ASP.NET中进行消息处理(MSMQ) 三
在本文的前兩篇文章里對MSMQ的相關知識點進行了介紹,很多閱讀過這前兩篇文章的朋友都曾問到過這樣一些問題:
? 1、如何把MSMQ應用到實際的項目中去呢?
? 2、可不可以介紹一個實際的應用實例?
??3、......
????? 在前兩篇文章里,關于MSMQ常用的技術點基本介紹完畢了,本文主要以MS開源項目PetShop中的MSMQ應用作為案例來介紹MSMQ在實際項目中的應用。在PetShop里,由于系統使用了多線程的專用應用程序來監控消息隊列,在進入PetShop應用分析前,我們先來了解下關于多線程和MSMQ的相關知識點。一、多線程和MSMQ
????? 現在有這樣一個需求,指定的消息隊列里不管有無消息數據,我們通過一個多線程來監控這個隊列,一但隊列里的數據發生變化就做出相應的處理,比如把消息讀取出來。根據這個需求,我們來做個示例,用一多線程把隊列監控起來,如果隊列里有消息數據,就把消息讀取出來,如果沒有則一直監視隊列,當隊列數據發生改變(有新的消息加入)的時候就作出處理(讀取消息)。
??????首先定義一個線程數組用于存儲線程數;
2static?private?Thread[]?ThreadArray?=?new?Thread[ThreadNumber];
?????我們把需要啟動的線程裝載入ThreadArray數組,通過一個遍歷數組把所以的線程啟動,實際這里只有5個線程。
?1private?void?button1_Click(object?sender,?EventArgs?e)?2{
?3????StartThreads();
?4}
?5
?6private?void?StartThreads()
?7{
?8????int?counter;?//線程計數
?9????for?(counter?=?0;?counter?<?ThreadNumber;?counter++)
10????{
11????????ThreadArray[counter]?=?new?Thread(new?ThreadStart(MSMQListen));
12????????ThreadArray[counter].Start();
13????????this.richTextBox2.Text?+=?(counter?+?1).ToString()?+?"號線程開始!";
14????}
15}
16
17private?void?MSMQListen()
18{
19????while?(true)
20????{
21????????//取出隊列里的消息
22????????MessageBox.Show(MsgQueue.ReceiveMessage());
23????}
24}
????? 如果上面這段代碼閱讀起存在問題,建議先去了解下多線程的相關知識點。在StartThreads方法里啟動數組里存儲的所以線程,并委托給MSMQListen方法進行處理,MSMQListen方法完成的就是讀取隊列里的消息,這里我使用了在第二篇文章里所使用的MsgQueue類和Book類,詳細請閱讀第二篇文章ASP.NET中進行消息處理(MSMQ) 二?。
??????啟動了5個線程,用來監視指定的消息隊列,如上圖。那好,我們現在就來測試一下,通過給隊列里發送消息,看線程是否會有響應。從上面啟動線程的代碼上可以很清晰的看出,只要隊列里有消息在多線程的監視下,線程就會把隊列里的消息讀取出來。 ?1private?void?button3_Click(object?sender,?EventArgs?e)
?2{
?3????Book?book?=?new?Book();
?4????book.BookId?=?1;
?5????book.BookName?=?"asp.net";
?6????book.BookAuthor?=?"abcd";
?7????book.BookPrice?=?50.80;
?8
?9????MsgQueue.SendMessage(book);
10}
????? 那么這里的測試,向隊列里發送了一Book對象消息,根據上面分析,則多線程便會同時把此條消息讀取出來:?
???? 呵呵,測試結果出來,看來此測試達到了我們之前所提出的需求。!OK,關于MSMQ和多線程就簡單介紹于此。二、MSMQ在開塬項目PetShop中的應用分析。
????? 在PetShop 4.0中,利用消息隊列臨時存放要插入的數據,來避免因為頻繁訪問數據庫的操作。而隊列中的消息,則等待系統的專用的應用程序來處理,最后將數據插入到數據庫中。
????? PetShop 4.0中的消息處理,主要分為下面幾大部分:訂單策略接口IOrderStategy、消息接口IMessageing、消息工廠MessageFactory、MSMQ實現MSMQMessaging、后臺處理應用程序OrderProessor。如下圖:
1、訂單策略接口IOrderStategy
?????PetShop 4.0的體系結構是非常龐大,在訂單處理上有兩種處理策略,這里也是策略模式的一個應用,IOrderStrategy接口作為訂單策略的高層抽象,實現不同訂單處理的具體策略去實現它,UML如下:?
示意性代碼:
?1namespace?PetShop.IBLLStrategy?2{
?3????public?interface?IOrderStrategy
?4????{
?5????????void?Insert(OrderInfo?order);
?6????}
?7}
?8
?9namespace?PetShop.BLL
10{
11????public?class?OrderSynchronous:IOrderStrategy
12????{
13????????private?static?readonly?IOrder?asynchOrder?=?QueueAccess.CreateOrder();
14
15????????public?void?Insert(OrderInfo?order)
16????????{
17????????????asynchOrder.Send(order);
18????????}
19????}
20}
21
22//
????? 從上面UML和代碼就可以看出,訂單策略接口下有兩種實現,使用了抽象工廠模式來完成相應的訂單策略對象的創建?。關于這點在后面消息工廠部分去介紹,這里不作講解。
2、消息接口IMessageing
???? 在PetShop 4.0中,由于對訂單處理使用了異步處理方式,在消息接口中僅定義了一個IOrder接口。IOrder接口的定義與MSMQ的實現是一致的,需要提供發送和接收操作。在Send方法中,參數為數據訪問層的數據實體對象(OrderInfo),具體的實現則是用MSMQ的實現類(PetShop.MSMQMessaging.Order)去完成的。
????? MS的開發人員真的是什么都能想到,在消息接口的實現上考慮得很全面,為了避免將來的擴展會有其他的數據對象也使用到MSMQ;因此,在PetShop 4.0中的消息接口實現中,定義了一個隊列的基類(PetShopQueue),實現了消息的發送(Send)和接收(Receive)方法的基本操作。代碼如下: ?1namespace?PetShop.MSMQMessaging
?2{
?3????public?class?PetShopQueue:IDisposable
?4????{
?5????????//指定消息隊列事務的類型
?6????????protected?MessageQueueTransactionType?transactionType?=?MessageQueueTransactionType.Automatic;
?7????????protected?MessageQueue?queue;??//消息隊列
?8????????protected?TimeSpan?timeout;????//時間間隔
?9
10????????public?PetShopQueue(string?queuePath,?int?timeoutSeconds)
11????????{
12????????????queue?=?new?MessageQueue(queuePath);??//根據傳入quueuPath創建隊列
13????????????timeout?=?TimeSpan.FromSeconds(Convert.ToDouble(timeoutSeconds));
14
15????????????queue.DefaultPropertiesToSend.AttachSenderId?=?false;
16????????????queue.DefaultPropertiesToSend.UseAuthentication?=?false;
17????????????queue.DefaultPropertiesToSend.UseEncryption?=?false;
18????????????queue.DefaultPropertiesToSend.AcknowledgeType?=?AcknowledgeTypes.None;
19????????????queue.DefaultPropertiesToSend.UseJournalQueue?=?false;
20????????}
21
22????????///?<summary>
23????????///?接收消息方法
24????????///?</summary>
25????????public?virtual?object?Receive()
26????????{
27????????????try
28????????????{
29????????????????using?(Message?message?=?queue.Receive(timeout,?transactionType))
30????????????????????return?message;
31????????????}
32????????????catch?(MessageQueueException?mqex)
33????????????{
34????????????????if?(mqex.MessageQueueErrorCode?==?MessageQueueErrorCode.IOTimeout)
35????????????????????throw?new?TimeoutException();
36????????????????throw;
37????????????}
38????????}
39
40????????///?<summary>
41????????///?發送消息
42????????///?</summary>
43????????public?virtual?void?Send(object?msg)
44????????{
45????????????queue.Send(msg,?transactionType);
46????????}
47
48????????#region?IDisposable?成員
49????????public?void?Dispose()
50????????{
51????????????queue.Dispose();??//解放資源
52????????}
53????????#endregion
54????}
55}
???? MSMQ隊列是一個可持久的隊列,不會因用戶不間斷的下訂單導致數據丟失。queue作為存放數據的隊列,為消息隊列(MessageQueue)類型,同時還為PetShopQueue設置了timeout值,后臺處理應用程序(OrderProessor)會根據timeout的值定期掃描隊列中的訂單數量。
3、消息工廠MessageFactory
???? 可能是考慮到IOrder的實現會改變不同的策略吧,在PetShop里利用了抽象工廠模式,將IOrder對象的創建用了專門的工廠模塊(MessageFactory)進行封裝,定義如下:
?2{
?3????///?<summary>
?4????///?This?class?is?implemented?following?the?Abstract?Factory?pattern?to?create?the?Order
?5????///?Messaging?implementation?specified?from?the?configuration?file
?6????///?</summary>
?7????public?sealed?class?QueueAccess
?8????{
?9????????//<add?key="OrderMessaging"?value="PetShop.MSMQMessaging"/>
10????????private?static?readonly?string?path?=?"PetShop.MSMQMessaging";
11
12????????///?<summary>
13????????///?私有構造器,防止使用new創建對象實例
14????????///?</summary>
15????????private?QueueAccess()
16????????{?}
17
18????????public?static?IOrder?CreateOrder()
19????????{
20????????????string?className?=?path?+?".Order";
21????????????return?(IOrder)Assembly.Load(path).CreateInstance(className);
22????????}
23????}
24}
???? 在QueueAccess類中,通過CreateOrder方法利用反射技術創建正確IOrder類型對象(實際也就是創建了一個接口的具體實現類的對象,應用了多態的原理和反射技術)。UML圖下:?
???? 在PetShop4.0中,消息接口的具體實現是通過配置文件定義在web.config里: <add?key="OrderMessaging"?value="PetShop.MSMQMessaging"/>
???? 這里我為了能夠更直觀的演示和介紹就把path固化定義了,如下:
private?static?readonly?string?path?=?"PetShop.MSMQMessaging";???? 這里利用工廠模式來負責對象的創建,主要是方便業務邏輯層對定單處理策略的調用,如在PetShop.BLL模塊中的OrderSynchronous類:
?1namespace?PetShop.BLL?2{
?3????public?class?OrderSynchronous:IOrderStrategy
?4????{
?5????????private?static?readonly?IOrder?asynchOrder?=?QueueAccess.CreateOrder();
?6
?7????????public?void?Insert(OrderInfo?order)
?8????????{
?9????????????asynchOrder.Send(order);
10????????}
11????}
12}
這樣一但IOrder接口的實現發生了變化,此時就只需要修改配置文件就OK,整個系統就顯得很靈活,穩定。
4、MSMQ實現MSMQMessaging
??? 在PetShop.MSMQMessaging模塊中,訂單對象實現了消息接口(IMessaging)模塊中的IOrder,同時還繼承了基類PetShopQueue。定義如下:
?2{
?3????public?class?Order:PetShopQueue,IOrder
?4????{
?5????????private?static?readonly?string?queuePath?=?ConfigurationManager.AppSettings["OrderQueuePath"];
?6????????private?static?int?queueTimeout?=?20;??//20秒為超時
?7
?8????????public?Order()
?9????????????:?base(queuePath,?queueTimeout)
10????????{
11????????????queue.Formatter?=?new?BinaryMessageFormatter();
12????????????Console.WriteLine(queuePath);
13????????}
14
15????????public?new?OrderInfo?Receive()
16????????{
17????????????//該方法會應用在分布式事務中,故而設置為Automatic的事務類型。
18????????????base.transactionType?=?MessageQueueTransactionType.Automatic;
19????????????return?(OrderInfo)((Message)base.Receive()).Body;
20????????}
21
22????????public?OrderInfo?Receive(int?timeout)
23????????{
24????????????base.timeout?=?TimeSpan.FromSeconds(Convert.ToDouble(timeout));
25????????????return?Receive();
26????????}
27
28????????///?<summary>
29????????///?異步發送定單到消息隊列
30????????///?</summary>
31????????///?<param?name="orderMessage"></param>
32????????public?void?Send(OrderInfo?orderMessage)
33????????{
34????????????//該方法不會用于分布式事務中,故而設置為Single的事務類型。
35????????????base.transactionType?=?MessageQueueTransactionType.Single;
36????????????base.Send(orderMessage);
37????????}
38????}
39}
UML草圖:
???? 這里需要注意的是,Order類既繼承了基類PetShopQueue,同時還實現了接口IOrder,而在消息接口和基類中所定義的接收消息(Receive)方法在方法的簽名上是相同的。所以在Order的Receive方法實現中,必須使用new而非override關鍵字來重寫父類PetShopQueue的Receive虛方法。此時Order類的Receive方法代表兩個含義,一是實現了消息接口IOrder中的Receive方法;二則是利用了new關鍵字重寫了父類PetShopQueue的Receive虛方法。
???? 在PetShop4.0中,將面向對象的知識點應用得非常精妙,如上分析,此時我門可以怎么來調用Order呢?是這樣嗎? 1//1、使用基類PetShopQueue
2PetShopQueue?order?=?new?Order();
3order.Receive();
4
5//2、使用消息接口IOrder
6IOrder??order?=?new?Order();
7order.Receive();
???? 根據多態原理,上面這兩種實現都是正確的,那我們那取誰舍誰呢?在PetShop4.0中正確的調用是第2種方法,這種調用方法也更符合“面向接口設計”的原則。----詳細請查看消息工廠MessageFactory部分的介紹。
5、后臺處理應用程序OrderProessor
????? 前面一系列的操作,最終都會走向到這里,這里實現了最終的插入數據庫的操作。在PetShop 4.0中OrderProessor是一個控制臺應用程序,根據需求也可以將其設計為Windows Service。他完成的操作就是接收消息隊列里的訂單數據,將其插入到數據庫。在OrderProessor里使用了多線程技術,監視隊列里的訂單信息,定期的將其處理。在主方法Main方法中用于控制線程序,核心的執行任何則委托給ProcessOrders方法去實現。
?2{
?3????TimeSpan?tsTimeout?=?TimeSpan.FromSeconds(Convert.ToDouble(transactionTimeout?*?batchSize));
?4????Order?order?=?new?Order();??//邏輯層的PetShop.BLL.Order
?5????while?(true)
?6????{
?7????????TimeSpan?datatimeStarting?=?new?TimeSpan(DateTime.Now.Ticks);
?8????????double?elapsedTime?=?0;
?9
10????????int?processedItems?=?0;
11????????ArrayList?queueOrders?=?new?ArrayList();
12
13????????//首先驗證事務
14????????using?(TransactionScope?ts?=?new?TransactionScope(TransactionScopeOption.Required,?tsTimeout))
15????????{
16????????????//從隊列中檢索訂單
17????????????for?(int?i?=?0;?i?<?batchSize;?i++)
18????????????{
19????????????????try
20????????????????{
21????????????????????//在一定時間?類接收隊列的訂單
22????????????????????if?((elapsedTime?+?queueTimeout?+?transactionTimeout)?<?tsTimeout.TotalSeconds)
23????????????????????{
24????????????????????????queueOrders.Add(order.ReceiveFromQueue(queueTimeout));
25????????????????????}
26????????????????????else
27????????????????????{
28????????????????????????i?=?batchSize;??//?結束循環
29????????????????????}
30????????????????????elapsedTime?=?new?TimeSpan(DateTime.Now.Ticks).TotalSeconds?-?datatimeStarting.TotalSeconds;
31????????????????}
32????????????????catch?(TimeoutException)
33????????????????{
34????????????????????//沒有可以等待的消息也結束循環
35????????????????????i?=?batchSize;
36????????????????}
37????????????}
38
39????????????//處理隊列的訂單
40????????????for?(int?k?=?0;?k?<?queueOrders.Count;?k++)
41????????????{
42????????????????order.Insert((OrderInfo)queueOrders[k]);
43????????????????processedItems++;
44????????????????totalOrdersProcessed++;
45????????????}
46????????????//處理完畢或者是超時
47????????????ts.Complete();
48????????}
49????????Console.WriteLine("(Thread?Id?"?+?Thread.CurrentThread.ManagedThreadId?+?")?batch?finished,?"?+?processedItems?+?"?items,?in?"?+?elapsedTime.ToString()?+?"?seconds.");
50????}
51}
?????ProcessOrders方法首先通過業務邏輯層PetShop.BLL.Order類的方法ReceiveFromQueue去獲取消息隊列中的訂單數據,并將其放入一個ArrayList對象,然后將其插入到數據庫。 OrderProcessor的完整代碼定義如下:
??1using?System;
??2using?System.Collections.Generic;
??3using?System.Text;
??4using?System.Configuration;
??5using?System.Threading;
??6using?PetShop.BLL;
??7using?System.Collections;
??8using?System.Transactions;
??9using?PetShop.Model;
?10
?11namespace?PetShop.OrderProcessor
?12{
?13????class?Program
?14????{
?15????????private?static?int?transactionTimeout?=?int.Parse(ConfigurationManager.AppSettings["TransactionTimeout"]);
?16????????private?static?int?queueTimeout?=?int.Parse(ConfigurationManager.AppSettings["QueueTimeout"]);
?17????????private?static?int?batchSize?=?int.Parse(ConfigurationManager.AppSettings["BatchSize"]);
?18????????private?static?int?threadCount?=?int.Parse(ConfigurationManager.AppSettings["ThreadCount"]);
?19????????private?static?int?totalOrdersProcessed?=?0;
?20
?21????????static?void?Main(string[]?args)
?22????????{
?23????????????Thread?workTicketThread;
?24????????????Thread[]?workerThreads?=?new?Thread[threadCount];
?25
?26????????????for?(int?i?=?0;?i?<?threadCount;?i++)
?27????????????{
?28????????????????workTicketThread?=?new?Thread(new?ThreadStart(ProcessOrders));
?29????????????????//指示線呈是否為一后臺線程
?30????????????????workTicketThread.IsBackground?=?true;
?31????????????????workTicketThread.SetApartmentState(ApartmentState.STA);
?32
?33????????????????workTicketThread.Start();
?34????????????????workerThreads[i]?=?workTicketThread;
?35????????????}
?36
?37????????????Console.WriteLine("開始處理,按任意鍵停止.");
?38????????????Console.ReadLine();
?39????????????Console.WriteLine("正在終止線程,請等待");
?40
?41????????????//終止所以線程
?42????????????for?(int?i?=?0;?i?<?workerThreads.Length;?i++)
?43????????????{
?44????????????????workerThreads[i].Abort();
?45????????????}
?46
?47????????????Console.WriteLine();
?48????????????Console.WriteLine(totalOrdersProcessed?+?"?張訂單已經處理.");
?49????????????Console.WriteLine("已終止處理.按任意鍵退出");
?50????????????Console.ReadLine();
?51????????}
?52
?53????????private?static?void?ProcessOrders()
?54????????{
?55????????????TimeSpan?tsTimeout?=?TimeSpan.FromSeconds(Convert.ToDouble(transactionTimeout?*?batchSize));
?56????????????Order?order?=?new?Order();??//邏輯層的PetShop.BLL.Order
?57????????????while?(true)
?58????????????{
?59????????????????TimeSpan?datatimeStarting?=?new?TimeSpan(DateTime.Now.Ticks);
?60????????????????double?elapsedTime?=?0;
?61
?62????????????????int?processedItems?=?0;
?63????????????????ArrayList?queueOrders?=?new?ArrayList();
?64
?65????????????????//首先驗證事務
?66????????????????using?(TransactionScope?ts?=?new?TransactionScope(TransactionScopeOption.Required,?tsTimeout))
?67????????????????{
?68????????????????????//從隊列中檢索訂單
?69????????????????????for?(int?i?=?0;?i?<?batchSize;?i++)
?70????????????????????{
?71????????????????????????try
?72????????????????????????{
?73????????????????????????????//在一定時間?類接收隊列的訂單
?74????????????????????????????if?((elapsedTime?+?queueTimeout?+?transactionTimeout)?<?tsTimeout.TotalSeconds)
?75????????????????????????????{
?76????????????????????????????????queueOrders.Add(order.ReceiveFromQueue(queueTimeout));
?77????????????????????????????}
?78????????????????????????????else
?79????????????????????????????{
?80????????????????????????????????i?=?batchSize;??//?結束循環
?81????????????????????????????}
?82????????????????????????????elapsedTime?=?new?TimeSpan(DateTime.Now.Ticks).TotalSeconds?-?datatimeStarting.TotalSeconds;
?83????????????????????????}
?84????????????????????????catch?(TimeoutException)
?85????????????????????????{
?86????????????????????????????//沒有可以等待的消息也結束循環
?87????????????????????????????i?=?batchSize;
?88????????????????????????}
?89????????????????????}
?90
?91????????????????????//處理隊列的訂單
?92????????????????????for?(int?k?=?0;?k?<?queueOrders.Count;?k++)
?93????????????????????{
?94????????????????????????order.Insert((OrderInfo)queueOrders[k]);
?95????????????????????????processedItems++;
?96????????????????????????totalOrdersProcessed++;
?97????????????????????}
?98????????????????????//處理完畢或者是超時
?99????????????????????ts.Complete();
100????????????????}
101????????????????Console.WriteLine("(Thread?Id?"?+?Thread.CurrentThread.ManagedThreadId?+?")?batch?finished,?"?+?processedItems?+?"?items,?in?"?+?elapsedTime.ToString()?+?"?seconds.");
102????????????}
103????????}
104????}
105}
106
????? MSMQ技術除了用于異步處理之外,還可作為一種分布式處理技術應用。關于使用MSMQ進行分布式處理相關的內容,本人能力有限,還請大家查看相關的資料進行了解。
????? 本文介紹了MSMQ和多線程以及對PetShop 4.0中對MSMQ的應用進行了分析,結合之前我寫的兩篇關于MSMQ的相關知識點的介紹文章,對MSMQ算是建立了一個全面的認識,希望這三篇文章對大家學習MSMQ有所幫助。
文中示例代碼下載
總結
以上是生活随笔為你收集整理的ASP.NET中进行消息处理(MSMQ) 三的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ASP.NET中进行消息处理(MSMQ)
- 下一篇: 经营总收入是10752.9万元,产值等于