日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

消息中间件NetMQ结合Protobuf简介

發(fā)布時間:2024/4/17 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 消息中间件NetMQ结合Protobuf简介 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

概述

  對于稍微熟悉這兩個優(yōu)秀的項目來說,每個內容單獨介紹都不為過,本文只是簡介并探討如何將兩部分內容合并起來,使其在某些場景下更適合、更高效。

?

  NetMQ:ZeroMQ的.Net版本,ZeroMQ簡單來說就是局域網(wǎng)內的消息中間件(與MSMQ類似),包括了進程間通訊、點對點通訊、訂閱模式通訊等等,底層用更“完美”的Socket實現(xiàn),ZeroMQ實現(xiàn)了多語言、跨平臺、高效率等諸多優(yōu)勢。詳細介紹請參考ZeroMQ和NetMQ官方文檔:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns,http://netmq.readthedocs.org/en/latest/introduction/。

?

  Protocol Buffer:源自與Google內部的開源項目,作為高效的RPC消息協(xié)議,相比較Json、XML協(xié)議的消息格式,Protobuf在序列化以及數(shù)據(jù)大小上都具有十分明顯的優(yōu)勢,跨平臺,協(xié)議可讀性也接近于Json等等。這里也推薦一篇文章:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/

?

定義Protobuf協(xié)議

  Protocol Buffer(簡稱Protobuf)是以.proto的腳本形式實現(xiàn)的通用語義形式,類似于Json格式:

message WeatherMessage { enum CommandType {Debug=0;Weather=1;Other=2;}required CommandType Command=1 [default=Weather];optional string Content=2;message Loaction {required int32 East=1;required int32 North=2;}repeated Loaction UserLocation=3; }

  這里的Message、required(必選屬性)、optional(可有可無屬性)、repeated(內部嵌套的類型屬性)等都是proto的關鍵字,具體意義以及為關鍵字的功能大家可以查看官方文檔,這里只介紹如何應用,或者Stephen Liu的文章也不錯。

  當然,光定義腳本是不能實現(xiàn)應用的,還需要根據(jù)特定的編碼語言進行描述,這里利用Protobuf-Net來實現(xiàn).Net平臺的協(xié)議實現(xiàn)。

  首先,下載軟件包:https://code.google.com/p/protobuf-net/(肯能需要FQ)

  然后,解壓并將剛才的.proto文件復制到文件夾ProtoGen下。

  最后,啟動CMD并cd到ProtoGen文件夾目錄下,運行命令:

  protogen -i: PBWeatherMessage.proto -0: PBWeatherMessage.cs -ns:ProtobufNameSpace

(-i指定了輸入,-o指定了輸出,-ns指定了生成代碼的namespace)

  如果,正確的話(當然了,我給出的腳本是不會錯的),就會生成一個PBWeatherMessage.cs文件,這樣的話就可以將.cs文件加入到項目中當做一個純粹的類來使用了。

代碼中使用,就是類似于二進制序列化一樣,只是這回序列化的是Protobuf專用的序列化方式而已。

  序列化:

#region Protobufvar weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage(){Command = PBProtocol.WeatherMessage.CommandType.Weather,Content = string.Format("{0} {1} {2}", zipcode, temperature, relhumidity), };using (var sm = new MemoryStream()){ProtoBuf.Serializer.Serialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm, weatherMsg);publisher.Send(sm.ToArray());}#endregion

  反序列化:

var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage();var receivedBytes = subscriber.Receive();using (var sm = new MemoryStream(receivedBytes)){weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm);}

  這里就簡單介紹完了protobuf協(xié)議的使用,下面介紹一下NetMQ+Protobuf的使用。

?

NetMQ+Protobuf

  接下來我們來改造下NetMQ Sample中的Publisher-Subscriber模式:

  首先下載從GitHub上下載NetMQ Sample: https://github.com/zeromq/netmq

或者下載我的示例代碼,其中包含了一個No Protobuf的工程,這個是直接摘自原作者的示例代碼。

  服務端Publisher:

using (var context = NetMQContext.Create())// NetMQ全局維護的Content上下文,建議只有一個并且使用完畢后及時回收。using (var publisher = context.CreatePublisherSocket())// 從Content上下文中創(chuàng)建CreatePublisherSocket,這里如果用其他四種模式之一需要Create其他類型。 {publisher.Bind("tcp://127.0.0.1:5556");// Bind到指定的IP及端口。var rng = new Random();while (!stopRequested){int zipcode = rng.Next(0, 99999);// 這里模擬一個隨機命令編號(如果非10001,客戶端直接丟棄此Publisher發(fā)布的消息,實現(xiàn)消息過濾)int temperature = rng.Next(-80, 135);int relhumidity = rng.Next(0, 90);publisher.Send(string.Format("{0} {1} {2}", zipcode, temperature, relhumidity));// 直接Send,干凈整潔。 }}

  客戶端Subscriber:

