日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

rocketmq删除topic_RocketMq 快速入门教程

發布時間:2025/3/15 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rocketmq删除topic_RocketMq 快速入门教程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

今年是不平凡的一年,對于每個個體都是。不論我們在哪兒,經歷了什么,向前走總沒錯。雖然方向也很重要,但是不要在一個地方停太久,You young

編者薦語:RocketMQ 逐漸成為最主流的消息隊列,學習 RocketMQ 是每個攻城獅要做的事

  • 一、消息中間件

    • 什么是消息系統

  • 二、RocketMq簡介

    • 發展

    • 概念術語

    • 架構組成

  • 三、安裝 RocketMq

    • 系統環境

    • 安裝部署 RocketMQ

    • RocketMq插件(可視化)

  • 四、配置 RocketMq

    • 啟動RocketMQ

    • 驗證啟動是否成功

    • 生產環境 ACL 權限認證

  • 五、RocketMq 集群

    • RocketMq 集群、拓展閱讀

  • 六、SpringBoot 集成 RocketMq

  • 七、SpringMVC 架構中使用 Demo

  • 參考

一、消息中間件

什么是消息系統

簡單來說:消息被發送到隊列中?!跋㈥犃小?Message Queue)是在消息的傳輸過程中保存消息的容器。

  • 拓展閱讀

消息隊列作為高并發系統的核心組件之一,能夠幫助業務系統解構提升開發效率和系統穩定性。主要具有以下優勢:

削峰填谷(主要解決瞬時寫壓力大于應用服務能力導致消息丟失、系統奔潰等問題) 系統解耦(解決不同重要程度、不同能力級別系統之間依賴導致一死全死) 提升性能(當存在一對多調用時,可以發一條消息給消息系統,讓消息系統通知相關系統) 蓄流壓測(線上有些鏈路不好壓測,可以通過堆積一定量消息再放開來壓測) 目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具有主要優勢特性有:

? 支持事務型消息(消息發送和DB操作保持兩方的最終一致性,rabbitmq和kafka不支持) ? 支持結合rocketmq的多個系統之間數據最終一致性(多方事務,二方事務是前提) ? 支持18個級別的延遲消息(rabbitmq和kafka不支持) ? 支持指定次數和時間間隔的失敗消息重發(kafka不支持,rabbitmq需要手動確認) ? 支持consumer端tag過濾,減少不必要的網絡傳輸(rabbitmq和kafka不支持) ? 支持重復消費(rabbitmq不支持,kafka支持)

二、RocketMq簡介

官網地址:http://rocketmq.apache.org/

發展

阿里巴巴消息中間件起源于2001年的五彩石項目,Notify 在這期間應運而生,用于交易核心消息的流轉。

至2010年,B2B開始大規模使用ActiveMQ作為消息內核,隨著阿里業務的快速發展,急需一款支持順序消息,擁有海量消息堆積能力的消息中間件,MetaQ 1.0在2011年誕生。

到2012年,MetaQ已經發展到了MetaQ 3.0,并抽象出了通用的消息引擎RocketMQ。隨后,將RocketMQ進行了開源,阿里的消息中間件正式走入了公眾的視野。

到2015年,RocketMQ已經經歷了多年雙十一的洗禮,在可用性、可靠性以及穩定性等方面都有出色的表現。與此同時,云計算大行其道,阿里消息中間件基于RocketMQ推出了Aliware MQ 1.0,開始為阿里云上成千上萬家企業提供消息服務。

到今年,MetaQ在2016年雙十一承載了萬億級消息的流轉,跨越了一個新的里程碑,同時RocketMQ進入Apache 孵化。

RocketMQ可以保證嚴格的消息順序

概念術語

Producer Group

標識發送同一類消息的Producer,通常發送邏輯一致。發送普通消息的時候,僅標識使用,并無特別用處。若事務消息,如果某條發送某條消息的producer-A宕機,使得事務消息一直處于PREPARED狀態并超時,則broker會回查同一個group的其 他producer,確認這條消息應該commit還是rollback。但開源版本并不支持事務消息。

Consumer Group

標識一類Consumer的集合名稱,這類Consumer通常消費一類消息,且消費邏輯一致。同一個Consumer Group下的各個實例將共同消費topic的消息,起到負載均衡的作用。

消費進度以Consumer Group為粒度管理,不同Consumer Group之間消費進度彼此不受影響,即消息A被Consumer Group1消費過,也會再給Consumer Group2消費。

