生活随笔
收集整理的這篇文章主要介紹了
ActiveMq C#客户端 消息队列的使用(存和取)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考。
1、準備工具
VS2013、Apache.NMS.ActiveMQ-1.7.2-bin.zip、apache-activemq-5.14.0-bin.zip
2、開始項目
VS2013新建一個C#控制臺應用程序,項目中添加兩個dll引用,一個是D:\Apache.NMS.ActiveMQ-1.7.2-bin\lib\Apache.NMS\net-4.0目錄下的Apache.NMS.dll,另一個是D:\Apache.NMS.ActiveMQ-1.7.2-bin\build\net-4.0\debug目錄下的Apache.NMS.ActiveMQ.dll。 新建一個類,MyActiveMq.cs,用于對activemq消息隊列接口的封裝,實現如下:
[csharp]?view plain
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace NmsProducerClasses
{
    /// <summary>
    /// Thin wrapper around an Apache.NMS ActiveMQ connection that owns one session,
    /// one producer and one consumer bound to the same queue or topic.
    /// Call <see cref="InitQueueOrTopic"/> before <see cref="SendMessage"/> or
    /// <see cref="GetMessage"/>, and <see cref="ShutDown"/> when finished.
    /// </summary>
    public class MyActiveMq
    {
        private IConnectionFactory factory;
        private IConnection connection;
        private ISession session;
        private IMessageProducer prod;
        private IMessageConsumer consumer;
        // Single text message instance that is re-populated on every SendMessage call.
        private ITextMessage msg;

        private bool isTopic = false;
        private bool hasSelector = false;
        private const string ClientID = "clientid";
        private const string Selector = "filter='demo'";
        private bool sendSuccess = true;
        private bool receiveSuccess = true;

        // How long GetMessage waits for a message before reporting that none arrived.
        private static readonly TimeSpan ReceiveTimeout = TimeSpan.FromMilliseconds(10);

        /// <summary>
        /// Connects to the broker on port 61616, starts the connection and creates a session.
        /// </summary>
        /// <param name="isLocalMachine">True to connect to localhost; false to connect to <paramref name="remoteAddress"/>.</param>
        /// <param name="remoteAddress">Host name or IP of the broker; only used when <paramref name="isLocalMachine"/> is false.</param>
        /// <exception cref="System.Exception">Any exception raised while connecting is logged to the console and rethrown.</exception>
        public MyActiveMq(bool isLocalMachine, string remoteAddress)
        {
            try
            {
                if (isLocalMachine)
                {
                    factory = new ConnectionFactory("tcp://localhost:61616/");
                }
                else
                {
                    // Use the form tcp://192.168.1.111:61616 to reach an ActiveMQ broker on another machine.
                    factory = new ConnectionFactory("tcp://" + remoteAddress + ":61616/");
                }

                connection = factory.CreateConnection();
                connection.ClientId = ClientID;
                connection.Start();

                session = connection.CreateSession();
            }
            catch (System.Exception e)
            {
                sendSuccess = false;
                receiveSuccess = false;
                Console.WriteLine("Exception:{0}", e.Message);
                Console.ReadLine();
                throw; // rethrow without resetting the stack trace
            }
            Console.WriteLine("Begin connection...");
        }

        /// <summary>
        /// Creates the producer, the consumer and the reusable text message for the given destination.
        /// </summary>
        /// <param name="topic">True for a topic (durable consumer), false for a queue.</param>
        /// <param name="name">Name of the queue or topic.</param>
        /// <param name="selector">True to create the consumer with the fixed selector <c>filter='demo'</c>.</param>
        /// <returns>True when the producer, consumer and message were created.</returns>
        /// <exception cref="System.Exception">Any exception raised by the session is logged to the console and rethrown.</exception>
        public bool InitQueueOrTopic(bool topic, string name, bool selector = false)
        {
            try
            {
                if (topic)
                {
                    Apache.NMS.ActiveMQ.Commands.ActiveMQTopic destination =
                        new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name);
                    prod = session.CreateProducer(destination);
                    consumer = session.CreateDurableConsumer(destination, ClientID, selector ? Selector : null, false);
                }
                else
                {
                    Apache.NMS.ActiveMQ.Commands.ActiveMQQueue destination =
                        new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name);
                    prod = session.CreateProducer(destination);
                    if (selector)
                    {
                        consumer = session.CreateConsumer(destination, Selector);
                    }
                    else
                    {
                        consumer = session.CreateConsumer(destination);
                    }
                }

                hasSelector = selector;
                isTopic = topic;
                msg = prod.CreateTextMessage();

                // A successful (re-)initialisation clears failure flags left by earlier calls,
                // e.g. a SendMessage issued before the first InitQueueOrTopic.
                sendSuccess = true;
                receiveSuccess = true;
            }
            catch (System.Exception e)
            {
                sendSuccess = false;
                receiveSuccess = false;
                Console.WriteLine("Exception:{0}", e.Message);
                Console.ReadLine();
                throw;
            }

            return true;
        }

        /// <summary>
        /// Sends <paramref name="message"/> as a persistent text message to the initialised destination.
        /// </summary>
        /// <param name="message">Text body of the message.</param>
        /// <param name="msgId">Value written to the correlation id, the message id and the "MyID" property.</param>
        /// <param name="priority">Priority passed to the producer.</param>
        /// <returns>True when the message was sent; false when <see cref="InitQueueOrTopic"/> has not completed.</returns>
        /// <exception cref="System.Exception">Any exception raised while sending is logged to the console and rethrown.</exception>
        public bool SendMessage(string message, string msgId = "defult", MsgPriority priority = MsgPriority.Normal)
        {
            // msg is created last in InitQueueOrTopic, so it can still be null when prod is not.
            if (prod == null || msg == null)
            {
                sendSuccess = false;
                Console.WriteLine("call InitQueueOrTopic() first!!");
                return false;
            }

            Console.WriteLine("Begin send messages...");

            msg.NMSCorrelationID = msgId;
            msg.Properties["MyID"] = msgId;
            msg.NMSMessageId = msgId;
            msg.Text = message;
            Console.WriteLine(message);

            if (isTopic)
            {
                sendSuccess = ProducerSubcriber(message, priority);
            }
            else
            {
                sendSuccess = P2P(message, priority);
            }

            return sendSuccess;
        }

        /// <summary>
        /// Waits briefly for one message on the initialised destination and returns its text.
        /// </summary>
        /// <returns>
        /// The message text, or null when the consumer is not initialised, nothing arrived
        /// within the timeout, or the received message is not an <c>ITextMessage</c>.
        /// </returns>
        /// <exception cref="System.Exception">Any exception raised while receiving is logged to the console and rethrown.</exception>
        public string GetMessage()
        {
            // Guard on the object that is actually used below (the consumer, not the producer).
            if (consumer == null)
            {
                Console.WriteLine("call InitQueueOrTopic() first!!");
                return null;
            }

            Console.WriteLine("Begin receive messages...");
            ITextMessage revMessage = null;
            try
            {
                revMessage = consumer.Receive(ReceiveTimeout) as ITextMessage;
            }
            catch (System.Exception e)
            {
                receiveSuccess = false;
                Console.WriteLine("Exception:{0}", e.Message);
                Console.ReadLine();
                throw;
            }

            if (revMessage == null)
            {
                Console.WriteLine("No message received!");
                return null;
            }

            Console.WriteLine("Received message with Correlation ID: " + revMessage.NMSCorrelationID);
            Console.WriteLine("Received message with text: " + revMessage.Text);
            return revMessage.Text;
        }

        /// <summary>
        /// Sends the prepared message to a queue; tags it with <c>filter=demo</c> when a selector is active
        /// so that this instance's own consumer matches it.
        /// </summary>
        /// <returns>True when the send completed without throwing.</returns>
        private bool P2P(string message, MsgPriority priority)
        {
            try
            {
                if (hasSelector)
                {
                    msg.Properties.SetString("filter", "demo");
                }
                prod.Priority = priority;
                prod.DeliveryMode = MsgDeliveryMode.Persistent;
                // NOTE(review): TimeSpan.MinValue is passed as the time-to-live; presumably meant as
                // "no expiry" -- confirm against the NMS provider in use.
                prod.Send(msg, MsgDeliveryMode.Persistent, priority, TimeSpan.MinValue);
            }
            catch (System.Exception e)
            {
                sendSuccess = false;
                Console.WriteLine("Exception:{0}", e.Message);
                Console.ReadLine();
                throw;
            }

            // Report this call's outcome rather than a flag that may be stale from an earlier failure.
            return true;
        }

        /// <summary>
        /// Sends the prepared message to a topic.
        /// </summary>
        /// <returns>True when the send completed without throwing.</returns>
        private bool ProducerSubcriber(string message, MsgPriority priority)
        {
            try
            {
                // NOTE(review): unlike P2P, the "filter" property is not set here even when a selector
                // is active, so a durable consumer created with the selector will not match these
                // messages -- confirm whether that is intended.
                prod.Priority = priority;
                prod.DeliveryMode = MsgDeliveryMode.Persistent;
                prod.Send(msg, Apache.NMS.MsgDeliveryMode.Persistent, priority, TimeSpan.MinValue);
            }
            catch (System.Exception e)
            {
                sendSuccess = false;
                Console.WriteLine("Exception:{0}", e.Message);
                Console.ReadLine();
                throw;
            }

            return true;
        }

        /// <summary>
        /// Closes the session and then the connection. The connection is closed even if
        /// closing the session throws.
        /// </summary>
        public void ShutDown()
        {
            Console.WriteLine("Close connection and session...");
            try
            {
                if (session != null)
                {
                    session.Close();
                }
            }
            finally
            {
                if (connection != null)
                {
                    connection.Close();
                }
            }
        }
    }
}
Program.cs代碼如下:
[csharp]?view plain
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.IO;
using System.Threading;

