日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

metaq发送和接收消息demo

發(fā)布時(shí)間:2023/12/20 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 metaq发送和接收消息demo 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

為什么80%的碼農(nóng)都做不了架構(gòu)師?>>> ??

一、maven依賴

<dependency><groupId>com.taobao.metamorphosis</groupId><artifactId>metamorphosis-client</artifactId><version>1.4.6.2</version> </dependency><dependency><groupId>com.taobao.metamorphosis</groupId><artifactId>metamorphosis-client-extension</artifactId><version>1.4.6.2</version> </dependency>

二、發(fā)送者

import?com.taobao.metamorphosis.Message; import?com.taobao.metamorphosis.client.MessageSessionFactory; import?com.taobao.metamorphosis.client.MetaClientConfig; import?com.taobao.metamorphosis.client.MetaMessageSessionFactory; import?com.taobao.metamorphosis.client.producer.MessageProducer; import?com.taobao.metamorphosis.client.producer.SendResult; import?com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;import?java.io.BufferedReader; import?java.io.InputStreamReader;/***?Created?by?lc-t123?on?2016/4/14.*/ public?class?Producer?{public?static?void?main(String[]?args)?throws?Exception?{final?MetaClientConfig?metaClientConfig?=?new?MetaClientConfig();final?ZKConfig?zkConfig?=?new?ZKConfig();//設(shè)置zookeeper地址zkConfig.zkConnect?=?"192.168.1.70:2181";metaClientConfig.setZkConfig(zkConfig);//?New?session?factory,強(qiáng)烈建議使用單例MessageSessionFactory?sessionFactory?=?new?MetaMessageSessionFactory(metaClientConfig);/**??create?producer,強(qiáng)烈建議使用單例*??消息生產(chǎn)者的接口是MessageProducer,你可以通過(guò)它來(lái)發(fā)送消息*/MessageProducer?producer?=?sessionFactory.createProducer();//?publish?topicfinal?String?topic?=?"test";/**?這一步在發(fā)送消息前是必須的,你必須發(fā)布你將要發(fā)送消息的topic*?這是為了讓會(huì)話工廠幫你去查找接收這些topic的meta服務(wù)器地址并初始化連接*?這個(gè)步驟針對(duì)每個(gè)topic只需要做一次,多次調(diào)用無(wú)影響*/producer.publish(topic);BufferedReader?reader?=?new?BufferedReader(new?InputStreamReader(System.in));String?line?=?null;while?((line?=?reader.readLine())?!=?null){/**?send?message*?在Meta里,每個(gè)消息對(duì)象都是Message類的實(shí)例,Message表示一個(gè)消息對(duì)象,它包含這么幾個(gè)屬性:*?1)?id:?Long型的消息id,消息的唯一id,系統(tǒng)自動(dòng)產(chǎn)生,用戶無(wú)法設(shè)置,在發(fā)送成功后由服務(wù)器返回,發(fā)送失敗則為0。*?2)?topic:?消息的主題,訂閱者訂閱該主題即可接收發(fā)送到該主題下的消息,生產(chǎn)者通過(guò)指定發(fā)布的topic查找到需要連接的服務(wù)器地址,必須。*?3)?data:?消息的有效載荷,二進(jìn)制數(shù)據(jù),也就是消息內(nèi)容,meta永遠(yuǎn)不會(huì)修改消息內(nèi)容,你發(fā)送出去是什么樣子,接收到就是什么樣子。消息內(nèi)容通常限制在1M以內(nèi),我的建議是最好不要發(fā)送超過(guò)上百K的消息,必須。數(shù)據(jù)是否壓縮也完全取決于用戶。*?4)?attribute:?消息屬性,一個(gè)字符串,可選。發(fā)送者可設(shè)置消息屬性來(lái)讓消費(fèi)者過(guò)濾。*/SendResult?sendResult?=?producer.sendMessage(new?Message(topic,?line.getBytes()));//?check?resultif?(!sendResult.isSuccess()){System.err.println("Send?message?failed,error?message:"?+?sendResult.getErrorMessage());}else?{System.out.println("Send?message?successfully,sent?to?"?+?sendResult.getPartition());}}} }

三、接收者

import?com.taobao.metamorphosis.Message; import?com.taobao.metamorphosis.client.MessageSessionFactory; import?com.taobao.metamorphosis.client.MetaClientConfig; import?com.taobao.metamorphosis.client.MetaMessageSessionFactory; import?com.taobao.metamorphosis.client.consumer.ConsumerConfig; import?com.taobao.metamorphosis.client.consumer.MessageConsumer; import?com.taobao.metamorphosis.client.consumer.MessageListener; import?com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;import?java.util.concurrent.Executor;public?class?AsyncConsumer?{public?static?void?main(String[]?args)?throws?Exception?{final?MetaClientConfig?metaClientConfig?=?new?MetaClientConfig();final?ZKConfig?zkConfig?=?new?ZKConfig();//設(shè)置zookeeper地址zkConfig.zkConnect?=?"192.168.1.70:2181";metaClientConfig.setZkConfig(zkConfig);//?New?session?factory,強(qiáng)烈建議使用單例MessageSessionFactory?sessionFactory?=?new?MetaMessageSessionFactory(metaClientConfig);//?subscribed?topicfinal?String?topic?=?"test";//?consumer?groupfinal?String?group?=?"meta-example";/**?create?consumer,強(qiáng)烈建議使用單例*?通過(guò)createConsumer方法來(lái)創(chuàng)建MessageConsumer,注意到我們傳入一個(gè)ConsumerConfig參數(shù),*?這是消費(fèi)者的配置對(duì)象。每個(gè)消息者都必須有一個(gè)ConsumerConfig配置對(duì)象,*?我們這里只設(shè)置了group屬性,這是消費(fèi)者的分組名稱。*?Meta的Producer、Consumer和Broker都可以為集群。*?消費(fèi)者可以組成一個(gè)集群共同消費(fèi)同一個(gè)topic,發(fā)往這個(gè)topic的消息將按照一定的負(fù)載均衡規(guī)則發(fā)送給集群里的一臺(tái)機(jī)器。*?同一個(gè)消費(fèi)者集群必須擁有同一個(gè)分組名稱,也就是同一個(gè)group。我們這里將分組名稱設(shè)置為meta-example*/MessageConsumer?consumer?=?sessionFactory.createConsumer(new?ConsumerConfig(group));/**?subscribe?topic*?訂閱消息通過(guò)subscribe方法,這個(gè)方法接受三個(gè)參數(shù)*?1)?topic,訂閱的主題*?2)?maxSize,因?yàn)閙eta是一個(gè)消費(fèi)者主動(dòng)拉取的模型,這個(gè)參數(shù)規(guī)定每次拉取的最大數(shù)據(jù)量,單位為字節(jié),這里設(shè)置為1M,默認(rèn)最大為1M。*?3)?MessageListener,消息監(jiān)聽(tīng)器,負(fù)責(zé)消息消息。*/consumer.subscribe(topic,?1024?*?1024,?new?MessageListener()?{public?void?recieveMessages(Message?message)?{System.out.println("Receive?message?"?+?new?String(message.getData()));}public?Executor?getExecutor()?{//?Thread?pool?to?process?messages,maybe?null.return?null;}});//?complete?subscribeconsumer.completeSubscribe();} }

metaq-server安裝參考官方文檔

可以通過(guò)http://192.168.1.70:8120/?訪問(wèn)web界面

轉(zhuǎn)載于:https://my.oschina.net/chaun/blog/659479

總結(jié)

以上是生活随笔為你收集整理的metaq发送和接收消息demo的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。