JMS学习四(ActiveMQ消息过滤)
一、消息的選擇器
不管是在消息發送端設置消息過期時間還是在接收端設置等待時間,都是對不滿足的消息有過濾的作用,那消息選擇器就是為過濾消息而生的下面來看看消息選擇器:
ActiveMQ提供了一種機制,使用它,消息服務可根據消息選擇器中的標準來執行消息過濾。生產者可在消息中放入應用程序特有的屬性,而消費者可使用基于這些屬性的選擇標準來表明對消息是否感興趣。這就簡化了客戶端的工作,并避免了向不需要這些消息的消費者傳送消息的開銷。然而,它也使得處理選擇標準的消息服務增加了一些額外開銷。 消息選擇器是用于MessageConsumer的過濾器,可以用來過濾傳入消息的屬性和消息頭部分(但不過濾消息體),并確定是否將實際消費該消息。消息選擇器是一些字符串,它們基于某種語法,而這種語法是SQL-92的子集。可以將消息選擇器作為MessageConsumer 創建的一部分。
?
?消息選擇器的用法
? ? ? MessageConsumer是一個Session創建的對象,用來從Destination接收消息
? ? ? 關于消息選擇器
? ? ? MessageConsumer createConsumer( Destination destination, String messageSelector )
? ? ? MessageConsumer createConsumer( Destination destination, String messageSelector, boolean noLocal )
? ? ? 其中,messageSelector為消息選擇器;?
? ? ? noLocal標志默認為false,當設置為true時,限制消費者只能接收和自己相同的連接(Connection)所發布的消息,此標志只適用于主題,不適用于隊列。
? ? ? public final String SELECTOR="JMS_TYPE='MY_TAG1'" ;?
? ? ? 選擇器檢查傳入消息的JMS_TYPE的屬性,并確定這個屬性的值是否等于MY_TAG1;
? ? ? 如果相等,消息報消費;如果不相等,那么消息就會被忽略;
?
?
1、消息生產者:
package mqtest3; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { // 單例模式 // 1、連接工廠 private ConnectionFactory connectionFactory; // 2、連接對象 private Connection connection; // 3、Session對象 private Session session; // 4、生產者 private MessageProducer messageProducer; public Producer() { try { this.connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616"); this.connection = connectionFactory.createConnection(); this.connection.start(); // 設置自動簽收模式 this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); this.messageProducer = this.session.createProducer(null); } catch (JMSException e) { throw new RuntimeException(e); } } public Session getSession() { return this.session; } public void send1(/* String QueueName, Message message */) { try { Destination destination = this.session.createQueue("first"); MapMessage msg1 = this.session.createMapMessage(); msg1.setString("name", "張三"); msg1.setInt("age", 20); // 設置用于消息過濾器的條件 msg1.setStringProperty("name", "張三"); msg1.setIntProperty("age", 20); msg1.setStringProperty("color", "bule"); MapMessage msg2 = this.session.createMapMessage(); msg2.setString("name", "李四"); msg2.setInt("age", 25); // 設置用于消息過濾器的條件 msg2.setStringProperty("name", "李四"); msg2.setIntProperty("age", 25); msg2.setStringProperty("color", "white"); MapMessage msg3 = this.session.createMapMessage(); msg3.setString("name", "趙六"); msg3.setInt("age", 30); // 設置用于消息過濾器的條件 msg3.setStringProperty("name", "趙六"); msg3.setIntProperty("age", 30); msg3.setStringProperty("color", "black"); // 發送消息 this.messageProducer.send(destination, msg1, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10); this.messageProducer.send(destination, msg2, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10); this.messageProducer.send(destination, msg3, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10); } catch (JMSException e) { throw new RuntimeException(e); } } public void send2() { try { Destination destination = this.session.createQueue("first"); TextMessage message = this.session.createTextMessage("我是一個字符串"); message.setIntProperty("age", 25); // 發送消息 this.messageProducer.send(destination, message, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10); } catch (JMSException e) { throw new RuntimeException(e); } } public static void main(String[] args) { Producer producer = new Producer(); producer.send1(); // producer.send2(); } }?
2、消息消費者:
package mqtest3; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Conmuser { // 單例模式 // 1、連接工廠 private ConnectionFactory connectionFactory; // 2、連接對象 private Connection connection; // 3、Session對象 private Session session; // 4、生產者 private MessageConsumer messageConsumer; // 5、目的地址 private Destination destination; // 消息選擇器 public final String SELECTOR_1 = "age > 25"; public final String SELECTOR_2 = " age > 20 and color='black'"; public Conmuser() { try { this.connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616"); this.connection = connectionFactory.createConnection(); this.connection.start(); // 設置自動簽收模式 this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); this.destination = this.session.createQueue("first"); // 在構造消費者的時候,指定了 消息選擇器 // 有選擇性的消費消息 this.messageConsumer = this.session.createConsumer(destination, SELECTOR_1); } catch (JMSException e) { throw new RuntimeException(e); } } public Session getSession() { return this.session; } // 用于監聽消息隊列的消息 class MyLister implements MessageListener { @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage ret = (TextMessage) message; System.out.println("results;" + ret.getText()); } if (message instanceof MapMessage) { MapMessage ret = (MapMessage) message; System.out.println(ret.toString()); System.out.println(ret.getString("name")); System.out.println(ret.getInt("age")); } } catch (JMSException e) { throw new RuntimeException(e); } } } // 用于異步監聽消息 public void receiver() { try { this.messageConsumer.setMessageListener(new MyLister()); } catch (JMSException e) { throw new RuntimeException(e); } } public static void main(String[] args) { Conmuser conmuser = new Conmuser(); conmuser.receiver(); } }上面的demo是對MapMessage和TextMessage兩種消息的過濾條件的設置和消費,過濾條件的設置使在消息的屬性中設置,而消費消息的時候直接是在session創建MessageConsumer時傳入的參數即過濾條件(過濾條件的寫法和SQL的寫法是很像的)
?
在寫過濾條件的時候要注意設置的是什么類型的條件即: int 、string 如果是int 則加引號而如果是String則要加哦!!!
?
需要注意的地方
? ? ?注意消息過濾器的過濾條件的設置
// 設置用于消息過濾器的條件 msg3.setStringProperty("name", "趙六"); msg3.setIntProperty("age", 30); msg3.setStringProperty("color", "black");?
消息過濾器的寫法(類似于SQL語句的寫法)
// 消息選擇器 public final String SELECTOR_1 = "age > 20"; public final String SELECTOR_2 = " age > 20 and color='bule'";總結
以上是生活随笔為你收集整理的JMS学习四(ActiveMQ消息过滤)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MPU6050开发 -- 初识
- 下一篇: 互联网晚报 | 3月28日 星期一 |