rocketmq删除topic_RocketMq 快速入门教程
今年是不平凡的一年,對于每個個體都是。不論我們在哪兒,經歷了什么,向前走總沒錯。雖然方向也很重要,但是不要在一個地方停太久,You young
編者薦語:RocketMQ 逐漸成為最主流的消息隊列,學習 RocketMQ 是每個攻城獅要做的事
一、消息中間件
什么是消息系統
二、RocketMq簡介
發展
概念術語
架構組成
三、安裝 RocketMq
系統環境
安裝部署 RocketMQ
RocketMq插件(可視化)
四、配置 RocketMq
啟動RocketMQ
驗證啟動是否成功
生產環境 ACL 權限認證
五、RocketMq 集群
RocketMq 集群、拓展閱讀
六、SpringBoot 集成 RocketMq
七、SpringMVC 架構中使用 Demo
參考
一、消息中間件
什么是消息系統
簡單來說:消息被發送到隊列中?!跋㈥犃小?Message Queue)是在消息的傳輸過程中保存消息的容器。
- 拓展閱讀
消息隊列作為高并發系統的核心組件之一,能夠幫助業務系統解構提升開發效率和系統穩定性。主要具有以下優勢:
削峰填谷(主要解決瞬時寫壓力大于應用服務能力導致消息丟失、系統奔潰等問題) 系統解耦(解決不同重要程度、不同能力級別系統之間依賴導致一死全死) 提升性能(當存在一對多調用時,可以發一條消息給消息系統,讓消息系統通知相關系統) 蓄流壓測(線上有些鏈路不好壓測,可以通過堆積一定量消息再放開來壓測) 目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具有主要優勢特性有:
? 支持事務型消息(消息發送和DB操作保持兩方的最終一致性,rabbitmq和kafka不支持) ? 支持結合rocketmq的多個系統之間數據最終一致性(多方事務,二方事務是前提) ? 支持18個級別的延遲消息(rabbitmq和kafka不支持) ? 支持指定次數和時間間隔的失敗消息重發(kafka不支持,rabbitmq需要手動確認) ? 支持consumer端tag過濾,減少不必要的網絡傳輸(rabbitmq和kafka不支持) ? 支持重復消費(rabbitmq不支持,kafka支持)
二、RocketMq簡介
官網地址:http://rocketmq.apache.org/
發展
阿里巴巴消息中間件起源于2001年的五彩石項目,Notify 在這期間應運而生,用于交易核心消息的流轉。
至2010年,B2B開始大規模使用ActiveMQ作為消息內核,隨著阿里業務的快速發展,急需一款支持順序消息,擁有海量消息堆積能力的消息中間件,MetaQ 1.0在2011年誕生。
到2012年,MetaQ已經發展到了MetaQ 3.0,并抽象出了通用的消息引擎RocketMQ。隨后,將RocketMQ進行了開源,阿里的消息中間件正式走入了公眾的視野。
到2015年,RocketMQ已經經歷了多年雙十一的洗禮,在可用性、可靠性以及穩定性等方面都有出色的表現。與此同時,云計算大行其道,阿里消息中間件基于RocketMQ推出了Aliware MQ 1.0,開始為阿里云上成千上萬家企業提供消息服務。
到今年,MetaQ在2016年雙十一承載了萬億級消息的流轉,跨越了一個新的里程碑,同時RocketMQ進入Apache 孵化。
RocketMQ可以保證嚴格的消息順序
概念術語
Producer Group
標識發送同一類消息的Producer,通常發送邏輯一致。發送普通消息的時候,僅標識使用,并無特別用處。若事務消息,如果某條發送某條消息的producer-A宕機,使得事務消息一直處于PREPARED狀態并超時,則broker會回查同一個group的其 他producer,確認這條消息應該commit還是rollback。但開源版本并不支持事務消息。
Consumer Group
標識一類Consumer的集合名稱,這類Consumer通常消費一類消息,且消費邏輯一致。同一個Consumer Group下的各個實例將共同消費topic的消息,起到負載均衡的作用。
消費進度以Consumer Group為粒度管理,不同Consumer Group之間消費進度彼此不受影響,即消息A被Consumer Group1消費過,也會再給Consumer Group2消費。
注:RocketMQ要求同一個Consumer Group的消費者必須要擁有相同的注冊信息,即必須要聽一樣的topic(并且tag也一樣)。
Topic
標識一類消息的邏輯名字,消息的邏輯管理單位。無論消息生產還是消費,都需要指定Topic。
Tag
RocketMQ支持給在發送的時候給topic打tag,同一個topic的消息雖然邏輯管理是一樣的。但是消費topic1的時候,如果你訂閱的時候指定的是tagA,那么tagB的消息將不會投遞。
Message Queue
簡稱Queue或Q。消息物理管理單位。一個Topic將有若干個Q。若Topic同時創建在不通的Broker,則不同的broker上都有若干Q,消息將物理地存儲落在不同Broker結點上,具有水平擴展的能力。
無論生產者還是消費者,實際的生產和消費都是針對Q級別。例如Producer發送消息的時候,會預先選擇(默認輪詢)好該Topic下面的某一條Q地發送;Consumer消費的時候也會負載均衡地分配若干個Q,只拉取對應Q的消息。
每一條message queue均對應一個文件,這個文件存儲了實際消息的索引信息。并且即使文件被刪除,也能通過實際純粹的消息文件(commit log)恢復回來。
Offset
RocketMQ中,有很多offset的概念。但通常我們只關心暴露到客戶端的offset。一般我們不特指的話,就是指邏輯Message Queue下面的offset。
注:邏輯offset的概念在RocketMQ中字面意思實際上和真正的意思有一定差別,這點在設計上顯得有點混亂。祥見下面的解釋。
可以認為一條邏輯的message queue是無限長的數組。一條消息進來下標就會漲1,而這個數組的下標就是offset。
max offset
字面上可以理解為這是標識message queue中的max offset表示消息的最大offset。但是從源碼上看,這個offset實際上是最新消息的offset+1,即:下一條消息的offset。
min offset:
標識現存在的最小offset。而由于消息存儲一段時間后,消費會被物理地從磁盤刪除,message queue的min offset也就對應增長。這意味著比min offset要小的那些消息已經不在broker上了,無法被消費。
consumer offset
字面上,可以理解為標記Consumer Group在一條邏輯Message Queue上,消息消費到哪里即消費進度。但從源碼上看,這個數值是消費過的最新消費的消息offset+1,即實際上表示的是下次拉取的offset位置。
消費者拉取消息的時候需要指定offset,broker不主動推送消息, offset的消息返回給客戶端。
consumer剛啟動的時候會獲取持久化的consumer offset,用以決定從哪里開始消費,consumer以此發起第一次請求。
每次消息消費成功后,這個offset在會先更新到內存,而后定時持久化。在集群消費模式下,會同步持久化到broker,而在廣播模式下,則會持久化到本地文件。
架構組成
RocketMQ集群部署三、安裝 RocketMq
系統環境
JDK8 Windows10
安裝部署 RocketMQ
地址:http://rocketmq.apache.org/release_notes/release-notes-4.3.0/rocketmq-all-4.3.0-bin-release.zip
選擇 Binary 進行下載
解壓
環境變量配置
必須配置
變量名:ROCKETMQ_HOME 變量值(絕對路徑):D:\rocketMq\rocketmq-all-4.3.0-bin-release
環境變量 Path 中也配置:%ROCKETMQ_HOME%\bin
RocketMq插件(可視化)
選擇性安裝
git clone https://github.com/apache/rocketmq-externals.git
解壓
修改配置
server.port=8088
#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.config=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=127.0.0.1:9876
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
cd ?\rocketmq-externals\rocketmq-console
mvn clean package -Dmaven.test.skip=true
cd ?\rocketmq-externals\rocketmq-console\target
java -jar rocketmq-console-ng-2.0.0.jar
瀏覽器訪問:http://127.0.0.1:8088
四、配置 RocketMq
啟動RocketMQ
默認配置啟動
- 啟動
啟動NAMESERVER:
Cmd命令框執行進入至‘MQ文件夾\bin’下,然后執行‘start mqnamesrv.cmd’,啟動NAMESERVER。成功后會彈出提示框,此框勿關閉。
啟動BROKER:
Cmd命令框執行進入至‘MQ文件夾\bin’下,然后執行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’,啟動BROKER。成功后會彈出提示框,此框勿關閉。
驗證啟動是否成功
驗證生產消息正常
執行如下命令:
export NAMESRV_ADDR=localhost:9876 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
能看到類似如下輸出:
SendResult [sendStatus=SEND_OK, msgId=C0A82BC5F36C511D50C05B41...
驗證消費消息正常
執行如下命令:
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
能看到類似如下輸出:
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=3, stor....
其他命令
RocketMQ 自帶的連接工具是 maadmin ,執行 mqadmin.cmd 就可以查看有哪些命令參數可用。
$?mqadmin.cmdThe?most?commonly?used?mqadmin?commands?are:
???updateTopic??????????Update?or?create?topic
???deleteTopic??????????Delete?topic?from?broker?and?NameServer.
???updateSubGroup???????Update?or?create?subscription?group
???deleteSubGroup???????Delete?subscription?group?from?broker.
???updateBrokerConfig???Update?broker's?config
???updateTopicPerm??????Update?topic?perm
???topicRoute???????????Examine?topic?route?info
???topicStatus??????????Examine?topic?Status?info
???topicClusterList?????get?cluster?info?for?topic
???brokerStatus?????????Fetch?broker?runtime?status?data
???queryMsgById?????????Query?Message?by?Id
???queryMsgByKey????????Query?Message?by?Key
???queryMsgByUniqueKey??Query?Message?by?Unique?key
???queryMsgByOffset?????Query?Message?by?offset
???queryMsgByUniqueKey??Query?Message?by?Unique?key
???printMsg?????????????Print?Message?Detail
???printMsgByQueue??????Print?Message?Detail
???sendMsgStatus????????send?msg?to?broker.
???brokerConsumeStats???Fetch?broker?consume?stats?data
???producerConnection???Query?producer's?socket?connection?and?client?version
???consumerConnection???Query?consumer's?socket?connection,?client?version?and?subscription
???consumerProgress?????Query?consumers's?progress,?speed
???consumerStatus???????Query?consumer's?internal?data?structure
???cloneGroupOffset?????clone?offset?from?other?group.
???clusterList??????????List?all?of?clusters
???topicList????????????Fetch?all?topic?list?from?name?server
???updateKvConfig???????Create?or?update?KV?config.
???deleteKvConfig???????Delete?KV?config.
???wipeWritePerm????????Wipe?write?perm?of?broker?in?all?name?server
???resetOffsetByTime????Reset?consumer?offset?by?timestamp(without?client?restart).
???updateOrderConf??????Create?or?update?or?delete?order?conf
???cleanExpiredCQ???????Clean?expired?ConsumeQueue?on?broker.
???cleanUnusedTopic?????Clean?unused?topic?on?broker.
???startMonitoring??????Start?Monitoring
???statsAll?????????????Topic?and?Consumer?tps?stats
???allocateMQ???????????Allocate?MQ
???checkMsgSendRT???????check?message?send?response?time
???clusterRT????????????List?All?clusters?Message?Send?RT
???getNamesrvConfig?????Get?configs?of?name?server.
???updateNamesrvConfig??Update?configs?of?name?server.
???getBrokerConfig??????Get?broker?config?by?cluster?or?special?broker!
???queryCq??????????????Query?cq?command.
???sendMessage??????????Send?a?message
???consumeMessage???????Consume?message
See?'mqadmin?help?<command>'?for?more?information?on?a?specific?command.
Java?HotSpot(TM)?64-Bit?Server?VM?warning:?ignoring?option?PermSize=128m;?support?was?removed?in?8.0
Java?HotSpot(TM)?64-Bit?Server?VM?warning:?ignoring?option?MaxPermSize=128m;?support?was?removed?in?8.0
啟動成功后先簡單執行一個命令(在 linux 上去掉 .cmd 即可)
- 查詢 topic 列表
mqadmin.cmd topicList -n '127.0.0.1:9876'
生產環境 ACL 權限認證
修改配置
使用默認配置,像上一節我們就可以啟動成功。但是在生產環境中,我們一定要加權限認證
打開 aclEnable 開關
在文件 conf/broker.conf 新增 aclEnable=true
新建文件 conf/plain_acl.yml,填入如下文本
accounts:
-?accessKey:?RocketMQ
??secretKey:?12345678
??whiteRemoteAddress:
??admin:?false
??defaultTopicPerm:?DENY
??defaultGroupPerm:?SUB
??topicPerms:
??-?TopicTest=PUB
??groupPerms:
??#?the?group?should?convert?to?retry?topic
??-?oms_consumer_group=DENY
-?accessKey:?admin
??secretKey:?12345678
??whiteRemoteAddress:
??#?if?it?is?admin,?it?could?access?all?resources
??admin:?true?
start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
名詞介紹
globalWhiteRemoteAddresses
全局白名單,其類型為數組,即支持多個配置。其支持的配置格式如下:
空 表示不設置白名單,該條規則默認返回false。“” 表示全部匹配,該條規則直接返回true,將會阻斷其他規則的判斷,請慎重使用。192.168.0.{100,101} 多地址配置模式,ip地址的最后一組,使用{},大括號中多個ip地址,用英文逗號(,)隔開。192.168.1.100,192.168.2.100 直接使用,分隔,配置多個ip地址。192.168..或192.168.100-200.10-20 每個IP段使用 "" 或"-"表示范圍。
accounts
配置用戶信息,該類型為數組類型。擁有accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms子元素。
accessKey 登錄用戶名,長度必須大于6個字符。
secretKey 登錄密碼。長度必須大于6個字符。
whiteRemoteAddress 用戶級別的IP地址白名單。其類型為一個字符串,其配置規則與globalWhiteRemoteAddresses,但只能配置一條規則。
admin boolean類型,設置是否是admin。如下權限只有admin=true時才有權限執行。
UPDATE_AND_CREATE_TOPIC 更新或創建主題。UPDATE_BROKER_CONFIG 更新Broker配置。DELETE_TOPIC_IN_BROKER 刪除主題。UPDATE_AND_CREATE_SUBSCRIPTIONGROUP 更新或創建訂閱組信息。DELETE_SUBSCRIPTIONGROUP 刪除訂閱組信息。5. defaultTopicPerm 默認topic權限。該值默認為DENY(拒絕)。
defaultGroupPerm 默認消費組權限,該值默認為DENY(拒絕),建議值為SUB。
topicPerms 設置topic的權限。其類型為數組,其可選擇值在下節介紹。
groupPerms 設置消費組的權限。其類型為數組,其可選擇值在下節介紹。可以為每一消費組配置不一樣的權限。
五、RocketMq 集群
RocketMq 集群、拓展閱讀
參考:https://www.jianshu.com/p/2838890f3284
六、SpringBoot 集成 RocketMq
代碼會同步到 GitHub,回復github領取
七、SpringMVC 架構中使用 Demo
參考
安裝部署參考博客:https://www.imooc.com/article/290089
完整中文參考文章:http://www.itmuch.com/books/rocketmq/operation.html
阿里文檔參考:https://help.aliyun.com/document_detail/29533.html
我是pub哥,Java服務端工程師,對大數據懂一點,三觀要正。我們下期再見
財經相關:
最近幾天白酒、半導體、新能源都在上漲,應該有很多被洗人,投資建議,有持倉的韭菜,下跌再拋出。
總結
以上是生活随笔為你收集整理的rocketmq删除topic_RocketMq 快速入门教程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: jq苹果手机全屏下点击无效果_苹果系统自
- 下一篇: 在fritzing中怎么导入_电路图制作