日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

.net平台的rabbitmq使用封装

發(fā)布時(shí)間:2025/5/22 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 .net平台的rabbitmq使用封装 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

前言

  RabbitMq大家再熟悉不過,這篇文章主要整對(duì)rabbitmq學(xué)習(xí)后封裝RabbitMQ.Client的一個(gè)分享。文章最后,我會(huì)把封裝組件和demo奉上。

Rabbitmq的關(guān)鍵術(shù)語

  1、綁定器(Binding):根據(jù)路由規(guī)則綁定Queue和Exchange。

  2、路由鍵(Routing Key):Exchange根據(jù)關(guān)鍵字進(jìn)行消息投遞。

  3、交換機(jī)(Exchange):指定消息按照路由規(guī)則進(jìn)入指定隊(duì)列

  4、消息隊(duì)列(Queue):消息的存儲(chǔ)載體

  5、生產(chǎn)者(Producer):消息發(fā)布者。

  6、消費(fèi)者(Consumer):消息接收者。

Rabbitmq的運(yùn)作

  從下圖可以看出,發(fā)布者(Publisher)是把消息先發(fā)送到交換器(Exchange),再?gòu)慕粨Q器發(fā)送到指定隊(duì)列(Queue),而先前已經(jīng)聲明交換器與隊(duì)列綁定關(guān)系,最后消費(fèi)者(Customer)通過訂閱或者主動(dòng)取指定隊(duì)列消息進(jìn)行消費(fèi)。

  那么剛剛提到的訂閱和主動(dòng)取可以理解成,推(被動(dòng)),拉(主動(dòng))。

  推,只要隊(duì)列增加一條消息,就會(huì)通知空閑的消費(fèi)者進(jìn)行消費(fèi)。(我不找你,就等你找我,觀察者模式)

  拉,不會(huì)通知消費(fèi)者,而是由消費(fèi)者主動(dòng)輪循或者定時(shí)去取隊(duì)列消息。(我需要才去找你)

  使用場(chǎng)景我舉個(gè)例子,假如有兩套系統(tǒng) 訂單系統(tǒng)和發(fā)貨系統(tǒng),從訂單系統(tǒng)發(fā)起發(fā)貨消息指令,為了及時(shí)發(fā)貨,發(fā)貨系統(tǒng)需要訂閱隊(duì)列,只要有指令就處理。

  可是程序偶爾會(huì)出異常,例如網(wǎng)絡(luò)或者DB超時(shí)了,把消息丟到失敗隊(duì)列,這個(gè)時(shí)候需要重發(fā)機(jī)制。但是我又不想while(IsPostSuccess == True),因?yàn)橹灰霎惓A?#xff0c;會(huì)在某個(gè)時(shí)間段內(nèi)都會(huì)有異常,這樣的重試是沒意義的。

  這個(gè)時(shí)候不需要及時(shí)的去處理消息,有個(gè)JOB定時(shí)或者每隔幾分鐘(失敗次數(shù)*間隔分鐘)去取失敗隊(duì)列消息,進(jìn)行重發(fā)。

Publish(發(fā)布)的封裝

  步驟:初始化鏈接->聲明交換器->聲明隊(duì)列->換機(jī)器與隊(duì)列綁定->發(fā)布消息。注意的是,我將Model存到了ConcurrentDictionary里面,因?yàn)槁暶髋c綁定是非常耗時(shí)的,其次,往重復(fù)的隊(duì)列發(fā)送消息是不需要重新初始化的。

