日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java rabbitmq 工具类_RabbitMq通用管理工具类

發(fā)布時(shí)間:2025/3/20 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java rabbitmq 工具类_RabbitMq通用管理工具类 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

import java.io.IOException;

import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

import org.apache.log4j.Logger;

/**

* RabbitMq通用管理工具類

* RabbitMQ是AMQP(高級(jí)消息隊(duì)列協(xié)議)的標(biāo)準(zhǔn)實(shí)現(xiàn)

* 1.單發(fā)送單(多)接收模式;

* 2.fanout發(fā)布訂閱模式(fanout);

* 3.routing路由模式(direct);

* 4.topic通配符模式(topic)

* Broker:簡單來說就是消息隊(duì)列服務(wù)器實(shí)體;

Channel:消息通道,在客戶端的每個(gè)連接里,可建立多個(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù);

Exchange:交換機(jī),決定了消息路由規(guī)則,它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列;

Queue:消息隊(duì)列載體,每個(gè)消息都會(huì)被投入到一個(gè)或多個(gè)隊(duì)列;

Binding:綁定了Queue和Exchange,意即為符合什么樣路由規(guī)則的消息,將會(huì)放置入哪一個(gè)消息隊(duì)列;

Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞;

vhost:虛擬主機(jī),一個(gè)broker里可以開設(shè)多個(gè)vhost,用作不同用戶的權(quán)限分離;

producer:消息生產(chǎn)者,就是投遞消息的程序;

consumer:消息消費(fèi)者,就是接受消息的程序;

* 消息隊(duì)列持久化包括3個(gè)部分:

(1)exchange持久化,在聲明時(shí)指定durable => 1

(2)queue持久化,在聲明時(shí)指定durable => 1

(3)消息持久化,在投遞時(shí)指定delivery_mode => 2(1是非持久化)

如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個(gè)持久化,一個(gè)非持久化,就不允許建立綁定。

* 1.將交換機(jī)置為可持久;

2.將通道置為可持久;

3.消息發(fā)送時(shí)設(shè)置可持久;

當(dāng)我們"生產(chǎn)"了一條可持久化的消息,嘗試中斷MQ服務(wù),啟動(dòng)消費(fèi)者獲取消息,消息依然能夠恢復(fù)。相反,則拋出異常;

* 消息隊(duì)列的使用過程大概如下:

(1)客戶端連接到消息隊(duì)列服務(wù)器,打開一個(gè)channel;

(2)客戶端聲明一個(gè)exchange,并設(shè)置相關(guān)屬性;

(3)客戶端聲明一個(gè)queue,并設(shè)置相關(guān)屬性;

(4)客戶端使用routing key,在exchange和queue之間建立好綁定關(guān)系;

(5)客戶端投遞消息到exchange, exchange接收到消息后,就根據(jù)消息的key和已經(jīng)設(shè)置的binding,進(jìn)行消息路由,將消息投遞到一個(gè)或多個(gè)隊(duì)列里

* exchange也有幾個(gè)類型,

完全根據(jù)key進(jìn)行投遞的叫做Direct交換機(jī),例如,綁定時(shí)設(shè)置了routing key為"abc",那么客戶端提交的消息,只有設(shè)置了key為"abc"的才會(huì)投遞到隊(duì)列。

對(duì)key進(jìn)行模式匹配后進(jìn)行投遞的叫做Topic交換機(jī),符號(hào)"#"匹配一個(gè)或多個(gè)詞,符號(hào)"*"匹配正好一個(gè)詞。例如"abc.#"匹配"abc.def.ghi","abc.*"只匹配"abc.def"。

還有一種不需要key的,叫做Fanout交換機(jī),它采取廣播模式,一個(gè)消息進(jìn)來時(shí),投遞到與該交換機(jī)綁定的所有隊(duì)列。

* Exchange

Durability 持久性,這是exchange的可選屬性,如果你Durability設(shè)置為false,那些當(dāng)前會(huì)話結(jié)束的時(shí)候,該exchange也會(huì)被銷毀;

Auto delete 當(dāng)沒有隊(duì)列或者其他exchange綁定到此exchange的時(shí)候,該exchange被銷毀;

Internal 表示這個(gè)exchange不可以被client用來推送消息,僅用來進(jìn)行exchange和exchange之間的綁定;

無法聲明2個(gè)名稱相同 但是類型卻不同的exchange;

* Queue

Durability 和exchange相同,未持久化的隊(duì)列,服務(wù)重啟后銷毀;

Auto delete 當(dāng)沒有消費(fèi)者連接到該隊(duì)列的時(shí)候,隊(duì)列自動(dòng)銷毀;

Exclusive 使隊(duì)列成為私有隊(duì)列,只有當(dāng)前應(yīng)用程序可用,當(dāng)你需要限制隊(duì)列只有一個(gè)消費(fèi)者,這是很有用的;

擴(kuò)展屬性如下對(duì)應(yīng)源程序 RabbitMQ.Client.IModel.QueueDeclare(string, bool, bool, bool, System.Collections.Generic.IDictionary)最后的參數(shù),

Message TTL 當(dāng)一個(gè)消息被推送在該隊(duì)列的時(shí)候 可以存在的時(shí)間 單位為ms,(對(duì)應(yīng)擴(kuò)展參數(shù)argument "x-message-ttl" );

Auto expire 在隊(duì)列自動(dòng)刪除之前可以保留多長時(shí)間(對(duì)應(yīng)擴(kuò)展參數(shù)argument "x-expires");

Max length 一個(gè)隊(duì)列可以容納的已準(zhǔn)備消息的數(shù)量(對(duì)應(yīng)擴(kuò)展參數(shù)argument "x-max-length");

一旦創(chuàng)建了隊(duì)列和交換機(jī),就不能修改其標(biāo)志了。例如,如果創(chuàng)建了一個(gè)non-durable的隊(duì)列,然后想把它改變成durable的,唯一的辦法就是刪除這個(gè)隊(duì)列然后重現(xiàn)創(chuàng)建;

* RabbitMQ消息模型的核心理念是:發(fā)布者(producer)不會(huì)直接發(fā)送任何消息給隊(duì)列。

* 事實(shí)上,發(fā)布者(producer)甚至不知道消息是否已經(jīng)被投遞到隊(duì)列。

* 發(fā)布者(producer)只需要把消息發(fā)送給一個(gè)exchange。

* exchange非常簡單,它一邊從發(fā)布者方接收消息,一邊把消息推入隊(duì)列。

* exchange必須知道如何處理它接收到的消息,是應(yīng)該推送到指定的隊(duì)列還是是多個(gè)隊(duì)列,或者是直接忽略消息。

* 這些規(guī)則是通過exchange type來定義的;

* @author

*/

