java rabbitmq 工具类_RabbitMq通用管理工具类
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.*;
import org.apache.log4j.Logger;
/**
* RabbitMq通用管理工具類
* RabbitMQ是AMQP(高級(jí)消息隊(duì)列協(xié)議)的標(biāo)準(zhǔn)實(shí)現(xiàn)
* 1.單發(fā)送單(多)接收模式;
* 2.fanout發(fā)布訂閱模式(fanout);
* 3.routing路由模式(direct);
* 4.topic通配符模式(topic)
* Broker:簡單來說就是消息隊(duì)列服務(wù)器實(shí)體;
Channel:消息通道,在客戶端的每個(gè)連接里,可建立多個(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù);
Exchange:交換機(jī),決定了消息路由規(guī)則,它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列;
Queue:消息隊(duì)列載體,每個(gè)消息都會(huì)被投入到一個(gè)或多個(gè)隊(duì)列;
Binding:綁定了Queue和Exchange,意即為符合什么樣路由規(guī)則的消息,將會(huì)放置入哪一個(gè)消息隊(duì)列;
Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞;
vhost:虛擬主機(jī),一個(gè)broker里可以開設(shè)多個(gè)vhost,用作不同用戶的權(quán)限分離;
producer:消息生產(chǎn)者,就是投遞消息的程序;
consumer:消息消費(fèi)者,就是接受消息的程序;
* 消息隊(duì)列持久化包括3個(gè)部分:
(1)exchange持久化,在聲明時(shí)指定durable => 1
(2)queue持久化,在聲明時(shí)指定durable => 1
(3)消息持久化,在投遞時(shí)指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個(gè)持久化,一個(gè)非持久化,就不允許建立綁定。
* 1.將交換機(jī)置為可持久;
2.將通道置為可持久;
3.消息發(fā)送時(shí)設(shè)置可持久;
當(dāng)我們"生產(chǎn)"了一條可持久化的消息,嘗試中斷MQ服務(wù),啟動(dòng)消費(fèi)者獲取消息,消息依然能夠恢復(fù)。相反,則拋出異常;
* 消息隊(duì)列的使用過程大概如下:
(1)客戶端連接到消息隊(duì)列服務(wù)器,打開一個(gè)channel;
(2)客戶端聲明一個(gè)exchange,并設(shè)置相關(guān)屬性;
(3)客戶端聲明一個(gè)queue,并設(shè)置相關(guān)屬性;
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關(guān)系;
(5)客戶端投遞消息到exchange, exchange接收到消息后,就根據(jù)消息的key和已經(jīng)設(shè)置的binding,進(jìn)行消息路由,將消息投遞到一個(gè)或多個(gè)隊(duì)列里
* exchange也有幾個(gè)類型,
完全根據(jù)key進(jìn)行投遞的叫做Direct交換機(jī),例如,綁定時(shí)設(shè)置了routing key為"abc",那么客戶端提交的消息,只有設(shè)置了key為"abc"的才會(huì)投遞到隊(duì)列。
對(duì)key進(jìn)行模式匹配后進(jìn)行投遞的叫做Topic交換機(jī),符號(hào)"#"匹配一個(gè)或多個(gè)詞,符號(hào)"*"匹配正好一個(gè)詞。例如"abc.#"匹配"abc.def.ghi","abc.*"只匹配"abc.def"。
還有一種不需要key的,叫做Fanout交換機(jī),它采取廣播模式,一個(gè)消息進(jìn)來時(shí),投遞到與該交換機(jī)綁定的所有隊(duì)列。
* Exchange
Durability 持久性,這是exchange的可選屬性,如果你Durability設(shè)置為false,那些當(dāng)前會(huì)話結(jié)束的時(shí)候,該exchange也會(huì)被銷毀;
Auto delete 當(dāng)沒有隊(duì)列或者其他exchange綁定到此exchange的時(shí)候,該exchange被銷毀;
Internal 表示這個(gè)exchange不可以被client用來推送消息,僅用來進(jìn)行exchange和exchange之間的綁定;
無法聲明2個(gè)名稱相同 但是類型卻不同的exchange;
* Queue
Durability 和exchange相同,未持久化的隊(duì)列,服務(wù)重啟后銷毀;
Auto delete 當(dāng)沒有消費(fèi)者連接到該隊(duì)列的時(shí)候,隊(duì)列自動(dòng)銷毀;
Exclusive 使隊(duì)列成為私有隊(duì)列,只有當(dāng)前應(yīng)用程序可用,當(dāng)你需要限制隊(duì)列只有一個(gè)消費(fèi)者,這是很有用的;
擴(kuò)展屬性如下對(duì)應(yīng)源程序 RabbitMQ.Client.IModel.QueueDeclare(string, bool, bool, bool, System.Collections.Generic.IDictionary)最后的參數(shù),
Message TTL 當(dāng)一個(gè)消息被推送在該隊(duì)列的時(shí)候 可以存在的時(shí)間 單位為ms,(對(duì)應(yīng)擴(kuò)展參數(shù)argument "x-message-ttl" );
Auto expire 在隊(duì)列自動(dòng)刪除之前可以保留多長時(shí)間(對(duì)應(yīng)擴(kuò)展參數(shù)argument "x-expires");
Max length 一個(gè)隊(duì)列可以容納的已準(zhǔn)備消息的數(shù)量(對(duì)應(yīng)擴(kuò)展參數(shù)argument "x-max-length");
一旦創(chuàng)建了隊(duì)列和交換機(jī),就不能修改其標(biāo)志了。例如,如果創(chuàng)建了一個(gè)non-durable的隊(duì)列,然后想把它改變成durable的,唯一的辦法就是刪除這個(gè)隊(duì)列然后重現(xiàn)創(chuàng)建;
* RabbitMQ消息模型的核心理念是:發(fā)布者(producer)不會(huì)直接發(fā)送任何消息給隊(duì)列。
* 事實(shí)上,發(fā)布者(producer)甚至不知道消息是否已經(jīng)被投遞到隊(duì)列。
* 發(fā)布者(producer)只需要把消息發(fā)送給一個(gè)exchange。
* exchange非常簡單,它一邊從發(fā)布者方接收消息,一邊把消息推入隊(duì)列。
* exchange必須知道如何處理它接收到的消息,是應(yīng)該推送到指定的隊(duì)列還是是多個(gè)隊(duì)列,或者是直接忽略消息。
* 這些規(guī)則是通過exchange type來定義的;
* @author
*/
public class RabbitUtil1 {
/**日志對(duì)象**/
private final static Logger log = Logger.getLogger(RabbitUtil1.class);
/**RabbitMq連接工廠對(duì)象**/
private volatile static ConnectionFactory factory = null;
public static boolean stopRabbitFlag=false;
/**構(gòu)造方法**/
public RabbitUtil1() {
this(ConnectionFactory.DEFAULT_HOST,
ConnectionFactory.DEFAULT_AMQP_PORT,
ConnectionFactory.DEFAULT_VHOST,
ConnectionFactory.DEFAULT_USER, ConnectionFactory.DEFAULT_PASS);
}
/**
* 構(gòu)造方法
* @param serverHost Rabbit服務(wù)主機(jī)
*/
public RabbitUtil1(String serverHost) {
this(serverHost, ConnectionFactory.DEFAULT_AMQP_PORT,
ConnectionFactory.DEFAULT_VHOST,
ConnectionFactory.DEFAULT_USER, ConnectionFactory.DEFAULT_PASS);
}
/**
* 構(gòu)造方法
* @param serverHost Rabbit服務(wù)主機(jī)
* @param username 用戶名
* @param password 密碼
*/
public RabbitUtil1(String serverHost, String username, String password) {
this(serverHost, ConnectionFactory.DEFAULT_AMQP_PORT,
ConnectionFactory.DEFAULT_VHOST, username, password);
}
/**
* 構(gòu)造方法
* @param serverHost Rabbit服務(wù)主機(jī)
* @param vhost 虛擬host(類似權(quán)限組)
* @param username 用戶名
* @param password 密碼
*/
public RabbitUtil1(String serverHost, String vhost, String username,
String password) {
this(serverHost, ConnectionFactory.DEFAULT_AMQP_PORT, vhost, username,
password);
}
/**
* 構(gòu)造方法
* @param serverHost Rabbit服務(wù)主機(jī)
* @param port Rabbit端口
* @param username 用戶名
* @param password 密碼
*/
public RabbitUtil1(String serverHost, int port, String username,
String password) {
this(serverHost, port, ConnectionFactory.DEFAULT_VHOST, username,
password);
}
/**
* 構(gòu)造方法(初始化單例RabbitConnectionFactory)
* @param serverHost Rabbit服務(wù)主機(jī)
* @param port Rabbit端口
* @param vhost 虛擬host(類似權(quán)限組)
* @param username 用戶名
* @param password 密碼
*/
public RabbitUtil1(String serverHost, int port, String vhost,
String username, String password) {
if (null == factory) {
synchronized (ConnectionFactory.class) {
if (null == factory) {
factory = new ConnectionFactory();
factory.setHost(serverHost);
factory.setPort(port);
factory.setVirtualHost(vhost);
factory.setUsername(username);
factory.setPassword(password);
log.info(">>>>>>Singleton ConnectionFactory Create Success>>>>>>");
}
}
}
if(stopRabbitFlag){
stopRabbitFlag=false;
}
}
/**
* 創(chuàng)建連接
* @return 連接
* @throws Exception
*/
private Connection buildConnection() throws Exception {
return factory.newConnection();
}
/**
* 創(chuàng)建信道
* @param connection 連接
* @return 信道
* @throws Exception 運(yùn)行時(shí)異常
*/
private Channel buildChannel(Connection connection) throws Exception {
return connection.createChannel();
}
/**
* 關(guān)閉連接和信道
* @param connection rabbitmq連接
* @param channel rabbitmq信道
*/
private void close(Connection connection, Channel channel) {
try {
if (null != channel) {
channel.close();
}
if (null != connection) {
connection.close();
}
} catch (Exception e) {
log.error(">>>>>>關(guān)閉RabbitMq的connection或channel發(fā)生異常>>>>>>", e);
}
}
/**
* 發(fā)送direct類型消息
* @param exchangeName exchange名稱
* @param routingKey 路由key字符串
* @param message 待發(fā)送的消息
* @throws Exception 運(yùn)行時(shí)異常
*/
public void sendDirect(String exchangeName, String routingKey, String message) throws Exception {
Connection connection = null;
Channel channel = null;
try {
connection = buildConnection();
channel = buildChannel(connection);
channel.basicPublish(exchangeName, routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
log.info("消息(" + message + "發(fā)布成功");
} finally {
close(connection, channel);
}
}
/**
* 接收direct類型消息(手動(dòng)發(fā)送ack)
* @param queueName 隊(duì)列名稱
* @param processer 回調(diào)處理接口
* @throws Exception 運(yùn)行時(shí)異常
*/
public void receiveDirect(String queueName, RabbitCallback processer) throws Exception {
receiveDirect(queueName, false, processer);
}
/**
* 接收direct類型消息
* @param queueName 隊(duì)列名稱
* @param autoAck 是否自動(dòng)發(fā)送ack true-是 false-否
* @param processer 回調(diào)處理接口
* @throws Exception 運(yùn)行時(shí)異常
*/
public void receiveDirect(String queueName, boolean autoAck, RabbitCallback processer)
throws Exception {
basicConsume(queueName, autoAck, processer);
}
/**
* 循環(huán)獲取消息并處理
* @param queueName 隊(duì)列名稱
* @param autoAck 是否自動(dòng)發(fā)送ack true-是 false-否
* @param processer 回調(diào)處理接口
* @throws Exception 運(yùn)行時(shí)異常
*/
private void basicConsume(String queueName, final boolean autoAck, final RabbitCallback processer)
throws Exception {
final Channel channel = buildChannel(buildConnection());
/**channel.basicQos(1)
* 保證接收端僅在發(fā)送了ack之后才會(huì)接收下一個(gè)消息,
* 在這種情況下發(fā)送端會(huì)嘗試把消息發(fā)送給下一個(gè)接收端
*/
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String responseMsg = new String(body, "UTF-8");
System.out.println("Consumer===");
if (stopRabbitFlag){
if(channel!=null){
try {
channel.basicNack(deliveryTag, false, true);//回復(fù)處理失敗,客戶需要關(guān)閉
channel.close();//關(guān)閉客戶端
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}else {//若服務(wù)停止標(biāo)志為false,需要處理業(yè)務(wù),調(diào)用回調(diào)函數(shù)處理
boolean success = processer.process(responseMsg);
if (!autoAck) {
if (success) {
channel.basicAck(deliveryTag, false);
} else {
channel.basicNack(deliveryTag, false, true);
}
}
}
}
};
channel.basicConsume(queueName, autoAck, consumer);
}
}
總結(jié)
以上是生活随笔為你收集整理的java rabbitmq 工具类_RabbitMq通用管理工具类的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java课程设计 成绩_java课程设计
- 下一篇: java treeset 红黑树_【数据