日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

【消息中间件】浅谈中间件优缺RabbitMQ基本使用

發(fā)布時間:2025/5/22 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【消息中间件】浅谈中间件优缺RabbitMQ基本使用 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

消息中間件概述

MQ全稱為Message Queue,消息隊列是應用程序和應用程序之間的通信方法。是在消息的傳輸過程中保存消息的容器。多用于分布式系統(tǒng)之間進行通信。

  • 為什么使用MQ
    在項目中,可將一些無需即時返回且耗時的操作提取出來,進行異步處理,而這種異步處理的方式大大的節(jié)省了服務器的請求響應時間,從而提高了系統(tǒng)的吞吐量。

  • 開發(fā)中消息隊列通常有如下應用場景:

    • 任務異步處理
      將不需要同步處理的并且耗時長的操作由消息隊列通知消息接收方進行異步處理。提高了應用程序的響應時間。
    • 應用程序解耦合
      MQ相當于一個中介,生產(chǎn)方通過MQ與消費方交互,它將應用程序進行解耦合。
    • 削峰填谷
      如訂單系統(tǒng),在下單的時候就會往數(shù)據(jù)庫寫數(shù)據(jù)。但是數(shù)據(jù)庫只能支撐每秒1000左右的并發(fā)寫入,并發(fā)量再高就容易宕機。低峰期的時候并發(fā)也就100多個,但是在高峰期時候,并發(fā)量會突然激增到5000以上,這個時候數(shù)據(jù)庫肯定卡死了。
      消息被MQ保存起來了,然后系統(tǒng)就可以按照自己的消費能力來消費,比如每秒1000個數(shù)據(jù),這樣慢慢寫入數(shù)據(jù)庫,這樣就不會卡死數(shù)據(jù)庫了。
      但是使用了MQ之后,限制消費消息的速度為1000,但是這樣一來,高峰期產(chǎn)生的數(shù)據(jù)勢必會被積壓在MQ中,高峰就被“削”掉了。但是因為消息積壓,在高峰期過后的一段時間內(nèi),消費消息的速度還是會維持在1000QPS,直到消費完積壓的消息,這就叫做“填谷”

MQ 的劣勢

  • 系統(tǒng)可用性降低
    系統(tǒng)引入的外部依賴越多,系統(tǒng)穩(wěn)定性越差。一旦 MQ 宕機,就會對業(yè)務造成影響。如何保證MQ的高可用?
  • 系統(tǒng)復雜度提高
    MQ 的加入大大增加了系統(tǒng)的復雜度,以前系統(tǒng)間是同步的遠程調(diào)用,現(xiàn)在是通過 MQ 進行異步調(diào)用。如何保證消息沒有被重復消費?怎么處理消息丟失情況?那么保證消息傳遞的順序性?
  • 一致性問題
    A 系統(tǒng)處理完業(yè)務,通過 MQ 給B、C、D三個系統(tǒng)發(fā)消息數(shù)據(jù),如果 B 系統(tǒng)、C 系統(tǒng)處理成功,D 系統(tǒng)處理失敗。如何保證消息數(shù)據(jù)處理的一致性?

既然 MQ 有優(yōu)勢也有劣勢,那么使用 MQ 需要滿足什么條件呢?

  • 生產(chǎn)者不需要從消費者處獲得反饋。引入消息隊列之前的直接調(diào)用,其接口的返回值應該為空,這才讓明明下層的動作還沒做,上層卻當成動作做完了繼續(xù)往后走,即所謂異步成為了可能。
  • 容許短暫的不一致性。
  • 確實是用了有效果。即解耦、提速、削峰這些方面的收益,超過加入MQ,管理MQ這些成本。

AMQP 和 JMS

MQ是消息通信的模型;實現(xiàn)MQ的大致有兩種主流方式:AMQP、JMS。

AMQP

AMQP是一種協(xié)議,更準確的說是一種binary wire-level protocol(鏈接協(xié)議)。這是其和JMS的本質(zhì)差別,AMQP不從API層進行限定,而是直接定義網(wǎng)絡交換的數(shù)據(jù)格式。

