日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > C# >内容正文

C#

ActiveMq C#客户端 消息队列的使用(存和取)

發(fā)布時(shí)間:2024/4/17 C# 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ActiveMq C#客户端 消息队列的使用(存和取) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1、準(zhǔn)備工具

  • VS2013
  • Apache.NMS.ActiveMQ-1.7.2-bin.zip
  • apache-activemq-5.14.0-bin.zip
  • 2、開始項(xiàng)目

    VS2013新建一個(gè)C#控制臺(tái)應(yīng)用程序,項(xiàng)目中添加兩個(gè)dll引用,一個(gè)是D:\Apache.NMS.ActiveMQ-1.7.2-bin\lib\Apache.NMS\net-4.0目錄下的Apache.NMS.dll,另一個(gè)是D:\Apache.NMS.ActiveMQ-1.7.2-bin\build\net-4.0\debug目錄下的Apache.NMS.ActiveMQ.dll。 新建一個(gè)類,MyActiveMq.cs,用于對(duì)activemq消息隊(duì)列接口的封裝,實(shí)現(xiàn)如下: [csharp]?view plain?copy
  • using?System;??
  • using?System.Collections.Generic;??
  • using?System.Linq;??
  • using?System.Text;??
  • using?System.Threading.Tasks;??
  • ??
  • using?Apache.NMS;??
  • using?Apache.NMS.ActiveMQ;??
  • ??
  • namespace?NmsProducerClasses??
  • {??
  • ????public?class?MyActiveMq??
  • ????{??
  • ????????private?IConnectionFactory?factory;??
  • ????????private?IConnection?connection;??
  • ????????private?ISession?session;??
  • ????????private?IMessageProducer?prod;??
  • ????????private?IMessageConsumer?consumer;??
  • ????????private?ITextMessage?msg;??
  • ??
  • ????????private?bool?isTopic?=?false;??
  • ????????private?bool?hasSelector?=?false;??
  • ????????private?const?string?ClientID?=?"clientid";??
  • ????????private?const?string?Selector?=?"filter='demo'";??
  • ????????private?bool?sendSuccess?=?true;??
  • ????????private?bool?receiveSuccess?=?true;??
  • ??
  • ????????public?MyActiveMq(bool?isLocalMachine,?string?remoteAddress)??
  • ????????{??
  • ????????????try??
  • ????????????{??
  • ????????????????//初始化工廠?????
  • ????????????????if?(isLocalMachine)??
  • ????????????????{??
  • ????????????????????factory?=?new?ConnectionFactory("tcp://localhost:61616/");??
  • ????????????????}??
  • ????????????????else??
  • ????????????????{??
  • ????????????????????factory?=?new?ConnectionFactory("tcp://"?+?remoteAddress?+?":61616/");?//寫tcp://192.168.1.111:61616的形式連接其他服務(wù)器上的ActiveMQ服務(wù)器?????????????
  • ????????????????}??
  • ????????????????//通過工廠建立連接??
  • ????????????????connection?=?factory.CreateConnection();??
  • ????????????????connection.ClientId?=?ClientID;??
  • ????????????????connection.Start();??
  • ????????????????//通過連接創(chuàng)建Session會(huì)話??
  • ????????????????session?=?connection.CreateSession();??
  • ????????????}??
  • ????????????catch?(System.Exception?e)??
  • ????????????{??
  • ????????????????sendSuccess?=?false;??
  • ????????????????receiveSuccess?=?false;??
  • ????????????????Console.WriteLine("Exception:{0}",?e.Message);??
  • ????????????????Console.ReadLine();??
  • ????????????????throw?e;??
  • ????????????}??
  • ????????????Console.WriteLine("Begin?connection...");??
  • ????????}??
  • ??
  • ??
  • ????????~MyActiveMq()??
  • ????????{??
  • ????????????//this.ShutDown();??
  • ????????}??
  • ??
  • ????????///?<summary>??
  • ????????///?初始化??
  • ????????///?</summary>??
  • ????????///?<param?name="topic">選擇是否是Topic</param>??
  • ????????///?<param?name="name">隊(duì)列名</param>??
  • ????????///?<param?name="selector">是否設(shè)置過濾</param>??
  • ????????public?bool?InitQueueOrTopic(bool?topic,?string?name,?bool?selector?=?false)??
  • ????????{??
  • ????????????try??
  • ????????????{??
  • ????????????????//通過會(huì)話創(chuàng)建生產(chǎn)者、消費(fèi)者??
  • ????????????????if?(topic)??
  • ????????????????{??
  • ????????????????????prod?=?session.CreateProducer(new?Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name));??
  • ????????????????????if?(selector)??
  • ????????????????????{??
  • ????????????????????????consumer?=?session.CreateDurableConsumer(new?Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name),?ClientID,?Selector,?false);??
  • ????????????????????????hasSelector?=?true;??
  • ????????????????????}??
  • ????????????????????else??
  • ????????????????????{??
  • ????????????????????????consumer?=?session.CreateDurableConsumer(new?Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name),?ClientID,?null,?false);??
  • ????????????????????????hasSelector?=?false;??
  • ????????????????????}??
  • ????????????????????isTopic?=?true;??
  • ????????????????}??
  • ????????????????else??
  • ????????????????{??
  • ????????????????????prod?=?session.CreateProducer(new?Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name));??
  • ????????????????????if?(selector)??
  • ????????????????????{??
  • ????????????????????????consumer?=?session.CreateConsumer(new?Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name),?Selector);??
  • ????????????????????????hasSelector?=?true;??
  • ????????????????????}??
  • ????????????????????else??
  • ????????????????????{??
  • ????????????????????????consumer?=?session.CreateConsumer(new?Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name));??
  • ????????????????????????hasSelector?=?false;??
  • ????????????????????}??
  • ????????????????????isTopic?=?false;??
  • ????????????????}??
  • ????????????????//創(chuàng)建一個(gè)發(fā)送的消息對(duì)象??
  • ????????????????msg?=?prod.CreateTextMessage();??
  • ????????????}??
  • ????????????catch?(System.Exception?e)??
  • ????????????{??
  • ????????????????sendSuccess?=?false;??
  • ????????????????receiveSuccess?=?false;??
  • ????????????????Console.WriteLine("Exception:{0}",?e.Message);??
  • ????????????????Console.ReadLine();??
  • ????????????????throw?e;??
  • ????????????}??
  • ??
  • ????????????return?sendSuccess;??
  • ????????}??
  • ??
  • ??
  • ????????public?bool?SendMessage(string?message,?string?msgId?=?"defult",?MsgPriority?priority?=?MsgPriority.Normal)??
  • ????????{??
  • ????????????if?(prod?==?null)??
  • ????????????{??
  • ????????????????sendSuccess?=?false;??
  • ????????????????Console.WriteLine("call?InitQueueOrTopic()?first!!");??
  • ????????????????return?false;??
  • ????????????}??
  • ??
  • ????????????Console.WriteLine("Begin?send?messages...");??
  • ??
  • ????????????//給這個(gè)對(duì)象賦實(shí)際的消息??
  • ????????????msg.NMSCorrelationID?=?msgId;??
  • ????????????msg.Properties["MyID"]?=?msgId;??
  • ????????????msg.NMSMessageId?=?msgId;??
  • ????????????msg.Text?=?message;??
  • ????????????Console.WriteLine(message);??
  • ??
  • ????????????if?(isTopic)??
  • ????????????{??
  • ????????????????sendSuccess?=?ProducerSubcriber(message,?priority);??
  • ????????????}??
  • ????????????else??
  • ????????????{??
  • ????????????????sendSuccess?=?P2P(message,?priority);??
  • ????????????}??
  • ??
  • ????????????return?sendSuccess;??
  • ????????}??
  • ??
  • ??
  • ????????public?string?GetMessage()??
  • ????????{??
  • ????????????if?(prod?==?null)??
  • ????????????{??
  • ????????????????Console.WriteLine("call?InitQueueOrTopic()?first!!");??
  • ????????????????return?null;??
  • ????????????}??
  • ??
  • ????????????Console.WriteLine("Begin?receive?messages...");??
  • ????????????ITextMessage?revMessage?=?null;??
  • ????????????try??
  • ????????????{??
  • ????????????????//同步阻塞10ms,沒消息就直接返回null,注意此處時(shí)間不能設(shè)太短,否則還沒取到消息就直接返回null了!!!??
  • ????????????????revMessage?=?consumer.Receive(new?TimeSpan(TimeSpan.TicksPerMillisecond?*10))?as?ITextMessage;???
  • ????????????}??
  • ????????????catch?(System.Exception?e)??
  • ????????????{??
  • ????????????????receiveSuccess?=?false;??
  • ????????????????Console.WriteLine("Exception:{0}",?e.Message);??
  • ????????????????Console.ReadLine();??
  • ????????????????throw?e;??
  • ????????????}??
  • ??
  • ????????????if?(revMessage?==?null)??
  • ????????????{??
  • ????????????????Console.WriteLine("No?message?received!");??
  • ????????????????return?null;??
  • ????????????}??
  • ????????????else??
  • ????????????{??
  • ????????????????Console.WriteLine("Received?message?with?Correlation?ID:?"?+?revMessage.NMSCorrelationID);??
  • ????????????????//Console.WriteLine("Received?message?with?Properties'ID:?"?+?revMessage.Properties["MyID"]);??
  • ????????????????Console.WriteLine("Received?message?with?text:?"?+?revMessage.Text);??
  • ????????????}??
  • ??
  • ????????????return?revMessage.Text;??
  • ????????}??
  • ??
  • ????????//P2P模式,一個(gè)生產(chǎn)者對(duì)應(yīng)一個(gè)消費(fèi)者??
  • ????????private?bool?P2P(string?message,?MsgPriority?priority)??
  • ????????{??
  • ????????????try??
  • ????????????{??
  • ????????????????if?(hasSelector)??
  • ????????????????{??
  • ????????????????????//設(shè)置消息對(duì)象的屬性,這個(gè)很重要,是Queue的過濾條件,也是P2P消息的唯一指定屬性??
  • ????????????????????msg.Properties.SetString("filter",?"demo");??//P2P模式??
  • ????????????????}??
  • ????????????????prod.Priority?=?priority;??
  • ????????????????//設(shè)置持久化??
  • ????????????????prod.DeliveryMode?=?MsgDeliveryMode.Persistent;??
  • ????????????????//生產(chǎn)者把消息發(fā)送出去,幾個(gè)枚舉參數(shù)MsgDeliveryMode是否持久化,MsgPriority消息優(yōu)先級(jí)別,存活時(shí)間,當(dāng)然還有其他重載??
  • ????????????????prod.Send(msg,?MsgDeliveryMode.Persistent,?priority,?TimeSpan.MinValue);??
  • ????????????}??
  • ????????????catch?(System.Exception?e)??
  • ????????????{??
  • ????????????????sendSuccess?=?false;??
  • ????????????????Console.WriteLine("Exception:{0}",?e.Message);??
  • ????????????????Console.ReadLine();??
  • ????????????????throw?e;??
  • ????????????}??
  • ??
  • ????????????return?sendSuccess;??
  • ????????}??
  • ??
  • ??
  • ????????//發(fā)布訂閱模式,一個(gè)生產(chǎn)者多個(gè)消費(fèi)者???
  • ????????private?bool?ProducerSubcriber(string?message,?MsgPriority?priority)??
  • ????????{??
  • ????????????try??
  • ????????????{??
  • ????????????????prod.Priority?=?priority;??
  • ????????????????//設(shè)置持久化,如果DeliveryMode沒有設(shè)置或者設(shè)置為NON_PERSISTENT,那么重啟MQ之后消息就會(huì)丟失??
  • ????????????????prod.DeliveryMode?=?MsgDeliveryMode.Persistent;??
  • ????????????????prod.Send(msg,?Apache.NMS.MsgDeliveryMode.Persistent,?priority,?TimeSpan.MinValue);??
  • ????????????????//System.Threading.Thread.Sleep(1000);????
  • ????????????}??
  • ????????????catch?(System.Exception?e)??
  • ????????????{??
  • ????????????????sendSuccess?=?false;??
  • ????????????????Console.WriteLine("Exception:{0}",?e.Message);??
  • ????????????????Console.ReadLine();??
  • ????????????????throw?e;??
  • ????????????}??
  • ??
  • ????????????return?sendSuccess;??
  • ????????}??
  • ??
  • ??
  • ????????public?void?ShutDown()??
  • ????????{??
  • ????????????Console.WriteLine("Close?connection?and?session...");??
  • ????????????session.Close();??
  • ????????????connection.Close();??
  • ????????}??
  • ????}??
  • }??


  • Program.cs代碼如下: [csharp]?view plain?copy
  • using?System;??
  • using?System.Collections.Generic;??
  • using?System.Linq;??
  • using?System.Text;??
  • using?System.Threading.Tasks;??
  • using?System.IO;??
  • using?System.Threading;??
  • ??
  • namespace?NmsProducerClasses??
  • {??
  • ????class?Program??
  • ????{??
  • ????????static?void?Main(string[]?args)??
  • ????????{??
  • ????????????MyActiveMq?mymq?=?new?MyActiveMq(isLocalMachine:?true,?remoteAddress:?"");??
  • ??
  • ????????????mymq.InitQueueOrTopic(topic:?false,?name:?"myqueue",?selector:?false);??
  • ????????????//mymq.InitQueueOrTopic(topic:?false,?name:?"seletorqueue",?selector:?true);???
  • ????????????//mymq.InitQueueOrTopic(topic:?true,?name:?"noselectortopic",?selector:?false);??
  • ????????????//mymq.InitQueueOrTopic(topic:?true,?name:?"selectortopic",?selector:?true);??
  • ??
  • ????????????//The?full?range?of?priority?values?(0-9)?are?supported?by?the?JDBC?message?store.?For?KahaDB?three?priority?categories?are?supported,?Low?(<?4),?Default?(=?4)?and?High?(>?4).??
  • ????????????User?myuser0?=?new?User("0000",?"Lowest",?"img/p.jpg");??
  • ????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser0),?"newid",?priority:?Apache.NMS.MsgPriority.Lowest);??
  • ????????????User?myuser1?=?new?User("1111",?"AboveLow",?"img/p.jpg");??
  • ????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser1),?"newid",?priority:?Apache.NMS.MsgPriority.AboveLow);??
  • ????????????User?myuser2?=?new?User("2222",?"AboveNormal",?"img/p.jpg");??
  • ????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser2),?"newid",?priority:?Apache.NMS.MsgPriority.AboveNormal);??
  • ????????????User?myuser3?=?new?User("0000",?"BelowNormal",?"img/p.jpg");??
  • ????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser3),?"newid",?priority:?Apache.NMS.MsgPriority.BelowNormal);??
  • ????????????User?myuser4?=?new?User("1111",?"High",?"img/p.jpg");??
  • ????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser4),?"newid",?priority:?Apache.NMS.MsgPriority.High);??
  • ????????????User?myuser5?=?new?User("2222",?"Highest",?"img/p.jpg");??
  • ????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser5),?"newid",?priority:?Apache.NMS.MsgPriority.Highest);??
  • ????????????User?myuser6?=?new?User("0000",?"Low",?"img/p.jpg");??
  • ????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser6),?"newid",?priority:?Apache.NMS.MsgPriority.Low);??
  • ????????????User?myuser7?=?new?User("1111",?"Normal",?"img/p.jpg");??
  • ????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser7),?"newid",?priority:?Apache.NMS.MsgPriority.Normal);??
  • ????????????User?myuser8?=?new?User("2222",?"VeryHigh",?"img/p.jpg");??
  • ????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser8),?"newid",?priority:?Apache.NMS.MsgPriority.VeryHigh);??
  • ????????????User?myuser9?=?new?User("2222",?"VeryLow",?"img/p.jpg");??
  • ????????????mymq.SendMessage(JsonUtil.ObjectToJson(myuser8),?"newid",?priority:?Apache.NMS.MsgPriority.VeryLow);??
  • ??
  • ????????????int?num?=?20;??
  • ????????????while?(num--?>?0)??
  • ????????????{??
  • ????????????????mymq.GetMessage();??
  • ????????????????//Thread.Sleep(1000);??
  • ????????????}??
  • ????????????mymq.ShutDown();??
  • ??????????????
  • ??
  • ????????????//XML測(cè)試??
  • ????????????//string?xml?=?XmlTest.ObjToXml();??
  • ????????????//Console.WriteLine("ObjToXml:?{0}",?xml);??
  • ??
  • ????????????//Json測(cè)試??
  • ????????????//User?u?=?new?User()?{?Id="88",?Imgurl="img/88.jpg",?Name="haha88"};??
  • ????????????//string?jsonstr?=?JsonUtil.ObjectToJson(u);??
  • ????????????//Console.WriteLine(jsonstr);??
  • ??????????????
  • ????????}??
  • ??
  • ????}??
  • 3、測(cè)試

    首先,需要啟動(dòng)消息隊(duì)列,具體啟動(dòng)及測(cè)試消息隊(duì)列步驟可見這邊:點(diǎn)擊打開鏈接 然后,運(yùn)行項(xiàng)目,運(yùn)行結(jié)果如下:

    4、優(yōu)先級(jí)

    priority并不能決定消息傳送的嚴(yán)格消息,具體原因可見 http://activemq.apache.org/how-can-i-support-priority-queues.html http://shift-alt-ctrl.iteye.com/blog/2034440?

    優(yōu)先級(jí)設(shè)置:

    在D:\apache-activemq-5.14.0\conf目錄的activemq.xml配置文件中,找到<destinationPolicy>標(biāo)簽,在其中的<policyEntries>標(biāo)簽下添加 [html]?view plain?copy
  • <policyEntry?queue=">"??producerFlowControl="false"?prioritizedMessages="true"?useCache="false"?expireMessagesPeriod="0"?queuePrefetch="1"?/>????
  • <policyEntry?queue=">"?strictOrderDispatch="false"?/>????
  • <policyEntry?queue=">"?>????
  • ??????????????<pendingMessageLimitStrategy>????
  • ??????????????????<constantPendingMessageLimitStrategy?limit="0"/>????
  • ??????????????</pendingMessageLimitStrategy>????
  • ??????????????<messageEvictionStrategy>????
  • ??????????????????<oldestMessageWithLowestPriorityEvictionStrategy/>????
  • ??????????????</messageEvictionStrategy>????
  • </policyEntry>????
  • 配置完成后,需要重啟activemq

    5、遠(yuǎn)程登錄監(jiān)控

    要實(shí)現(xiàn)遠(yuǎn)程監(jiān)控服務(wù)器消息隊(duì)列,需要先進(jìn)行配置。 配置方法:在D:\apache-activemq-5.14.0\conf目錄的jetty.xml配置文件中,把133開始的那段注釋去掉即可。

    總結(jié)

    以上是生活随笔為你收集整理的ActiveMq C#客户端 消息队列的使用(存和取)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。