1 /// <summary> 2 /// 交換器聲明 3 /// </summary> 4 /// <param name="iModel"></param> 5 /// <param name="exchange">交換器</param> 6 /// <param name="type">交換器類型: 7 /// 1、Direct Exchange – 處理路由鍵。需要將一個(gè)隊(duì)列綁定到交換機(jī)上,要求該消息與一個(gè)特定的路由鍵完全 8 /// 匹配。這是一個(gè)完整的匹配。如果一個(gè)隊(duì)列綁定到該交換機(jī)上要求路由鍵 “dog”,則只有被標(biāo)記為“dog”的 9 /// 消息才被轉(zhuǎn)發(fā),不會(huì)轉(zhuǎn)發(fā)dog.puppy,也不會(huì)轉(zhuǎn)發(fā)dog.guard,只會(huì)轉(zhuǎn)發(fā)dog 10 /// 2、Fanout Exchange – 不處理路由鍵。你只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上。一個(gè)發(fā)送到交換機(jī)的消息都 11 /// 會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。很像子網(wǎng)廣播,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息。Fanout 12 /// 交換機(jī)轉(zhuǎn)發(fā)消息是最快的。 13 /// 3、Topic Exchange – 將路由鍵和某模式進(jìn)行匹配。此時(shí)隊(duì)列需要綁定要一個(gè)模式上。符號(hào)“#”匹配一個(gè)或多 14 /// 個(gè)詞,符號(hào)“*”匹配不多不少一個(gè)詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 15 /// 只會(huì)匹配到“audit.irs”。</param> 16 /// <param name="durable">持久化</param> 17 /// <param name="autoDelete">自動(dòng)刪除</param> 18 /// <param name="arguments">參數(shù)</param> 19 private static void ExchangeDeclare(IModel iModel, string exchange, string type = ExchangeType.Direct, 20 bool durable = true, 21 bool autoDelete = false, IDictionary<string, object> arguments = null) 22 { 23 exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim(); 24 iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments); 25 } 26 27 /// <summary> 28 /// 隊(duì)列聲明 29 /// </summary> 30 /// <param name="channel"></param> 31 /// <param name="queue">隊(duì)列</param> 32 /// <param name="durable">持久化</param> 33 /// <param name="exclusive">排他隊(duì)列,如果一個(gè)隊(duì)列被聲明為排他隊(duì)列,該隊(duì)列僅對(duì)首次聲明它的連接可見, 34 /// 并在連接斷開時(shí)自動(dòng)刪除。這里需要注意三點(diǎn):其一,排他隊(duì)列是基于連接可見的,同一連接的不同信道是可 35 /// 以同時(shí)訪問同一個(gè)連接創(chuàng)建的排他隊(duì)列的。其二,“首次”,如果一個(gè)連接已經(jīng)聲明了一個(gè)排他隊(duì)列,其他連 36 /// 接是不允許建立同名的排他隊(duì)列的,這個(gè)與普通隊(duì)列不同。其三,即使該隊(duì)列是持久化的,一旦連接關(guān)閉或者 37 /// 客戶端退出,該排他隊(duì)列都會(huì)被自動(dòng)刪除的。這種隊(duì)列適用于只限于一個(gè)客戶端發(fā)送讀取消息的應(yīng)用場(chǎng)景。</param> 38 /// <param name="autoDelete">自動(dòng)刪除</param> 39 /// <param name="arguments">參數(shù)</param> 40 private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false, 41 bool autoDelete = false, IDictionary<string, object> arguments = null) 42 { 43 queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim(); 44 channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments); 45 } 46 47 /// <summary> 48 /// 獲取Model 49 /// </summary> 50 /// <param name="exchange">交換機(jī)名稱</param> 51 /// <param name="queue">隊(duì)列名稱</param> 52 /// <param name="routingKey"></param> 53 /// <param name="isProperties">是否持久化</param> 54 /// <returns></returns> 55 private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false) 56 { 57 return ModelDic.GetOrAdd(queue, key => 58 { 59 var model = _conn.CreateModel(); 60 ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties); 61 QueueDeclare(model, queue, isProperties); 62 model.QueueBind(queue, exchange, routingKey); 63 ModelDic[queue] = model; 64 return model; 65 }); 66 } 67 68 /// <summary> 69 /// 發(fā)布消息 70 /// </summary> 71 /// <param name="routingKey">路由鍵</param> 72 /// <param name="body">隊(duì)列信息</param> 73 /// <param name="exchange">交換機(jī)名稱</param> 74 /// <param name="queue">隊(duì)列名</param> 75 /// <param name="isProperties">是否持久化</param> 76 /// <returns></returns> 77 public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false) 78 { 79 var channel = GetModel(exchange, queue, routingKey, isProperties); 80 81 try 82 { 83 channel.BasicPublish(exchange, routingKey, null, body.SerializeUtf8()); 84 } 85 catch (Exception ex) 86 { 87 throw ex.GetInnestException(); 88 } 89 } View Code

  下次是本機(jī)測(cè)試的發(fā)布速度截圖:

  4.2W/S屬于穩(wěn)定速度,把反序列化(ToJson)會(huì)稍微快一些。

?

Subscribe(訂閱)的封裝

  發(fā)布的時(shí)候是申明了交換器和隊(duì)列并綁定,然而訂閱的時(shí)候只需要聲明隊(duì)列就可。從下面代碼能看到,捕獲到異常的時(shí)候,會(huì)把消息送到自定義的“死信隊(duì)列”里,由另外的JOB進(jìn)行定時(shí)重發(fā),因此,finally是應(yīng)答成功的。

/// <summary>/// 獲取Model/// </summary>/// <param name="queue">隊(duì)列名稱</param>/// <param name="isProperties"></param>/// <returns></returns>private static IModel GetModel(string queue, bool isProperties = false){return ModelDic.GetOrAdd(queue, value =>{var model = _conn.CreateModel();QueueDeclare(model, queue, isProperties);//每次消費(fèi)的消息數(shù)model.BasicQos(0, 1, false);ModelDic[queue] = model;return model;});} /// <summary>/// 接收消息/// </summary>/// <typeparam name="T"></typeparam>/// <param name="queue">隊(duì)列名稱</param>/// <param name="isProperties"></param>/// <param name="handler">消費(fèi)處理</param>/// <param name="isDeadLetter"></param>public void Subscribe<T>(string queue, bool isProperties, Action<T> handler, bool isDeadLetter) where T : class{//隊(duì)列聲明var channel = GetModel(queue, isProperties);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body;var msgStr = body.DeserializeUtf8();var msg = msgStr.FromJson<T>();try{handler(msg);}catch (Exception ex){ex.GetInnestException().WriteToFile("隊(duì)列接收消息", "RabbitMq");if (!isDeadLetter)PublishToDead<DeadLetterQueue>(queue, msgStr, ex);}finally{channel.BasicAck(ea.DeliveryTag, false);}};channel.BasicConsume(queue, false, consumer);} View Code

  下次是本機(jī)測(cè)試的發(fā)布速度截圖:

  快的時(shí)候有1.9K/S,慢的時(shí)候也有1.7K/S