AMQP,即 Advanced Message Queuing Protocol(高級消息隊列協(xié)議),是一個網(wǎng)絡協(xié)議,是應用層協(xié)議的一個開放標準,為面向消息的中間件設計。基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開發(fā)語言等條件的限制。2006年,AMQP 規(guī)范發(fā)布。類比HTTP。

2007年,Rabbit 技術(shù)公司基于 AMQP 標準開發(fā)的 RabbitMQ 1.0 發(fā)布。RabbitMQ 采用 Erlang 語言開發(fā)。
Erlang 語言由 Ericson 設計,專門為開發(fā)高并發(fā)和分布式系統(tǒng)的一種語言,在電信領域使用廣泛。

JMS

JMS即Java消息服務(JavaMessage Service)應用程序接口,是一個Java平臺中關于面向消息中間件(MOM)的API,用于在兩個應用程序之間,或分布式系統(tǒng)中發(fā)送消息,進行異步通信。

JMS 是 JavaEE 規(guī)范中的一種,類比JDBC

很多消息中間件都實現(xiàn)了JMS規(guī)范,例如:ActiveMQ。RabbitMQ 官方?jīng)]有提供 JMS 的實現(xiàn)包,但是開源社區(qū)有

AMQP 與 JMS 區(qū)別

  • JMS是定義了統(tǒng)一的接口,來對消息操作進行統(tǒng)一;AMQP是通過規(guī)定協(xié)議來統(tǒng)一數(shù)據(jù)交互的格式
  • JMS限定了必須使用Java語言;AMQP只是協(xié)議,不規(guī)定實現(xiàn)方式,因此是跨語言的。
  • JMS規(guī)定了兩種消息模式;而AMQP的消息模式更加豐富

消息隊列產(chǎn)品

市場上常見的消息隊列有如下:

  • ActiveMQ:基于JMS
  • ZeroMQ:基于C語言開發(fā)
  • RabbitMQ:基于AMQP協(xié)議,erlang語言開發(fā),穩(wěn)定性好
  • RocketMQ:基于JMS,阿里巴巴產(chǎn)品
  • Kafka:類似MQ的產(chǎn)品;分布式消息系統(tǒng),高吞吐量

RabbitMQ

RabbitMQ 中的相關概念:

  • Broker:接收和分發(fā)消息的應用,RabbitMQ Server就是 Message Broker
  • Virtual host:出于多租戶和安全因素設計的,把 AMQP 的基本組件劃分到一個虛擬的分組中,類似于網(wǎng)絡中的 namespace 概念。當多個不同的用戶使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個vhost,每個用戶在自己的 vhost 創(chuàng)建 exchange/queue 等
  • Connection:publisher/consumer 和 broker 之間的 TCP 連接
  • Channel:如果每一次訪問 RabbitMQ 都建立一個 Connection,在消息量大的時候建立 TCP Connection的開銷將是巨大的,效率也較低。Channel 是在 connection 內(nèi)部建立的邏輯連接,如果應用程序支持多線程,通常每個thread創(chuàng)建單獨的 channel 進行通訊,AMQP method 包含了channel id 幫助客戶端和 message broker 識別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級的 Connection 極大減少了操作系統(tǒng)建立 TCP connection 的開銷
  • Exchange:message 到達 broker 的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢表中的 routing key,分發(fā)消息到queue 中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout(multicast)
  • Queue:消息最終被送到這里等待 consumer 取走
  • Binding:exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查詢表中,用于 message 的分發(fā)依據(jù)

RabbitMQ是由erlang語言開發(fā),基于AMQP(Advanced Message Queue 高級消息隊列協(xié)議)協(xié)議實現(xiàn)的消息隊列,它是一種應用程序之間的通信方法,消息隊列在分布式系統(tǒng)開發(fā)中應用非常廣泛。

RabbitMQ官方地址:http://www.rabbitmq.com/