public class RabbitUtil1 {

/**日志對(duì)象**/

private final static Logger log = Logger.getLogger(RabbitUtil1.class);

/**RabbitMq連接工廠對(duì)象**/

private volatile static ConnectionFactory factory = null;

public static boolean stopRabbitFlag=false;

/**構(gòu)造方法**/

public RabbitUtil1() {

this(ConnectionFactory.DEFAULT_HOST,

ConnectionFactory.DEFAULT_AMQP_PORT,

ConnectionFactory.DEFAULT_VHOST,

ConnectionFactory.DEFAULT_USER, ConnectionFactory.DEFAULT_PASS);

}

/**

* 構(gòu)造方法

* @param serverHost Rabbit服務(wù)主機(jī)

*/

public RabbitUtil1(String serverHost) {

this(serverHost, ConnectionFactory.DEFAULT_AMQP_PORT,

ConnectionFactory.DEFAULT_VHOST,

ConnectionFactory.DEFAULT_USER, ConnectionFactory.DEFAULT_PASS);

}

/**

* 構(gòu)造方法

* @param serverHost Rabbit服務(wù)主機(jī)

* @param username 用戶名

* @param password 密碼

*/

public RabbitUtil1(String serverHost, String username, String password) {

this(serverHost, ConnectionFactory.DEFAULT_AMQP_PORT,

ConnectionFactory.DEFAULT_VHOST, username, password);

}

/**

* 構(gòu)造方法

* @param serverHost Rabbit服務(wù)主機(jī)

* @param vhost 虛擬host(類似權(quán)限組)

* @param username 用戶名

* @param password 密碼

*/

public RabbitUtil1(String serverHost, String vhost, String username,

String password) {

this(serverHost, ConnectionFactory.DEFAULT_AMQP_PORT, vhost, username,

password);

}

/**

* 構(gòu)造方法

* @param serverHost Rabbit服務(wù)主機(jī)

* @param port Rabbit端口

* @param username 用戶名

* @param password 密碼

*/

public RabbitUtil1(String serverHost, int port, String username,

String password) {

this(serverHost, port, ConnectionFactory.DEFAULT_VHOST, username,

password);

}

/**

* 構(gòu)造方法(初始化單例RabbitConnectionFactory)

* @param serverHost Rabbit服務(wù)主機(jī)

* @param port Rabbit端口

* @param vhost 虛擬host(類似權(quán)限組)

* @param username 用戶名

* @param password 密碼

*/

public RabbitUtil1(String serverHost, int port, String vhost,

String username, String password) {

if (null == factory) {

synchronized (ConnectionFactory.class) {

if (null == factory) {

factory = new ConnectionFactory();

factory.setHost(serverHost);

factory.setPort(port);

factory.setVirtualHost(vhost);

factory.setUsername(username);

factory.setPassword(password);

log.info(">>>>>>Singleton ConnectionFactory Create Success>>>>>>");

}

}

}

if(stopRabbitFlag){

stopRabbitFlag=false;

}

}

/**

* 創(chuàng)建連接

* @return 連接

* @throws Exception

*/

private Connection buildConnection() throws Exception {

return factory.newConnection();

}

/**

* 創(chuàng)建信道

* @param connection 連接

* @return 信道

* @throws Exception 運(yùn)行時(shí)異常

*/

private Channel buildChannel(Connection connection) throws Exception {

return connection.createChannel();

}

/**

* 關(guān)閉連接和信道

* @param connection rabbitmq連接

* @param channel rabbitmq信道

*/

private void close(Connection connection, Channel channel) {

try {

if (null != channel) {

channel.close();

}

if (null != connection) {

connection.close();

}

} catch (Exception e) {

log.error(">>>>>>關(guān)閉RabbitMq的connection或channel發(fā)生異常>>>>>>", e);

}

}

/**

* 發(fā)送direct類型消息

* @param exchangeName exchange名稱

* @param routingKey 路由key字符串

* @param message 待發(fā)送的消息

* @throws Exception 運(yùn)行時(shí)異常

*/

public void sendDirect(String exchangeName, String routingKey, String message) throws Exception {

Connection connection = null;

Channel channel = null;

try {

connection = buildConnection();

channel = buildChannel(connection);

channel.basicPublish(exchangeName, routingKey,

MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

log.info("消息(" + message + "發(fā)布成功");

} finally {

close(connection, channel);

}

}

/**

* 接收direct類型消息(手動(dòng)發(fā)送ack)

* @param queueName 隊(duì)列名稱

* @param processer 回調(diào)處理接口

* @throws Exception 運(yùn)行時(shí)異常

*/

public void receiveDirect(String queueName, RabbitCallback processer) throws Exception {

receiveDirect(queueName, false, processer);

}

/**

* 接收direct類型消息

* @param queueName 隊(duì)列名稱

* @param autoAck 是否自動(dòng)發(fā)送ack true-是 false-否

* @param processer 回調(diào)處理接口

* @throws Exception 運(yùn)行時(shí)異常

*/

public void receiveDirect(String queueName, boolean autoAck, RabbitCallback processer)

throws Exception {

basicConsume(queueName, autoAck, processer);

}

/**

* 循環(huán)獲取消息并處理

* @param queueName 隊(duì)列名稱

* @param autoAck 是否自動(dòng)發(fā)送ack true-是 false-否

* @param processer 回調(diào)處理接口

* @throws Exception 運(yùn)行時(shí)異常

*/

private void basicConsume(String queueName, final boolean autoAck, final RabbitCallback processer)

throws Exception {

final Channel channel = buildChannel(buildConnection());

/**channel.basicQos(1)

* 保證接收端僅在發(fā)送了ack之后才會(huì)接收下一個(gè)消息,

* 在這種情況下發(fā)送端會(huì)嘗試把消息發(fā)送給下一個(gè)接收端

*/

channel.basicQos(1);

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body)

throws IOException {

long deliveryTag = envelope.getDeliveryTag();

String responseMsg = new String(body, "UTF-8");

System.out.println("Consumer===");

if (stopRabbitFlag){

if(channel!=null){

try {

channel.basicNack(deliveryTag, false, true);//回復(fù)處理失敗,客戶需要關(guān)閉

channel.close();//關(guān)閉客戶端

} catch (TimeoutException e) {

e.printStackTrace();

}

}

}else {//若服務(wù)停止標(biāo)志為false,需要處理業(yè)務(wù),調(diào)用回調(diào)函數(shù)處理

boolean success = processer.process(responseMsg);

if (!autoAck) {

if (success) {

channel.basicAck(deliveryTag, false);

} else {

channel.basicNack(deliveryTag, false, true);

}

}

}

}

};

channel.basicConsume(queueName, autoAck, consumer);

}

}

總結(jié)

以上是生活随笔為你收集整理的java rabbitmq 工具类_RabbitMq通用管理工具类的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。