生活随笔
收集整理的這篇文章主要介紹了
ActiveMq C#客户端 消息队列的使用(存和取)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1、準備工具
VS2013Apache.NMS.ActiveMQ-1.7.2-bin.zipapache-activemq-5.14.0-bin.zip 2、開始項目
VS2013新建一個C#控制臺應用程序,項目中添加兩個dll引用,一個是D:\Apache.NMS.ActiveMQ-1.7.2-bin\lib\Apache.NMS\net-4.0目錄下的Apache.NMS.dll,另一個是D:\Apache.NMS.ActiveMQ-1.7.2-bin\build\net-4.0\debug目錄下的Apache.NMS.ActiveMQ.dll。 新建一個類,MyActiveMq.cs,用于對activemq消息隊列接口的封裝,實現如下:
[csharp]?view plain
?copy using?System;??using?System.Collections.Generic;??using?System.Linq;??using?System.Text;??using?System.Threading.Tasks;????using?Apache.NMS;??using?Apache.NMS.ActiveMQ;????namespace?NmsProducerClasses??{??????public?class?MyActiveMq??????{??????????private?IConnectionFactory?factory;??????????private?IConnection?connection;??????????private?ISession?session;??????????private?IMessageProducer?prod;??????????private?IMessageConsumer?consumer;??????????private?ITextMessage?msg;????????????private?bool?isTopic?=?false;??????????private?bool?hasSelector?=?false;??????????private?const?string?ClientID?=?"clientid";??????????private?const?string?Selector?=?"filter='demo'";??????????private?bool?sendSuccess?=?true;??????????private?bool?receiveSuccess?=?true;????????????public?MyActiveMq(bool?isLocalMachine,?string?remoteAddress)??????????{??????????????try??????????????{??????????????????????????????????if?(isLocalMachine)??????????????????{??????????????????????factory?=?new?ConnectionFactory("tcp://localhost:61616/");??????????????????}??????????????????else??????????????????{??????????????????????factory?=?new?ConnectionFactory("tcp://"?+?remoteAddress?+?":61616/");?//寫tcp://192.168.1.111:61616的形式連接其他服務器上的ActiveMQ服務器?????????????????????????????}??????????????????????????????????connection?=?factory.CreateConnection();??????????????????connection.ClientId?=?ClientID;??????????????????connection.Start();??????????????????????????????????session?=?connection.CreateSession();??????????????}??????????????catch?(System.Exception?e)??????????????{??????????????????sendSuccess?=?false;??????????????????receiveSuccess?=?false;??????????????????Console.WriteLine("Exception:{0}",?e.Message);??????????????????Console.ReadLine();??????????????????throw?e;??????????????}??????????????Console.WriteLine("Begin?connection...");??????????}??????????????~MyActiveMq()??????????{??????????????????????}????????????????????????????????????????????????????????????public?bool?InitQueueOrTopic(bool?topic,?string?name,?bool?selector?=?false)??????????{??????????????try??????????????{??????????????????????????????????if?(topic)??????????????????{??????????????????????prod?=?session.CreateProducer(new?Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name));??????????????????????if?(selector)??????????????????????{??????????????????????????consumer?=?session.CreateDurableConsumer(new?Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name),?ClientID,?Selector,?false);??????????????????????????hasSelector?=?true;??????????????????????}??????????????????????else??????????????????????{??????????????????????????consumer?=?session.CreateDurableConsumer(new?Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name),?ClientID,?null,?false);??????????????????????????hasSelector?=?false;??????????????????????}??????????????????????isTopic?=?true;??????????????????}??????????????????else??????????????????{??????????????????????prod?=?session.CreateProducer(new?Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name));??????????????????????if?(selector)??????????????????????{??????????????????????????consumer?=?session.CreateConsumer(new?Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name),?Selector);??????????????????????????hasSelector?=?true;??????????????????????}??????????????????????else??????????????????????{??????????????????????????consumer?=?session.CreateConsumer(new?Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name));??????????????????????????hasSelector?=?false;??????????????????????}??????????????????????isTopic?=?false;??????????????????}??????????????????????????????????msg?=?prod.CreateTextMessage();??????????????}??????????????catch?(System.Exception?e)??????????????{??????????????????sendSuccess?=?false;??????????????????receiveSuccess?=?false;??????????????????Console.WriteLine("Exception:{0}",?e.Message);??????????????????Console.ReadLine();??????????????????throw?e;??????????????}????????????????return?sendSuccess;??????????}??????????????public?bool?SendMessage(string?message,?string?msgId?=?"defult",?MsgPriority?priority?=?MsgPriority.Normal)??????????{??????????????if?(prod?==?null)??????????????{??????????????????sendSuccess?=?false;??????????????????Console.WriteLine("call?InitQueueOrTopic()?first!!");??????????????????return?false;??????????????}????????????????Console.WriteLine("Begin?send?messages...");????????????????????????????msg.NMSCorrelationID?=?msgId;??????????????msg.Properties["MyID"]?=?msgId;??????????????msg.NMSMessageId?=?msgId;??????????????msg.Text?=?message;??????????????Console.WriteLine(message);????????????????if?(isTopic)??????????????{??????????????????sendSuccess?=?ProducerSubcriber(message,?priority);??????????????}??????????????else??????????????{??????????????????sendSuccess?=?P2P(message,?priority);??????????????}????????????????return?sendSuccess;??????????}??????????????public?string?GetMessage()??????????{??????????????if?(prod?==?null)??????????????{??????????????????Console.WriteLine("call?InitQueueOrTopic()?first!!");??????????????????return?null;??????????????}????????????????Console.WriteLine("Begin?receive?messages...");??????????????ITextMessage?revMessage?=?null;??????????????try??????????????{??????????????????????????????????revMessage?=?consumer.Receive(new?TimeSpan(TimeSpan.TicksPerMillisecond?*10))?as?ITextMessage;???????????????}??????????????catch?(System.Exception?e)??????????????{??????????????????receiveSuccess?=?false;??????????????????Console.WriteLine("Exception:{0}",?e.Message);??????????????????Console.ReadLine();??????????????????throw?e;??????????????}????????????????if?(revMessage?==?null)??????????????{??????????????????Console.WriteLine("No?message?received!");??????????????????return?null;??????????????}??????????????else??????????????{??????????????????Console.WriteLine("Received?message?with?Correlation?ID:?"?+?revMessage.NMSCorrelationID);??????????????????????????????????Console.WriteLine("Received?message?with?text:?"?+?revMessage.Text);??????????????}????????????????return?revMessage.Text;??????????}????????????????????private?bool?P2P(string?message,?MsgPriority?priority)??????????{??????????????try??????????????{??????????????????if?(hasSelector)??????????????????{??????????????????????????????????????????msg.Properties.SetString("filter",?"demo");??????????????????}??????????????????prod.Priority?=?priority;??????????????????????????????????prod.DeliveryMode?=?MsgDeliveryMode.Persistent;??????????????????????????????????prod.Send(msg,?MsgDeliveryMode.Persistent,?priority,?TimeSpan.MinValue);??????????????}??????????????catch?(System.Exception?e)??????????????{??????????????????sendSuccess?=?false;??????????????????Console.WriteLine("Exception:{0}",?e.Message);??????????????????Console.ReadLine();??????????????????throw?e;??????????????}????????????????return?sendSuccess;??????????}??????????????????????private?bool?ProducerSubcriber(string?message,?MsgPriority?priority)??????????{??????????????try??????????????{??????????????????prod.Priority?=?priority;??????????????????????????????????prod.DeliveryMode?=?MsgDeliveryMode.Persistent;??????????????????prod.Send(msg,?Apache.NMS.MsgDeliveryMode.Persistent,?priority,?TimeSpan.MinValue);??????????????????????????????}??????????????catch?(System.Exception?e)??????????????{??????????????????sendSuccess?=?false;??????????????????Console.WriteLine("Exception:{0}",?e.Message);??????????????????Console.ReadLine();??????????????????throw?e;??????????????}????????????????return?sendSuccess;??????????}??????????????public?void?ShutDown()??????????{??????????????Console.WriteLine("Close?connection?and?session...");??????????????session.Close();??????????????connection.Close();??????????}??????}??}?? Program.cs代碼如下:
[csharp]?view plain
?copy using?System;??using?System.Collections.Generic;??using?System.Linq;??using?System.Text;??using?System.Threading.Tasks;??using?System.IO;??using?System.Threading;????namespace?NmsProducerClasses??{??????class?Program??????{??????????static?void?Main(string[]?args)??????????{??????????????MyActiveMq?mymq?=?new?MyActiveMq(isLocalMachine:?true,?remoteAddress:?"");????????????????mymq.InitQueueOrTopic(topic:?false,?name:?"myqueue",?selector:?false);????????????????????????????????????????????????????????????????User?myuser0?=?new?User("0000",?"Lowest",?"img/p.jpg");??????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser0),?"newid",?priority:?Apache.NMS.MsgPriority.Lowest);??????????????User?myuser1?=?new?User("1111",?"AboveLow",?"img/p.jpg");??????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser1),?"newid",?priority:?Apache.NMS.MsgPriority.AboveLow);??????????????User?myuser2?=?new?User("2222",?"AboveNormal",?"img/p.jpg");??????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser2),?"newid",?priority:?Apache.NMS.MsgPriority.AboveNormal);??????????????User?myuser3?=?new?User("0000",?"BelowNormal",?"img/p.jpg");??????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser3),?"newid",?priority:?Apache.NMS.MsgPriority.BelowNormal);??????????????User?myuser4?=?new?User("1111",?"High",?"img/p.jpg");??????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser4),?"newid",?priority:?Apache.NMS.MsgPriority.High);??????????????User?myuser5?=?new?User("2222",?"Highest",?"img/p.jpg");??????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser5),?"newid",?priority:?Apache.NMS.MsgPriority.Highest);??????????????User?myuser6?=?new?User("0000",?"Low",?"img/p.jpg");??????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser6),?"newid",?priority:?Apache.NMS.MsgPriority.Low);??????????????User?myuser7?=?new?User("1111",?"Normal",?"img/p.jpg");??????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser7),?"newid",?priority:?Apache.NMS.MsgPriority.Normal);??????????????User?myuser8?=?new?User("2222",?"VeryHigh",?"img/p.jpg");??????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser8),?"newid",?priority:?Apache.NMS.MsgPriority.VeryHigh);??????????????User?myuser9?=?new?User("2222",?"VeryLow",?"img/p.jpg");??????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser8),?"newid",?priority:?Apache.NMS.MsgPriority.VeryLow);????????????????int?num?=?20;??????????????while?(num--?>?0)??????????????{??????????????????mymq.GetMessage();??????????????????????????????}??????????????mymq.ShutDown();??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????}????????}?? 3、測試
首先,需要啟動消息隊列,具體啟動及測試消息隊列步驟可見這邊:點擊打開鏈接 然后,運行項目,運行結果如下:
4、優先級
priority并不能決定消息傳送的嚴格消息,具體原因可見 http://activemq.apache.org/how-can-i-support-priority-queues.html http://shift-alt-ctrl.iteye.com/blog/2034440?
優先級設置:
在D:\apache-activemq-5.14.0\conf目錄的activemq.xml配置文件中,找到<destinationPolicy>標簽,在其中的<policyEntries>標簽下添加
[html]?view plain
?copy <policyEntry?queue=">"??producerFlowControl="false"?prioritizedMessages="true"?useCache="false"?expireMessagesPeriod="0"?queuePrefetch="1"?/>????<policyEntry?queue=">"?strictOrderDispatch="false"?/>????<policyEntry?queue=">"?>??????????????????<pendingMessageLimitStrategy>??????????????????????<constantPendingMessageLimitStrategy?limit="0"/>??????????????????</pendingMessageLimitStrategy>??????????????????<messageEvictionStrategy>??????????????????????<oldestMessageWithLowestPriorityEvictionStrategy/>??????????????????</messageEvictionStrategy>????</policyEntry>???? 配置完成后,需要重啟activemq
5、遠程登錄監控
要實現遠程監控服務器消息隊列,需要先進行配置。 配置方法:在D:\apache-activemq-5.14.0\conf目錄的jetty.xml配置文件中,把133開始的那段注釋去掉即可。
總結
以上是生活随笔為你收集整理的ActiveMq C#客户端 消息队列的使用(存和取)的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。