?

Pull(拉)的封裝

  直接上代碼:

/// <summary>/// 獲取消息/// </summary>/// <typeparam name="T"></typeparam>/// <param name="exchange"></param>/// <param name="queue"></param>/// <param name="routingKey"></param>/// <param name="handler">消費(fèi)處理</param>private void Poll<T>(string exchange, string queue, string routingKey, Action<T> handler) where T : class{var channel = GetModel(exchange, queue, routingKey);var result = channel.BasicGet(queue, false);if (result.IsNull())return;var msg = result.Body.DeserializeUtf8().FromJson<T>();try{handler(msg);}catch (Exception ex){ex.GetInnestException().WriteToFile("隊(duì)列接收消息", "RabbitMq");}finally{channel.BasicAck(result.DeliveryTag, false);}} View Code

  快的時(shí)候有1.8K/s,穩(wěn)定是1.5K/S

?

Rpc(遠(yuǎn)程調(diào)用)的封裝

  首先說明下,RabbitMq只是提供了這個(gè)RPC的功能,但是并不是真正的RPC,為什么這么說:

  1、傳統(tǒng)Rpc隱藏了調(diào)用細(xì)節(jié),像調(diào)用本地方法一樣傳參、拋出異常

  2、RabbitMq的Rpc是基于消息的,消費(fèi)者消費(fèi)后,通過新隊(duì)列返回響應(yīng)結(jié)果。

/// <summary>/// RPC客戶端/// </summary>/// <param name="exchange"></param>/// <param name="queue"></param>/// <param name="routingKey"></param>/// <param name="body"></param>/// <param name="isProperties"></param>/// <returns></returns>public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false){var channel = GetModel(exchange, queue, routingKey, isProperties);var consumer = new QueueingBasicConsumer(channel);channel.BasicConsume(queue, true, consumer);try{var correlationId = Guid.NewGuid().ToString();var basicProperties = channel.CreateBasicProperties();basicProperties.ReplyTo = queue;basicProperties.CorrelationId = correlationId;channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8());var sw = Stopwatch.StartNew();while (true){var ea = consumer.Queue.Dequeue();if (ea.BasicProperties.CorrelationId == correlationId){return ea.Body.DeserializeUtf8();}if (sw.ElapsedMilliseconds > 30000)throw new Exception("等待響應(yīng)超時(shí)");}}catch (Exception ex){throw ex.GetInnestException();}} /// <summary>/// RPC服務(wù)端/// </summary>/// <typeparam name="T"></typeparam>/// <param name="exchange"></param>/// <param name="queue"></param>/// <param name="isProperties"></param>/// <param name="handler"></param>/// <param name="isDeadLetter"></param>public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter){//隊(duì)列聲明var channel = GetModel(queue, isProperties);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body;var msgStr = body.DeserializeUtf8();var msg = msgStr.FromJson<T>();var props = ea.BasicProperties;var replyProps = channel.CreateBasicProperties();replyProps.CorrelationId = props.CorrelationId;try{msg = handler(msg);}catch (Exception ex){ex.GetInnestException().WriteToFile("隊(duì)列接收消息", "RabbitMq");}finally{channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8());channel.BasicAck(ea.DeliveryTag, false);}};channel.BasicConsume(queue, false, consumer);} View Code

?  可以用,但不建議去用。可以考慮其他的RPC框架。grpc、thrift等。

?結(jié)尾

  本篇文章,沒有過多的寫RabbitMq的知識(shí)點(diǎn),因?yàn)閳@子的學(xué)習(xí)筆記實(shí)在太多了。下面把我的代碼奉上 https://github.com/SkyChenSky/Sikiro.Mq.Rabbit。如果有發(fā)現(xiàn)寫得不對(duì)的地方麻煩在評(píng)論指出,我會(huì)及時(shí)修改以免誤導(dǎo)別人。

  如果本篇文章您有用,請(qǐng)點(diǎn)擊一下推薦,謝謝大家閱讀。

轉(zhuǎn)載于:https://www.cnblogs.com/skychen1218/p/6496891.html

總結(jié)

以上是生活随笔為你收集整理的.net平台的rabbitmq使用封装的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。