日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > linux >内容正文

linux

Linux环境下安装RocketMQ(MetaQ)

發(fā)布時間:2023/12/31 linux 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Linux环境下安装RocketMQ(MetaQ) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一:RocketMQ簡介

RocketMQ是一款分布式、隊列模型的消息中間件,具有以下特點:

1.能夠保證嚴(yán)格的消息順序

2.提供豐富的消息拉取模式

3.高效的訂閱者水平擴展能力

4.實時的消息訂閱機制

5.億級消息堆積能力

二:安裝RocketMQ

下載源碼

首先我們從githup上獲取RocketMQ的源碼,目前最新的版本為3.5.8,下載地址為:https://github.com/alibaba/RocketMQ/releases 或者 wget ?https://github.com/alibaba/RocketMQ/releases/alibaba/RocketMQ/archive/v3.5.8.tar.gz。請注意:此時我們下載的是源碼,直接解壓時不能用的,所以我們需要編譯之后才能使用。

編譯源碼

在進行編譯源碼之前我們需要安裝JDK。如果你已經(jīng)安裝過了,請?zhí)^這里。如果你還沒有安裝過JDK,請參考這篇文章(Linux環(huán)境下安裝JDK)。然后我們還需要安裝一下Maven。Maven的安裝還是比較簡單,只需要去官方上下載的安裝吧,然后直接解壓,再配置一下環(huán)境變量就OK。接下來我們把剛才下載來的RockeMQ的源碼解壓到/usr/local/rockemq-source文件夾中。在源碼中有一個Install.sh。如圖所示: 。運行sh install.sh。在編譯完成之后,我們只要target目錄下的alibaba-rocketmq這個文件夾中內(nèi)容,把alibaba-rocketmq文件夾中的內(nèi)容移動到/usr/local/rocketmq中。如果你不想編譯的話,可以從這里下載編譯之后的rocketmq。(rocketmq3.5.8)。

配置環(huán)境變量

接下來我們需要配置一下環(huán)境變量。在終端中輸入以下命令:vi /etc/profile ,在文件的末尾中添加如下兩句話:export rocketmq=/usr/local/rocketmq ?export PATH=$PATH:$rocketmq/bin。接下來我們使配置的換將變量生效:source /etc/profile.

三:啟動RocketMQ

接下來我們啟動一下剛才編譯的RocketMQ.在啟動之前我們需要修改一下RocketMQ啟動的內(nèi)存大小(如果你的系統(tǒng)內(nèi)存比較大的話,請忽略)。我們進入到/usr/local/rocketmq/bin中,在終端中輸入以下命令修改mqnamesrv的內(nèi)存大小:vi runserver.sh.修改為如圖的內(nèi)容: ,接下來修改broker的內(nèi)存大小:vi runbroker.sh:

啟動mqnameserver

進入到/usr/local/rocketmq/bin中輸入以下命令:nohup sh mqnamesrv > ~/logs/rocketmqlogs/namesrv.log 2>&1 &。注意最后的這個 & 不要少。

啟動mqbroker