注:RocketMQ要求同一個Consumer Group的消費者必須要擁有相同的注冊信息,即必須要聽一樣的topic(并且tag也一樣)。

Topic

標識一類消息的邏輯名字,消息的邏輯管理單位。無論消息生產還是消費,都需要指定Topic。

Tag

RocketMQ支持給在發送的時候給topic打tag,同一個topic的消息雖然邏輯管理是一樣的。但是消費topic1的時候,如果你訂閱的時候指定的是tagA,那么tagB的消息將不會投遞。

Message Queue

簡稱Queue或Q。消息物理管理單位。一個Topic將有若干個Q。若Topic同時創建在不通的Broker,則不同的broker上都有若干Q,消息將物理地存儲落在不同Broker結點上,具有水平擴展的能力。

無論生產者還是消費者,實際的生產和消費都是針對Q級別。例如Producer發送消息的時候,會預先選擇(默認輪詢)好該Topic下面的某一條Q地發送;Consumer消費的時候也會負載均衡地分配若干個Q,只拉取對應Q的消息。

每一條message queue均對應一個文件,這個文件存儲了實際消息的索引信息。并且即使文件被刪除,也能通過實際純粹的消息文件(commit log)恢復回來。

Offset

RocketMQ中,有很多offset的概念。但通常我們只關心暴露到客戶端的offset。一般我們不特指的話,就是指邏輯Message Queue下面的offset。

注:邏輯offset的概念在RocketMQ中字面意思實際上和真正的意思有一定差別,這點在設計上顯得有點混亂。祥見下面的解釋。

可以認為一條邏輯的message queue是無限長的數組。一條消息進來下標就會漲1,而這個數組的下標就是offset。

max offset

字面上可以理解為這是標識message queue中的max offset表示消息的最大offset。但是從源碼上看,這個offset實際上是最新消息的offset+1,即:下一條消息的offset。

min offset:

標識現存在的最小offset。而由于消息存儲一段時間后,消費會被物理地從磁盤刪除,message queue的min offset也就對應增長。這意味著比min offset要小的那些消息已經不在broker上了,無法被消費。

consumer offset

字面上,可以理解為標記Consumer Group在一條邏輯Message Queue上,消息消費到哪里即消費進度。但從源碼上看,這個數值是消費過的最新消費的消息offset+1,即實際上表示的是下次拉取的offset位置。

消費者拉取消息的時候需要指定offset,broker不主動推送消息, offset的消息返回給客戶端。

consumer剛啟動的時候會獲取持久化的consumer offset,用以決定從哪里開始消費,consumer以此發起第一次請求。

每次消息消費成功后,這個offset在會先更新到內存,而后定時持久化。在集群消費模式下,會同步持久化到broker,而在廣播模式下,則會持久化到本地文件。

架構組成

RocketMQ集群部署

三、安裝 RocketMq

系統環境

JDK8 Windows10

