消息中间件NetMQ结合Protobuf简介
概述
對(duì)于稍微熟悉這兩個(gè)優(yōu)秀的項(xiàng)目來(lái)說(shuō),每個(gè)內(nèi)容單獨(dú)介紹都不為過(guò),本文只是簡(jiǎn)介并探討如何將兩部分內(nèi)容合并起來(lái),使其在某些場(chǎng)景下更適合、更高效。
?
NetMQ:ZeroMQ的.Net版本,ZeroMQ簡(jiǎn)單來(lái)說(shuō)就是局域網(wǎng)內(nèi)的消息中間件(與MSMQ類似),包括了進(jìn)程間通訊、點(diǎn)對(duì)點(diǎn)通訊、訂閱模式通訊等等,底層用更“完美”的Socket實(shí)現(xiàn),ZeroMQ實(shí)現(xiàn)了多語(yǔ)言、跨平臺(tái)、高效率等諸多優(yōu)勢(shì)。詳細(xì)介紹請(qǐng)參考ZeroMQ和NetMQ官方文檔:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns,http://netmq.readthedocs.org/en/latest/introduction/。
?
Protocol Buffer:源自與Google內(nèi)部的開(kāi)源項(xiàng)目,作為高效的RPC消息協(xié)議,相比較Json、XML協(xié)議的消息格式,Protobuf在序列化以及數(shù)據(jù)大小上都具有十分明顯的優(yōu)勢(shì),跨平臺(tái),協(xié)議可讀性也接近于Json等等。這里也推薦一篇文章:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/
?
定義Protobuf協(xié)議
Protocol Buffer(簡(jiǎn)稱Protobuf)是以.proto的腳本形式實(shí)現(xiàn)的通用語(yǔ)義形式,類似于Json格式:
message WeatherMessage { enum CommandType {Debug=0;Weather=1;Other=2;}required CommandType Command=1 [default=Weather];optional string Content=2;message Loaction {required int32 East=1;required int32 North=2;}repeated Loaction UserLocation=3; }這里的Message、required(必選屬性)、optional(可有可無(wú)屬性)、repeated(內(nèi)部嵌套的類型屬性)等都是proto的關(guān)鍵字,具體意義以及為關(guān)鍵字的功能大家可以查看官方文檔,這里只介紹如何應(yīng)用,或者Stephen Liu的文章也不錯(cuò)。
當(dāng)然,光定義腳本是不能實(shí)現(xiàn)應(yīng)用的,還需要根據(jù)特定的編碼語(yǔ)言進(jìn)行描述,這里利用Protobuf-Net來(lái)實(shí)現(xiàn).Net平臺(tái)的協(xié)議實(shí)現(xiàn)。
首先,下載軟件包:https://code.google.com/p/protobuf-net/(肯能需要FQ)
然后,解壓并將剛才的.proto文件復(fù)制到文件夾ProtoGen下。
最后,啟動(dòng)CMD并cd到ProtoGen文件夾目錄下,運(yùn)行命令:
protogen -i: PBWeatherMessage.proto -0: PBWeatherMessage.cs -ns:ProtobufNameSpace
(-i指定了輸入,-o指定了輸出,-ns指定了生成代碼的namespace)
如果,正確的話(當(dāng)然了,我給出的腳本是不會(huì)錯(cuò)的),就會(huì)生成一個(gè)PBWeatherMessage.cs文件,這樣的話就可以將.cs文件加入到項(xiàng)目中當(dāng)做一個(gè)純粹的類來(lái)使用了。
代碼中使用,就是類似于二進(jìn)制序列化一樣,只是這回序列化的是Protobuf專用的序列化方式而已。
序列化:
#region Protobufvar weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage(){Command = PBProtocol.WeatherMessage.CommandType.Weather,Content = string.Format("{0} {1} {2}", zipcode, temperature, relhumidity), };using (var sm = new MemoryStream()){ProtoBuf.Serializer.Serialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm, weatherMsg);publisher.Send(sm.ToArray());}#endregion反序列化:
var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage();var receivedBytes = subscriber.Receive();using (var sm = new MemoryStream(receivedBytes)){weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm);}這里就簡(jiǎn)單介紹完了protobuf協(xié)議的使用,下面介紹一下NetMQ+Protobuf的使用。
?
NetMQ+Protobuf
接下來(lái)我們來(lái)改造下NetMQ Sample中的Publisher-Subscriber模式:
首先下載從GitHub上下載NetMQ Sample: https://github.com/zeromq/netmq
或者下載我的示例代碼,其中包含了一個(gè)No Protobuf的工程,這個(gè)是直接摘自原作者的示例代碼。
服務(wù)端Publisher:
using (var context = NetMQContext.Create())// NetMQ全局維護(hù)的Content上下文,建議只有一個(gè)并且使用完畢后及時(shí)回收。using (var publisher = context.CreatePublisherSocket())// 從Content上下文中創(chuàng)建CreatePublisherSocket,這里如果用其他四種模式之一需要Create其他類型。 {publisher.Bind("tcp://127.0.0.1:5556");// Bind到指定的IP及端口。var rng = new Random();while (!stopRequested){int zipcode = rng.Next(0, 99999);// 這里模擬一個(gè)隨機(jī)命令編號(hào)(如果非10001,客戶端直接丟棄此Publisher發(fā)布的消息,實(shí)現(xiàn)消息過(guò)濾)int temperature = rng.Next(-80, 135);int relhumidity = rng.Next(0, 90);publisher.Send(string.Format("{0} {1} {2}", zipcode, temperature, relhumidity));// 直接Send,干凈整潔。 }}客戶端Subscriber:
using (var context = NetMQContext.Create())// 創(chuàng)建全局NetMQ句柄,建議唯一,使用完畢及時(shí)回收。 using (var subscriber = context.CreateSubscriberSocket())// 創(chuàng)建Publisher-Subscriber模式的客戶端監(jiān)聽(tīng)。 {subscriber.Connect("tcp://127.0.0.1:5556");// 連接到指定Socketsubscriber.Subscribe(zipToSubscribeTo.ToString(CultureInfo.InvariantCulture));// 這里創(chuàng)建消息內(nèi)容的過(guò)濾,如果不包含“zipToSubscribeTo”值則不接收消息。for (int i = 0; i < iterations; i++){string results = subscriber.ReceiveString(); // 如果消息以“zipToSubscribeTo”開(kāi)頭,則會(huì)返回整條信息。Console.Write(".");// "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"]string[] split = results.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);// 按照固定模式解碼。int zip = int.Parse(split[0]);if (zip != zipToSubscribeTo){throw new Exception(string.Format("Received message for unexpected zipcode: {0} (expected {1})", zip, zipToSubscribeTo));}totalTemp += int.Parse(split[1]);totalHumidity += int.Parse(split[2]);}}這就是四種模式之一的發(fā)布者模式,使用起來(lái)很方便,但是這僅僅傳遞的是基于String的字符串,還不是一個(gè)可以序列化的對(duì)象,下一步我們將把消息字符串用Protobuf進(jìn)行序列化與反序列化,來(lái)優(yōu)化我們的消息格式。
請(qǐng)參考,我的示例代碼中的Publisher Pattern工程:
服務(wù)端Publisher:
using (var context = NetMQContext.Create())using (var publisher = context.CreatePublisherSocket()) {publisher.Bind("tcp://127.0.0.1:5556");var rng = new Random();while (!stopRequested){ int zipcode = rng.Next(10000,10010); //Relpace: rng.Next(0, 99999);int temperature = rng.Next(-80, 135);int relhumidity = rng.Next(0, 90);#region Protobufvar weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage(){Command = PBProtocol.WeatherMessage.CommandType.Weather,Content = string.Format("{0} {1} {2}", zipcode, temperature, relhumidity), };using (var sm = new MemoryStream()){ProtoBuf.Serializer.Serialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm, weatherMsg);publisher.Send(sm.ToArray());}#endregion// publisher.Send(string.Format("{0} {1} {2}", zipcode, temperature, relhumidity)); WriteLine(string.Format("Publisher send message: {0} {1} {2}", zipcode, temperature, relhumidity));System.Threading.Thread.Sleep(100);}} View Code客戶端Subscriber:
using (var context = NetMQContext.Create())using (var subscriber = context.CreateSubscriberSocket()){subscriber.Connect("tcp://127.0.0.1:5556");subscriber.SubscribeToAnyTopic(); // No Command Filter, warn if not set thie method SubscribeToAnyTopic, it will receive nothing.while (true){if (curIndex > iterations) break;var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage();var receivedBytes = subscriber.Receive();using (var sm = new MemoryStream(receivedBytes)){weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm);}// "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"]string[] split = weatherMsg.Content.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);int cmdId = int.Parse(split[0]);if (weatherMsg.Command == PBProtocol.WeatherMessage.CommandType.Weather){if (cmdId == zipToSubscribeTo){curIndex++;WriteLine(string.Format("Subscriber receive message: {0}", weatherMsg.Content));totalTemp += int.Parse(split[1]);totalHumidity += int.Parse(split[2]);}}}?
好了,其實(shí)單獨(dú)來(lái)看,這兩部分內(nèi)容并為涉及的很深入,只是作為一個(gè)技術(shù)實(shí)踐、技術(shù)儲(chǔ)備,希望其中有問(wèn)題或者有更好的應(yīng)用場(chǎng)景,還請(qǐng)各位留言,不勝感謝!
?
我的示例代碼下載
冷靜下來(lái)
這里補(bǔ)充一些不足:
?
引用
ZMQ:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns
NetMQ:http://netmq.readthedocs.org/en/latest/introduction/
Protocol Buffer:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/
Stephen Liu:http://www.cnblogs.com/stephen-liu74/archive/2013/01/02/2841485.html
Protobuf-Net:https://code.google.com/p/protobuf-net/
?
轉(zhuǎn)載于:https://www.cnblogs.com/cuiyansong/p/4326047.html
總結(jié)
以上是生活随笔為你收集整理的消息中间件NetMQ结合Protobuf简介的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Oracle资源管理器(二)-- 创建和
- 下一篇: 程序员面试不完全指南