進入到/usr/local/rocketmq/bin中輸入以下命令:nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true > ~/logs/rocketmqlogs/broker.log 2>&1 &。注意:localhost可以換成你剛才啟動mqnamesrv的IP。autoCreateTopicEnable=true 這句話不要少了。最后的 & 也不要少了。 我們可以通過 ps aux | grep java命令來查看啟動的情況。
到此,rocketmq的安裝完畢。 四:RocketMQ的小例子 producer: package com.zkn.newlearn.rocketmq;import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.remoting.exception.RemotingException;import java.util.concurrent.TimeUnit;/*** Created by zkn on 2016/10/27.*/ public class ProducerTest01 {public static void main(String[] args) {/*** 一個應(yīng)用創(chuàng)建一個Producer,由應(yīng)用來維護此對象,可以設(shè)置為全局對象或者單例<br>* 注意:ProducerGroupName需要由應(yīng)用來保證唯一<br>* ProducerGroup這個概念發(fā)送普通的消息時,作用不大,但是發(fā)送分布式事務(wù)消息時,比較關(guān)鍵,* 因為服務(wù)器會回查這個Group下的任意一個Producer*/DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");//producer.setNamesrvAddr("192.168.180.1:9876");producer.setNamesrvAddr("192.168.180.133:9876");producer.setInstanceName("Producer");/*** Producer對象在使用之前必須要調(diào)用start初始化,初始化一次即可<br>* 注意:切記不可以在每次發(fā)送消息時,都調(diào)用start方法*/try {producer.start();} catch (MQClientException e) {e.printStackTrace();}for (int i = 0; i < 100; i++) {try {/*** 下面這段代碼表明一個Producer對象可以發(fā)送多個topic,多個tag的消息。* 注意:send方法是同步調(diào)用,只要不拋異常就標(biāo)識成功。但是發(fā)送成功也可會有多種狀態(tài),<br>* 例如消息寫入Master成功,但是Slave不成功,這種情況消息屬于成功,但是對于個別應(yīng)用如果對消息可靠性要求極高,<br>* 需要對這種情況做處理。另外,消息可能會存在發(fā)送失敗的情況,失敗重試由應(yīng)用來處理。*/{Message msg = new Message("TopicTest1",// topic"TagA",// tag"OrderID001",// key("Hello MetaQ").getBytes());// bodySendResult sendResult = producer.send(msg);System.out.println(sendResult);}{Message msg = new Message("TopicTest2","TagB","OrderID001",("Hello MetaQ TagB".getBytes()));SendResult sendResult = producer.send(msg);System.out.println(sendResult);}{Message msg = new Message("TopicTest3","TagC","OrderID001",("Hello MetaQ TagC").getBytes());SendResult sendResult = producer.send(msg);System.out.println(sendResult);}TimeUnit.MILLISECONDS.sleep(1000);} catch (MQClientException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (RemotingException e) {e.printStackTrace();} catch (MQBrokerException e) {e.printStackTrace();}}/*** 應(yīng)用退出時,要調(diào)用shutdown來清理資源,關(guān)閉網(wǎng)絡(luò)連接,從MetaQ服務(wù)器上注銷自己* 注意:我們建議應(yīng)用在JBOSS、Tomcat等容器的退出銷毀方法里調(diào)用shutdown方法*/producer.shutdown();} } consumer: package com.zkn.newlearn.rocketmq;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt;import java.util.List;/*** Created by zkn on 2016/10/27.*/ public class ConsumerTest01 {/*** 當(dāng)前例子是PushConsumer用法,使用方式給用戶感覺是消息從RocketMQ服務(wù)器推到了應(yīng)用客戶端。<br>* 但是實際PushConsumer內(nèi)部是使用長輪詢Pull方式從MetaQ服務(wù)器拉消息,然后再回調(diào)用戶Listener方法<br>*/public static void main(String[] args) {/*** 注意:ConsumerGroupName需要由應(yīng)用來保證唯一*/DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("ProducerGroupName");//pushConsumer.setNamesrvAddr("192.168.180.1:9876");pushConsumer.setNamesrvAddr("192.168.180.133:9876");pushConsumer.setInstanceName("Consumer");try {/*** 訂閱指定topic下tags分別等于TagA或TagC或TagD* 兩個參數(shù):第一個參數(shù)是topic第二個參數(shù)是tags*/pushConsumer.subscribe("TopicTest1", "TagA || TagC || TagD");/*** 訂閱指定topic下所有消息<br>* 注意:一個consumer對象可以訂閱多個topic*///pushConsumer.subscribe("TopicTest2", "*");pushConsumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size());MessageExt messageExt = msgs.get(0);if("TopicTest1".equals(messageExt.getTopic())){// 執(zhí)行TopicTest1的消費邏輯if (messageExt.getTags() != null && messageExt.getTags().equals("TagA")) {// 執(zhí)行TagA的消費System.out.println(new String(messageExt.getBody()));}else if(messageExt.getTags() != null && messageExt.getTags().equals("TagB")){System.out.println(new String(messageExt.getBody()));}else if(messageExt.getTags() != null && messageExt.getTags().equals("TagC")) {System.out.println(new String(messageExt.getBody()));}}else if("TopicTest2".equals(messageExt.getTopic())){System.out.println(new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});} catch (MQClientException e) {e.printStackTrace();}/*** Consumer對象在使用之前必須要調(diào)用start初始化,初始化一次即可<br>*/try {pushConsumer.start();} catch (MQClientException e) {e.printStackTrace();}System.out.println("Consumer Started.");} } package com.zkn.newlearn.rocketmq;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt;import java.util.List;/*** Created by zkn on 2016/10/30.*/ public class ConsumerTest02 extends ConsumerTest01 {public static void main(String[] args) {DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("ProducerGroupName");//pushConsumer.setNamesrvAddr("192.168.180.1:9876");pushConsumer.setNamesrvAddr("192.168.180.133:9876");pushConsumer.setInstanceName("Consumer");/*** 訂閱指定topic下所有消息<br>* 注意:一個consumer對象可以訂閱多個topic*/try {pushConsumer.subscribe("TopicTest2", "*");pushConsumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);if("TopicTest2".equals(messageExt.getTopic())){System.out.println(new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});} catch (MQClientException e) {e.printStackTrace();}try {pushConsumer.start();} catch (MQClientException e) {e.printStackTrace();}} }






總結(jié)

以上是生活随笔為你收集整理的Linux环境下安装RocketMQ(MetaQ)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。