namespace NmsProducerClasses
{
    /// <summary>
    /// Console demo: sends ten JSON-serialised users to the queue "myqueue", one per
    /// message priority, then polls the queue twenty times and shuts the connection down.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            MyActiveMq mymq = new MyActiveMq(isLocalMachine: true, remoteAddress: "");
            try
            {
                mymq.InitQueueOrTopic(topic: false, name: "myqueue", selector: false);

                // One sample per priority, in the order they are sent. Driving the sends from a
                // table means each user that is built is also the one that is sent (the previous
                // hand-written sequence sent the "VeryHigh" user twice and never the "VeryLow" one).
                var samples = new[]
                {
                    new { Id = "0000", Name = "Lowest",      Priority = Apache.NMS.MsgPriority.Lowest },
                    new { Id = "1111", Name = "AboveLow",    Priority = Apache.NMS.MsgPriority.AboveLow },
                    new { Id = "2222", Name = "AboveNormal", Priority = Apache.NMS.MsgPriority.AboveNormal },
                    new { Id = "0000", Name = "BelowNormal", Priority = Apache.NMS.MsgPriority.BelowNormal },
                    new { Id = "1111", Name = "High",        Priority = Apache.NMS.MsgPriority.High },
                    new { Id = "2222", Name = "Highest",     Priority = Apache.NMS.MsgPriority.Highest },
                    new { Id = "0000", Name = "Low",         Priority = Apache.NMS.MsgPriority.Low },
                    new { Id = "1111", Name = "Normal",      Priority = Apache.NMS.MsgPriority.Normal },
                    new { Id = "2222", Name = "VeryHigh",    Priority = Apache.NMS.MsgPriority.VeryHigh },
                    new { Id = "2222", Name = "VeryLow",     Priority = Apache.NMS.MsgPriority.VeryLow },
                };

                foreach (var sample in samples)
                {
                    User user = new User(sample.Id, sample.Name, "img/p.jpg");
                    mymq.SendMessage(JsonUtil.ObjectToJson(user), "newid", priority: sample.Priority);
                }

                // Poll more times than messages were sent; the extra polls just report "no message".
                for (int attempt = 0; attempt < 20; attempt++)
                {
                    mymq.GetMessage();
                }
            }
            finally
            {
                // Release the session and connection even when a send or receive throws.
                mymq.ShutDown();
            }
        }
    }
}
3、測試
首先,需要啟動消息隊列,具體啟動及測試消息隊列步驟可見這邊:點擊打開鏈接 然后,運行項目,運行結果如下:
4、優先級
priority并不能決定消息傳送的嚴格順序,具體原因可見 http://activemq.apache.org/how-can-i-support-priority-queues.html http://shift-alt-ctrl.iteye.com/blog/2034440
優先級設置:
在D:\apache-activemq-5.14.0\conf目錄的activemq.xml配置文件中,找到<destinationPolicy>標簽,在其中的<policyEntries>標簽下添加
[html] view plain copy
<policyEntry queue=">" producerFlowControl="false" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1" />
<policyEntry queue=">" strictOrderDispatch="false" />
<policyEntry queue=">" >
    <pendingMessageLimitStrategy>
        <constantPendingMessageLimitStrategy limit="0"/>
    </pendingMessageLimitStrategy>
    <messageEvictionStrategy>
        <oldestMessageWithLowestPriorityEvictionStrategy/>
    </messageEvictionStrategy>
</policyEntry>
配置完成后,需要重啟activemq
5、遠程登錄監控
要實現遠程監控服務器消息隊列,需要先進行配置。 配置方法:在D:\apache-activemq-5.14.0\conf目錄的jetty.xml配置文件中,把第133行開始的那段注釋去掉即可。
總結
以上是生活随笔為你收集整理的ActiveMq C#客户端 消息队列的使用(存和取)的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。