RabbitMQ提供了6種模式:簡單模式,work模式,Publish/Subscribe發(fā)布與訂閱模式,Routing路由模式,Topics主題模式,RPC遠程調(diào)用模式(遠程調(diào)用,不太算MQ;暫不作介紹);

官網(wǎng)對應模式介紹:https://www.rabbitmq.com/getstarted.html

RabbitMQ入門

pom.xml文件中添加如下依賴:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency>
編寫生產(chǎn)者
public class Producer {static final String QUEUE_NAME = "simple_queue";public static void main(String[] args) throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//主機地址;默認為 localhostconnectionFactory.setHost("localhost");//連接端口;默認為 5672connectionFactory.setPort(5672);//虛擬主機名稱;默認為 /connectionFactory.setVirtualHost("/wanyuan");//連接用戶名;默認為guestconnectionFactory.setUsername("wanyuan");//連接密碼;默認為guestconnectionFactory.setPassword("wanyuan");//創(chuàng)建連接Connection connection = connectionFactory.newConnection();// 創(chuàng)建頻道Channel channel = connection.createChannel();// 聲明(創(chuàng)建)隊列/*** 參數(shù)1:隊列名稱* 參數(shù)2:是否定義持久化隊列* 參數(shù)3:是否獨占本次連接* 參數(shù)4:是否在不使用的時候自動刪除隊列* 參數(shù)5:隊列其它參數(shù)*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 要發(fā)送的信息String message = "你好;趙麗穎!";/*** 參數(shù)1:交換機名稱,如果沒有指定則使用默認Default Exchage* 參數(shù)2:路由key,簡單模式可以傳遞隊列名稱* 參數(shù)3:消息其它屬性* 參數(shù)4:消息內(nèi)容*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("已發(fā)送消息:" + message);// 關閉資源channel.close();connection.close();}}

在執(zhí)行上述的消息發(fā)送之后;可以登錄rabbitMQ的管理控制臺,可以發(fā)現(xiàn)隊列和其消息

編寫消費者

抽取創(chuàng)建connection的工具類

public class ConnectionUtil {public static Connection getConnection() throws Exception {//創(chuàng)建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//主機地址;默認為 localhostconnectionFactory.setHost("localhost");//連接端口;默認為 5672connectionFactory.setPort(5672);//虛擬主機名稱;默認為 /connectionFactory.setVirtualHost("/itcast");//連接用戶名;默認為guestconnectionFactory.setUsername("heima");//連接密碼;默認為guestconnectionFactory.setPassword("heima");//創(chuàng)建連接return connectionFactory.newConnection();} }

編寫消息的消費者

public class Consumer {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創(chuàng)建頻道Channel channel = connection.createChannel();// 聲明(創(chuàng)建)隊列/*** 參數(shù)1:隊列名稱* 參數(shù)2:是否定義持久化隊列* 參數(shù)3:是否獨占本次連接* 參數(shù)4:是否在不使用的時候自動刪除隊列* 參數(shù)5:隊列其它參數(shù)*/channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);//創(chuàng)建消費者;并設置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標簽,在channel.basicConsume時候可以指定* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發(fā)送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機System.out.println("交換機為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("接收到的消息為:" + new String(body, "utf-8"));}};//監(jiān)聽消息/*** 參數(shù)1:隊列名稱* 參數(shù)2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動確認* 參數(shù)3:消息接收到后回調(diào)*/channel.basicConsume(Producer.QUEUE_NAME, true, consumer);//不關閉資源,應該一直監(jiān)聽消息//channel.close();//connection.close();}}

總結(jié)

上述的入門案例中中其實使用的是如下的簡單模式:

在上圖的模型中,有以下概念:

  • P:生產(chǎn)者,也就是要發(fā)送消息的程序
  • C:消費者:消息的接受者,會一直等待消息到來。
  • queue:消息隊列,圖中紅色部分。類似一個郵箱,可以緩存消息;生產(chǎn)者向其中投遞消息,消費者從其中取出消息。

總結(jié)

以上是生活随笔為你收集整理的【消息中间件】浅谈中间件优缺RabbitMQ基本使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。