rocketmq删除topic_RocketMq 快速入门教程
今年是不平凡的一年,對于每個個體都是。不論我們在哪兒,經(jīng)歷了什么,向前走總沒錯。雖然方向也很重要,但是不要在一個地方停太久,You young
編者薦語:RocketMQ 逐漸成為最主流的消息隊列,學(xué)習(xí) RocketMQ 是每個攻城獅要做的事
一、消息中間件
什么是消息系統(tǒng)
二、RocketMq簡介
發(fā)展
概念術(shù)語
架構(gòu)組成
三、安裝 RocketMq
系統(tǒng)環(huán)境
安裝部署 RocketMQ
RocketMq插件(可視化)
四、配置 RocketMq
啟動RocketMQ
驗證啟動是否成功
生產(chǎn)環(huán)境 ACL 權(quán)限認證
五、RocketMq 集群
RocketMq 集群、拓展閱讀
六、SpringBoot 集成 RocketMq
七、SpringMVC 架構(gòu)中使用 Demo
參考
一、消息中間件
什么是消息系統(tǒng)
簡單來說:消息被發(fā)送到隊列中?!跋㈥犃小?Message Queue)是在消息的傳輸過程中保存消息的容器。
- 拓展閱讀
消息隊列作為高并發(fā)系統(tǒng)的核心組件之一,能夠幫助業(yè)務(wù)系統(tǒng)解構(gòu)提升開發(fā)效率和系統(tǒng)穩(wěn)定性。主要具有以下優(yōu)勢:
削峰填谷(主要解決瞬時寫壓力大于應(yīng)用服務(wù)能力導(dǎo)致消息丟失、系統(tǒng)奔潰等問題) 系統(tǒng)解耦(解決不同重要程度、不同能力級別系統(tǒng)之間依賴導(dǎo)致一死全死) 提升性能(當(dāng)存在一對多調(diào)用時,可以發(fā)一條消息給消息系統(tǒng),讓消息系統(tǒng)通知相關(guān)系統(tǒng)) 蓄流壓測(線上有些鏈路不好壓測,可以通過堆積一定量消息再放開來壓測) 目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具有主要優(yōu)勢特性有:
? 支持事務(wù)型消息(消息發(fā)送和DB操作保持兩方的最終一致性,rabbitmq和kafka不支持) ? 支持結(jié)合rocketmq的多個系統(tǒng)之間數(shù)據(jù)最終一致性(多方事務(wù),二方事務(wù)是前提) ? 支持18個級別的延遲消息(rabbitmq和kafka不支持) ? 支持指定次數(shù)和時間間隔的失敗消息重發(fā)(kafka不支持,rabbitmq需要手動確認) ? 支持consumer端tag過濾,減少不必要的網(wǎng)絡(luò)傳輸(rabbitmq和kafka不支持) ? 支持重復(fù)消費(rabbitmq不支持,kafka支持)
二、RocketMq簡介
官網(wǎng)地址:http://rocketmq.apache.org/
發(fā)展
阿里巴巴消息中間件起源于2001年的五彩石項目,Notify 在這期間應(yīng)運而生,用于交易核心消息的流轉(zhuǎn)。
至2010年,B2B開始大規(guī)模使用ActiveMQ作為消息內(nèi)核,隨著阿里業(yè)務(wù)的快速發(fā)展,急需一款支持順序消息,擁有海量消息堆積能力的消息中間件,MetaQ 1.0在2011年誕生。
到2012年,MetaQ已經(jīng)發(fā)展到了MetaQ 3.0,并抽象出了通用的消息引擎RocketMQ。隨后,將RocketMQ進行了開源,阿里的消息中間件正式走入了公眾的視野。
到2015年,RocketMQ已經(jīng)經(jīng)歷了多年雙十一的洗禮,在可用性、可靠性以及穩(wěn)定性等方面都有出色的表現(xiàn)。與此同時,云計算大行其道,阿里消息中間件基于RocketMQ推出了Aliware MQ 1.0,開始為阿里云上成千上萬家企業(yè)提供消息服務(wù)。
到今年,MetaQ在2016年雙十一承載了萬億級消息的流轉(zhuǎn),跨越了一個新的里程碑,同時RocketMQ進入Apache 孵化。
RocketMQ可以保證嚴格的消息順序
概念術(shù)語
Producer Group
標識發(fā)送同一類消息的Producer,通常發(fā)送邏輯一致。發(fā)送普通消息的時候,僅標識使用,并無特別用處。若事務(wù)消息,如果某條發(fā)送某條消息的producer-A宕機,使得事務(wù)消息一直處于PREPARED狀態(tài)并超時,則broker會回查同一個group的其 他producer,確認這條消息應(yīng)該commit還是rollback。但開源版本并不支持事務(wù)消息。
Consumer Group
標識一類Consumer的集合名稱,這類Consumer通常消費一類消息,且消費邏輯一致。同一個Consumer Group下的各個實例將共同消費topic的消息,起到負載均衡的作用。
消費進度以Consumer Group為粒度管理,不同Consumer Group之間消費進度彼此不受影響,即消息A被Consumer Group1消費過,也會再給Consumer Group2消費。
注:RocketMQ要求同一個Consumer Group的消費者必須要擁有相同的注冊信息,即必須要聽一樣的topic(并且tag也一樣)。
Topic
標識一類消息的邏輯名字,消息的邏輯管理單位。無論消息生產(chǎn)還是消費,都需要指定Topic。
Tag
RocketMQ支持給在發(fā)送的時候給topic打tag,同一個topic的消息雖然邏輯管理是一樣的。但是消費topic1的時候,如果你訂閱的時候指定的是tagA,那么tagB的消息將不會投遞。
Message Queue
簡稱Queue或Q。消息物理管理單位。一個Topic將有若干個Q。若Topic同時創(chuàng)建在不通的Broker,則不同的broker上都有若干Q,消息將物理地存儲落在不同Broker結(jié)點上,具有水平擴展的能力。
無論生產(chǎn)者還是消費者,實際的生產(chǎn)和消費都是針對Q級別。例如Producer發(fā)送消息的時候,會預(yù)先選擇(默認輪詢)好該Topic下面的某一條Q地發(fā)送;Consumer消費的時候也會負載均衡地分配若干個Q,只拉取對應(yīng)Q的消息。
每一條message queue均對應(yīng)一個文件,這個文件存儲了實際消息的索引信息。并且即使文件被刪除,也能通過實際純粹的消息文件(commit log)恢復(fù)回來。
Offset
RocketMQ中,有很多offset的概念。但通常我們只關(guān)心暴露到客戶端的offset。一般我們不特指的話,就是指邏輯Message Queue下面的offset。
注:邏輯offset的概念在RocketMQ中字面意思實際上和真正的意思有一定差別,這點在設(shè)計上顯得有點混亂。祥見下面的解釋。
可以認為一條邏輯的message queue是無限長的數(shù)組。一條消息進來下標就會漲1,而這個數(shù)組的下標就是offset。
max offset
字面上可以理解為這是標識message queue中的max offset表示消息的最大offset。但是從源碼上看,這個offset實際上是最新消息的offset+1,即:下一條消息的offset。
min offset:
標識現(xiàn)存在的最小offset。而由于消息存儲一段時間后,消費會被物理地從磁盤刪除,message queue的min offset也就對應(yīng)增長。這意味著比min offset要小的那些消息已經(jīng)不在broker上了,無法被消費。
consumer offset
字面上,可以理解為標記Consumer Group在一條邏輯Message Queue上,消息消費到哪里即消費進度。但從源碼上看,這個數(shù)值是消費過的最新消費的消息offset+1,即實際上表示的是下次拉取的offset位置。
消費者拉取消息的時候需要指定offset,broker不主動推送消息, offset的消息返回給客戶端。
consumer剛啟動的時候會獲取持久化的consumer offset,用以決定從哪里開始消費,consumer以此發(fā)起第一次請求。
每次消息消費成功后,這個offset在會先更新到內(nèi)存,而后定時持久化。在集群消費模式下,會同步持久化到broker,而在廣播模式下,則會持久化到本地文件。
架構(gòu)組成
RocketMQ集群部署三、安裝 RocketMq
系統(tǒng)環(huán)境
JDK8 Windows10
安裝部署 RocketMQ
地址:http://rocketmq.apache.org/release_notes/release-notes-4.3.0/rocketmq-all-4.3.0-bin-release.zip
選擇 Binary 進行下載
解壓
環(huán)境變量配置
必須配置
變量名:ROCKETMQ_HOME 變量值(絕對路徑):D:\rocketMq\rocketmq-all-4.3.0-bin-release
環(huán)境變量 Path 中也配置:%ROCKETMQ_HOME%\bin
RocketMq插件(可視化)
選擇性安裝
git clone https://github.com/apache/rocketmq-externals.git
解壓
修改配置
server.port=8088
#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.config=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=127.0.0.1:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
cd ?\rocketmq-externals\rocketmq-console
mvn clean package -Dmaven.test.skip=true
cd ?\rocketmq-externals\rocketmq-console\target
java -jar rocketmq-console-ng-2.0.0.jar
瀏覽器訪問:http://127.0.0.1:8088
四、配置 RocketMq
啟動RocketMQ
默認配置啟動
- 啟動
啟動NAMESERVER:
Cmd命令框執(zhí)行進入至‘MQ文件夾\bin’下,然后執(zhí)行‘start mqnamesrv.cmd’,啟動NAMESERVER。成功后會彈出提示框,此框勿關(guān)閉。
啟動BROKER:
Cmd命令框執(zhí)行進入至‘MQ文件夾\bin’下,然后執(zhí)行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’,啟動BROKER。成功后會彈出提示框,此框勿關(guān)閉。
驗證啟動是否成功
驗證生產(chǎn)消息正常
執(zhí)行如下命令:
export NAMESRV_ADDR=localhost:9876 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
能看到類似如下輸出:
SendResult [sendStatus=SEND_OK, msgId=C0A82BC5F36C511D50C05B41...
驗證消費消息正常
執(zhí)行如下命令:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
能看到類似如下輸出:
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=3, stor....
其他命令
RocketMQ 自帶的連接工具是 maadmin ,執(zhí)行 mqadmin.cmd 就可以查看有哪些命令參數(shù)可用。
$?mqadmin.cmdThe?most?commonly?used?mqadmin?commands?are:
???updateTopic??????????Update?or?create?topic
???deleteTopic??????????Delete?topic?from?broker?and?NameServer.
???updateSubGroup???????Update?or?create?subscription?group
???deleteSubGroup???????Delete?subscription?group?from?broker.
???updateBrokerConfig???Update?broker's?config
???updateTopicPerm??????Update?topic?perm
???topicRoute???????????Examine?topic?route?info
???topicStatus??????????Examine?topic?Status?info
???topicClusterList?????get?cluster?info?for?topic
???brokerStatus?????????Fetch?broker?runtime?status?data
???queryMsgById?????????Query?Message?by?Id
???queryMsgByKey????????Query?Message?by?Key
???queryMsgByUniqueKey??Query?Message?by?Unique?key
???queryMsgByOffset?????Query?Message?by?offset
???queryMsgByUniqueKey??Query?Message?by?Unique?key
???printMsg?????????????Print?Message?Detail
???printMsgByQueue??????Print?Message?Detail
???sendMsgStatus????????send?msg?to?broker.
???brokerConsumeStats???Fetch?broker?consume?stats?data
???producerConnection???Query?producer's?socket?connection?and?client?version
???consumerConnection???Query?consumer's?socket?connection,?client?version?and?subscription
???consumerProgress?????Query?consumers's?progress,?speed
???consumerStatus???????Query?consumer's?internal?data?structure
???cloneGroupOffset?????clone?offset?from?other?group.
???clusterList??????????List?all?of?clusters
???topicList????????????Fetch?all?topic?list?from?name?server
???updateKvConfig???????Create?or?update?KV?config.
???deleteKvConfig???????Delete?KV?config.
???wipeWritePerm????????Wipe?write?perm?of?broker?in?all?name?server
???resetOffsetByTime????Reset?consumer?offset?by?timestamp(without?client?restart).
???updateOrderConf??????Create?or?update?or?delete?order?conf
???cleanExpiredCQ???????Clean?expired?ConsumeQueue?on?broker.
???cleanUnusedTopic?????Clean?unused?topic?on?broker.
???startMonitoring??????Start?Monitoring
???statsAll?????????????Topic?and?Consumer?tps?stats
???allocateMQ???????????Allocate?MQ
???checkMsgSendRT???????check?message?send?response?time
???clusterRT????????????List?All?clusters?Message?Send?RT
???getNamesrvConfig?????Get?configs?of?name?server.
???updateNamesrvConfig??Update?configs?of?name?server.
???getBrokerConfig??????Get?broker?config?by?cluster?or?special?broker!
???queryCq??????????????Query?cq?command.
???sendMessage??????????Send?a?message
???consumeMessage???????Consume?message
See?'mqadmin?help?<command>'?for?more?information?on?a?specific?command.
Java?HotSpot(TM)?64-Bit?Server?VM?warning:?ignoring?option?PermSize=128m;?support?was?removed?in?8.0
Java?HotSpot(TM)?64-Bit?Server?VM?warning:?ignoring?option?MaxPermSize=128m;?support?was?removed?in?8.0
啟動成功后先簡單執(zhí)行一個命令(在 linux 上去掉 .cmd 即可)
- 查詢 topic 列表
mqadmin.cmd topicList -n '127.0.0.1:9876'
生產(chǎn)環(huán)境 ACL 權(quán)限認證
修改配置
使用默認配置,像上一節(jié)我們就可以啟動成功。但是在生產(chǎn)環(huán)境中,我們一定要加權(quán)限認證
打開 aclEnable 開關(guān)
在文件 conf/broker.conf 新增 aclEnable=true
新建文件 conf/plain_acl.yml,填入如下文本
accounts:
-?accessKey:?RocketMQ
??secretKey:?12345678
??whiteRemoteAddress:
??admin:?false
??defaultTopicPerm:?DENY
??defaultGroupPerm:?SUB
??topicPerms:
??-?TopicTest=PUB
??groupPerms:
??#?the?group?should?convert?to?retry?topic
??-?oms_consumer_group=DENY
-?accessKey:?admin
??secretKey:?12345678
??whiteRemoteAddress:
??#?if?it?is?admin,?it?could?access?all?resources
??admin:?true?
start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
名詞介紹
globalWhiteRemoteAddresses
全局白名單,其類型為數(shù)組,即支持多個配置。其支持的配置格式如下:
空 表示不設(shè)置白名單,該條規(guī)則默認返回false?!啊?表示全部匹配,該條規(guī)則直接返回true,將會阻斷其他規(guī)則的判斷,請慎重使用。192.168.0.{100,101} 多地址配置模式,ip地址的最后一組,使用{},大括號中多個ip地址,用英文逗號(,)隔開。192.168.1.100,192.168.2.100 直接使用,分隔,配置多個ip地址。192.168..或192.168.100-200.10-20 每個IP段使用 "" 或"-"表示范圍。
accounts
配置用戶信息,該類型為數(shù)組類型。擁有accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms子元素。
accessKey 登錄用戶名,長度必須大于6個字符。
secretKey 登錄密碼。長度必須大于6個字符。
whiteRemoteAddress 用戶級別的IP地址白名單。其類型為一個字符串,其配置規(guī)則與globalWhiteRemoteAddresses,但只能配置一條規(guī)則。
admin boolean類型,設(shè)置是否是admin。如下權(quán)限只有admin=true時才有權(quán)限執(zhí)行。
UPDATE_AND_CREATE_TOPIC 更新或創(chuàng)建主題。UPDATE_BROKER_CONFIG 更新Broker配置。DELETE_TOPIC_IN_BROKER 刪除主題。UPDATE_AND_CREATE_SUBSCRIPTIONGROUP 更新或創(chuàng)建訂閱組信息。DELETE_SUBSCRIPTIONGROUP 刪除訂閱組信息。5. defaultTopicPerm 默認topic權(quán)限。該值默認為DENY(拒絕)。
defaultGroupPerm 默認消費組權(quán)限,該值默認為DENY(拒絕),建議值為SUB。
topicPerms 設(shè)置topic的權(quán)限。其類型為數(shù)組,其可選擇值在下節(jié)介紹。
groupPerms 設(shè)置消費組的權(quán)限。其類型為數(shù)組,其可選擇值在下節(jié)介紹。可以為每一消費組配置不一樣的權(quán)限。
五、RocketMq 集群
RocketMq 集群、拓展閱讀
參考:https://www.jianshu.com/p/2838890f3284
六、SpringBoot 集成 RocketMq
代碼會同步到 GitHub,回復(fù)github領(lǐng)取
七、SpringMVC 架構(gòu)中使用 Demo
參考
安裝部署參考博客:https://www.imooc.com/article/290089
完整中文參考文章:http://www.itmuch.com/books/rocketmq/operation.html
阿里文檔參考:https://help.aliyun.com/document_detail/29533.html
我是pub哥,Java服務(wù)端工程師,對大數(shù)據(jù)懂一點,三觀要正。我們下期再見
財經(jīng)相關(guān):
最近幾天白酒、半導(dǎo)體、新能源都在上漲,應(yīng)該有很多被洗人,投資建議,有持倉的韭菜,下跌再拋出。
總結(jié)
以上是生活随笔為你收集整理的rocketmq删除topic_RocketMq 快速入门教程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: jq苹果手机全屏下点击无效果_苹果系统自
- 下一篇: 在fritzing中怎么导入_电路图制作