安裝部署 RocketMQ

  • 下載
  • 地址:http://rocketmq.apache.org/release_notes/release-notes-4.3.0/rocketmq-all-4.3.0-bin-release.zip

    選擇 Binary 進行下載

  • 解壓

  • 環境變量配置

  • 必須配置

    變量名:ROCKETMQ_HOME 變量值(絕對路徑):D:\rocketMq\rocketmq-all-4.3.0-bin-release

    環境變量 Path 中也配置:%ROCKETMQ_HOME%\bin

    RocketMq插件(可視化)

    選擇性安裝

  • 下載
  • git clone https://github.com/apache/rocketmq-externals.git

  • 解壓

  • 修改配置

  • server.contextPath=
    server.port=8088
    #spring.application.index=true
    spring.application.name=rocketmq-console
    spring.http.encoding.charset=UTF-8
    spring.http.encoding.enabled=true
    spring.http.encoding.force=true
    logging.config=classpath:logback.xml
    #if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
    rocketmq.config.namesrvAddr=127.0.0.1:9876
    #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
    rocketmq.config.isVIPChannel=
    #rocketmq-console's data path:dashboard/monitor
    rocketmq.config.dataPath=/tmp/rocketmq-console/data
    #set it false if you don't want use dashboard.default true
    rocketmq.config.enableDashBoardCollect=true
  • 編譯
  • cd ?\rocketmq-externals\rocketmq-console

    mvn clean package -Dmaven.test.skip=true

  • 啟動
  • cd ?\rocketmq-externals\rocketmq-console\target

    java -jar rocketmq-console-ng-2.0.0.jar

  • 啟動成功
  • 瀏覽器訪問:http://127.0.0.1:8088

    四、配置 RocketMq

    啟動RocketMQ

    默認配置啟動

    • 啟動

    啟動NAMESERVER:

    Cmd命令框執行進入至‘MQ文件夾\bin’下,然后執行‘start mqnamesrv.cmd’,啟動NAMESERVER。成功后會彈出提示框,此框勿關閉。

    啟動BROKER:

    Cmd命令框執行進入至‘MQ文件夾\bin’下,然后執行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’,啟動BROKER。成功后會彈出提示框,此框勿關閉。

    驗證啟動是否成功

    驗證生產消息正常

    執行如下命令:

    export NAMESRV_ADDR=localhost:9876 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

    能看到類似如下輸出:

    SendResult [sendStatus=SEND_OK, msgId=C0A82BC5F36C511D50C05B41...

    驗證消費消息正常

    執行如下命令:

    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

    能看到類似如下輸出:

    ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=3, stor....

    其他命令

    RocketMQ 自帶的連接工具是 maadmin ,執行 mqadmin.cmd 就可以查看有哪些命令參數可用。

    $?mqadmin.cmd
    The?most?commonly?used?mqadmin?commands?are:
    ???updateTopic??????????Update?or?create?topic
    ???deleteTopic??????????Delete?topic?from?broker?and?NameServer.
    ???updateSubGroup???????Update?or?create?subscription?group
    ???deleteSubGroup???????Delete?subscription?group?from?broker.
    ???updateBrokerConfig???Update?broker's?config
    ???updateTopicPerm??????Update?topic?perm
    ???topicRoute???????????Examine?topic?route?info
    ???topicStatus??????????Examine?topic?Status?info
    ???topicClusterList?????get?cluster?info?for?topic
    ???brokerStatus?????????Fetch?broker?runtime?status?data
    ???queryMsgById?????????Query?Message?by?Id
    ???queryMsgByKey????????Query?Message?by?Key
    ???queryMsgByUniqueKey??Query?Message?by?Unique?key
    ???queryMsgByOffset?????Query?Message?by?offset
    ???queryMsgByUniqueKey??Query?Message?by?Unique?key
    ???printMsg?????????????Print?Message?Detail
    ???printMsgByQueue??????Print?Message?Detail
    ???sendMsgStatus????????send?msg?to?broker.
    ???brokerConsumeStats???Fetch?broker?consume?stats?data
    ???producerConnection???Query?producer's?socket?connection?and?client?version
    ???consumerConnection???Query?consumer's?socket?connection,?client?version?and?subscription
    ???consumerProgress?????Query?consumers's?progress,?speed
    ???consumerStatus???????Query?consumer's?internal?data?structure
    ???cloneGroupOffset?????clone?offset?from?other?group.
    ???clusterList??????????List?all?of?clusters
    ???topicList????????????Fetch?all?topic?list?from?name?server
    ???updateKvConfig???????Create?or?update?KV?config.
    ???deleteKvConfig???????Delete?KV?config.
    ???wipeWritePerm????????Wipe?write?perm?of?broker?in?all?name?server
    ???resetOffsetByTime????Reset?consumer?offset?by?timestamp(without?client?restart).
    ???updateOrderConf??????Create?or?update?or?delete?order?conf
    ???cleanExpiredCQ???????Clean?expired?ConsumeQueue?on?broker.
    ???cleanUnusedTopic?????Clean?unused?topic?on?broker.
    ???startMonitoring??????Start?Monitoring
    ???statsAll?????????????Topic?and?Consumer?tps?stats
    ???allocateMQ???????????Allocate?MQ
    ???checkMsgSendRT???????check?message?send?response?time
    ???clusterRT????????????List?All?clusters?Message?Send?RT
    ???getNamesrvConfig?????Get?configs?of?name?server.
    ???updateNamesrvConfig??Update?configs?of?name?server.
    ???getBrokerConfig??????Get?broker?config?by?cluster?or?special?broker!
    ???queryCq??????????????Query?cq?command.
    ???sendMessage??????????Send?a?message
    ???consumeMessage???????Consume?message
    See?'mqadmin?help?<command>'?for?more?information?on?a?specific?command.
    Java?HotSpot(TM)?64-Bit?Server?VM?warning:?ignoring?option?PermSize=128m;?support?was?removed?in?8.0
    Java?HotSpot(TM)?64-Bit?Server?VM?warning:?ignoring?option?MaxPermSize=128m;?support?was?removed?in?8.0

    啟動成功后先簡單執行一個命令(在 linux 上去掉 .cmd 即可)

    • 查詢 topic 列表

    mqadmin.cmd topicList -n '127.0.0.1:9876'

    生產環境 ACL 權限認證

    修改配置

    使用默認配置,像上一節我們就可以啟動成功。但是在生產環境中,我們一定要加權限認證

  • 打開 aclEnable 開關

    在文件 conf/broker.conf 新增 aclEnable=true

  • 新建文件 conf/plain_acl.yml,填入如下文本

  • globalWhiteRemoteAddresses:

    accounts:
    -?accessKey:?RocketMQ
    ??secretKey:?12345678
    ??whiteRemoteAddress:
    ??admin:?false
    ??defaultTopicPerm:?DENY
    ??defaultGroupPerm:?SUB
    ??topicPerms:
    ??-?TopicTest=PUB
    ??groupPerms:
    ??#?the?group?should?convert?to?retry?topic
    ??-?oms_consumer_group=DENY

    -?accessKey:?admin
    ??secretKey:?12345678
    ??whiteRemoteAddress:
    ??#?if?it?is?admin,?it?could?access?all?resources
    ??admin:?true?
  • 重啟 RocketMQ
  • start mqnamesrv.cmd

    start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

    名詞介紹

    globalWhiteRemoteAddresses

    全局白名單,其類型為數組,即支持多個配置。其支持的配置格式如下:

    空 表示不設置白名單,該條規則默認返回false。“” 表示全部匹配,該條規則直接返回true,將會阻斷其他規則的判斷,請慎重使用。192.168.0.{100,101} 多地址配置模式,ip地址的最后一組,使用{},大括號中多個ip地址,用英文逗號(,)隔開。192.168.1.100,192.168.2.100 直接使用,分隔,配置多個ip地址。192.168..或192.168.100-200.10-20 每個IP段使用 "" 或"-"表示范圍。

    accounts

    配置用戶信息,該類型為數組類型。擁有accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms子元素。

  • accessKey 登錄用戶名,長度必須大于6個字符。

  • secretKey 登錄密碼。長度必須大于6個字符。

  • whiteRemoteAddress 用戶級別的IP地址白名單。其類型為一個字符串,其配置規則與globalWhiteRemoteAddresses,但只能配置一條規則。

  • admin boolean類型,設置是否是admin。如下權限只有admin=true時才有權限執行。

  • UPDATE_AND_CREATE_TOPIC 更新或創建主題。UPDATE_BROKER_CONFIG 更新Broker配置。DELETE_TOPIC_IN_BROKER 刪除主題。UPDATE_AND_CREATE_SUBSCRIPTIONGROUP 更新或創建訂閱組信息。DELETE_SUBSCRIPTIONGROUP 刪除訂閱組信息。5. defaultTopicPerm 默認topic權限。該值默認為DENY(拒絕)。

  • defaultGroupPerm 默認消費組權限,該值默認為DENY(拒絕),建議值為SUB。

  • topicPerms 設置topic的權限。其類型為數組,其可選擇值在下節介紹。

  • groupPerms 設置消費組的權限。其類型為數組,其可選擇值在下節介紹。可以為每一消費組配置不一樣的權限。

  • 五、RocketMq 集群

    RocketMq 集群、拓展閱讀

    參考:https://www.jianshu.com/p/2838890f3284

    六、SpringBoot 集成 RocketMq

    代碼會同步到 GitHub,回復github領取

    七、SpringMVC 架構中使用 Demo

    參考

    安裝部署參考博客:https://www.imooc.com/article/290089

    完整中文參考文章:http://www.itmuch.com/books/rocketmq/operation.html

    阿里文檔參考:https://help.aliyun.com/document_detail/29533.html

    我是pub哥,Java服務端工程師,對大數據懂一點,三觀要正。我們下期再見

    財經相關:

    最近幾天白酒、半導體、新能源都在上漲,應該有很多被洗人,投資建議,有持倉的韭菜,下跌再拋出。

    總結

    以上是生活随笔為你收集整理的rocketmq删除topic_RocketMq 快速入门教程的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。