using (var context = NetMQContext.Create())// 創(chuàng)建全局NetMQ句柄,建議唯一,使用完畢及時回收。 using (var subscriber = context.CreateSubscriberSocket())// 創(chuàng)建Publisher-Subscriber模式的客戶端監(jiān)聽。 {subscriber.Connect("tcp://127.0.0.1:5556");// 連接到指定Socketsubscriber.Subscribe(zipToSubscribeTo.ToString(CultureInfo.InvariantCulture));// 這里創(chuàng)建消息內容的過濾,如果不包含“zipToSubscribeTo”值則不接收消息。for (int i = 0; i < iterations; i++){string results = subscriber.ReceiveString(); // 如果消息以“zipToSubscribeTo”開頭,則會返回整條信息。Console.Write(".");// "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"]string[] split = results.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);// 按照固定模式解碼。int zip = int.Parse(split[0]);if (zip != zipToSubscribeTo){throw new Exception(string.Format("Received message for unexpected zipcode: {0} (expected {1})", zip, zipToSubscribeTo));}totalTemp += int.Parse(split[1]);totalHumidity += int.Parse(split[2]);}}

  這就是四種模式之一的發(fā)布者模式,使用起來很方便,但是這僅僅傳遞的是基于String的字符串,還不是一個可以序列化的對象,下一步我們將把消息字符串用Protobuf進行序列化與反序列化,來優(yōu)化我們的消息格式。

請參考,我的示例代碼中的Publisher Pattern工程:

  服務端Publisher:

using (var context = NetMQContext.Create())using (var publisher = context.CreatePublisherSocket()) {publisher.Bind("tcp://127.0.0.1:5556");var rng = new Random();while (!stopRequested){ int zipcode = rng.Next(10000,10010); //Relpace: rng.Next(0, 99999);int temperature = rng.Next(-80, 135);int relhumidity = rng.Next(0, 90);#region Protobufvar weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage(){Command = PBProtocol.WeatherMessage.CommandType.Weather,Content = string.Format("{0} {1} {2}", zipcode, temperature, relhumidity), };using (var sm = new MemoryStream()){ProtoBuf.Serializer.Serialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm, weatherMsg);publisher.Send(sm.ToArray());}#endregion// publisher.Send(string.Format("{0} {1} {2}", zipcode, temperature, relhumidity)); WriteLine(string.Format("Publisher send message: {0} {1} {2}", zipcode, temperature, relhumidity));System.Threading.Thread.Sleep(100);}} View Code

  客戶端Subscriber:

using (var context = NetMQContext.Create())using (var subscriber = context.CreateSubscriberSocket()){subscriber.Connect("tcp://127.0.0.1:5556");subscriber.SubscribeToAnyTopic(); // No Command Filter, warn if not set thie method SubscribeToAnyTopic, it will receive nothing.while (true){if (curIndex > iterations) break;var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage();var receivedBytes = subscriber.Receive();using (var sm = new MemoryStream(receivedBytes)){weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm);}// "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"]string[] split = weatherMsg.Content.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);int cmdId = int.Parse(split[0]);if (weatherMsg.Command == PBProtocol.WeatherMessage.CommandType.Weather){if (cmdId == zipToSubscribeTo){curIndex++;WriteLine(string.Format("Subscriber receive message: {0}", weatherMsg.Content));totalTemp += int.Parse(split[1]);totalHumidity += int.Parse(split[2]);}}}

?

  好了,其實單獨來看,這兩部分內容并為涉及的很深入,只是作為一個技術實踐、技術儲備,希望其中有問題或者有更好的應用場景,還請各位留言,不勝感謝!

?

  我的示例代碼下載

冷靜下來

這里補充一些不足:

  • NetMQ中的過濾:默認NetMQ支持過濾,可是當我們摒棄String類型傳遞而轉向Protobuf格式的時候NetMQ通道是無法解析其內容的,所以我們需要先解析內容,然后手寫一些過濾代碼,放棄了原生的支持。subscriber.SubscribeToAnyTopic()監(jiān)聽所有非過濾模式。
  • NetMQ消息持久化:基于ZMQ的NetMQ設計理念中均不支持數(shù)據(jù)持久化(相比MSMQ而言,NetMQ不能接收當客戶端不在線情況下的消息),所以如果需要持久化還需要做其他工作或者轉戰(zhàn)其他MQ家族。
  • ?

    引用

    ZMQ:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns

    NetMQ:http://netmq.readthedocs.org/en/latest/introduction/

    Protocol Buffer:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/

    Stephen Liu:http://www.cnblogs.com/stephen-liu74/archive/2013/01/02/2841485.html

    Protobuf-Net:https://code.google.com/p/protobuf-net/

    ?

    轉載于:https://www.cnblogs.com/cuiyansong/p/4326047.html

    總結

    以上是生活随笔為你收集整理的消息中间件NetMQ结合Protobuf简介的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。