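Java queue listening: Spring + ActiveMQ in practice, configuring one listener for multiple queues so that each queue's messages are consumed differently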

Published: 2025/3/20
Excerpted from: https://my.oschina.net/u/3613230/blog/1457227

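Summary: A recent project required ActiveMQ. With several queues configured in point-to-point mode inside a single project, the message producer could either send to only one queue or send the same message to every queue, and the listener could listen to only one queue, which makes configuring several queues pointless. The goal was: configure multiple queues, let the producer send different messages to different queues, and let the listener decide, based on the queue a message came from, which business logic to run. The examples found online all used a single queue and a single listener, but after some investigation it turned out to be possible. Without further ado, here is the code: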

[Screenshot: project structure]

Required Maven dependencies:

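<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.gxf</groupId>
    <artifactId>springmq</artifactId>
    <packaging>war</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <name>springmq Maven Webapp</name>
    <url>http://maven.apache.org</url>

    <properties>
        <springframework>4.1.8.RELEASE</springframework>
        <javax.servlet>3.1.0</javax.servlet>
    </properties>

    <dependencies>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>jstl</groupId>
            <artifactId>jstl</artifactId>
            <version>1.2</version>
        </dependency>

        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>${javax.servlet}</version>
        </dependency>

        <!-- Spring -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${springframework}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${springframework}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>${springframework}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${springframework}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${springframework}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.16</version>
        </dependency>

        <!-- ActiveMQ -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.14.3</version>
        </dependency>

    </dependencies>

    <build>
        <finalName>springmq</finalName>
    </build>
</project>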

- ActiveMQ configuration file: activemq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/mvc
        http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.14.3.xsd"
    >

    <!-- Lines 20-56 of the original listing (the bean definitions) are missing from this copy.
         The only surviving fragment is a bean declared with
         class="org.springframework.jms.connection.CachingConnectionFactory". -->

</beans>

- Spring MVC configuration file: springmvc.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/mvc
        http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd">

    <!-- Lines 12-22 of the original listing are missing from this copy.
         The only surviving fragment is a property with
         value="org.springframework.web.servlet.view.JstlView". -->

</beans>

- Controller layer, MainHandler.java:

package com.gxf.handler;

import javax.annotation.Resource;
import javax.jms.Destination;

import org.apache.activemq.command.ActiveMQDestination;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;

import com.gxf.service.ProducerService;

/**
 * @author stark2017
 */
@Controller
public class MainHandler {

    // Queue destination (composite of queue1 and queue2)
    @Resource(name = "queueDestination")
    private Destination queueDestination;

    // Queue message producer
    @Resource(name = "producerService")
    private ProducerService producerService;

    @RequestMapping(value = "/main", method = RequestMethod.GET)
    public String producer() {
        return "main";
    }

    /**
     * Sends messages to queue1.
     *
     * @param message the text to send
     * @return the view name
     */
    @RequestMapping(value = "/sendone", method = RequestMethod.POST)
    public String producer(@RequestParam("message") String message) {
        // Cast the destination to ActiveMQDestination. Its getCompositeDestinations()
        // method returns the array of queues: queue://queue1, queue://queue2.
        ActiveMQDestination activeMQDestination = (ActiveMQDestination) queueDestination;
        ActiveMQDestination queue1 = activeMQDestination.getCompositeDestinations()[0];

        // Send a text message to queue1
        System.out.println("Sending a text message to queue " + queue1.getPhysicalName());
        producerService.sendTxtMessage(queue1, message);

        // Send a MapMessage to queue1
        System.out.println("Sending a MapMessage to queue " + queue1.getPhysicalName());
        producerService.sendMapMessage(queue1, message);

        return "main";
    }

    /**
     * Sends messages to queue2.
     *
     * @param message the text to send
     * @return the view name
     */
    @RequestMapping(value = "/sendtwo", method = RequestMethod.POST)
    public String producertwo(@RequestParam("message") String message) {
        // Same cast as above; index 1 of the composite array is queue2.
        ActiveMQDestination activeMQDestination = (ActiveMQDestination) queueDestination;
        ActiveMQDestination queue2 = activeMQDestination.getCompositeDestinations()[1];

        // Send a text message to queue2
        System.out.println("Sending a text message to queue " + queue2.getPhysicalName());
        producerService.sendTxtMessage(queue2, message);

        // Send a MapMessage to queue2 (the original log line said "text message" here)
        System.out.println("Sending a MapMessage to queue " + queue2.getPhysicalName());
        producerService.sendMapMessage(queue2, message);

        return "main";
    }
}
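The controller selects its target queue by index into the array that getCompositeDestinations() returns. As a rough illustration of that idea only, the sketch below uses plain Java with no ActiveMQ classes. It assumes the composite destination is declared with a comma-separated name such as "queue1,queue2" (the bean definition itself is not shown in this copy of the article), and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: a plain-Java stand-in for picking one queue out of a
// composite destination. This is NOT ActiveMQ's implementation.
public class CompositeQueueNames {

    /** Splits a composite name such as "queue1,queue2" into trimmed, non-empty queue names. */
    public static List<String> split(String compositeName) {
        List<String> names = new ArrayList<>();
        for (String part : compositeName.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    /** Returns the queue name at the given position, similar in spirit to getCompositeDestinations()[index]. */
    public static String queueAt(String compositeName, int index) {
        List<String> names = split(compositeName);
        if (index < 0 || index >= names.size()) {
            throw new IllegalArgumentException(
                    "No queue at index " + index + " in \"" + compositeName + "\"");
        }
        return names.get(index);
    }

    public static void main(String[] args) {
        String composite = "queue1,queue2"; // assumed composite name
        System.out.println(queueAt(composite, 0)); // queue used by /sendone
        System.out.println(queueAt(composite, 1)); // queue used by /sendtwo
    }
}
```

Selecting by index ties the controller to the order in which the queues are declared, so looking a queue up by its physical name may be more robust if that order can change.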

- Producer, ProducerService.java:

package com.gxf.service;

import java.io.Serializable;

import javax.annotation.Resource;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

@Service
public class ProducerService {

    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;

    /**
     * Sends a text message to the given destination.
     * Falls back to the template's default destination when destination is null.
     */
    public void sendTxtMessage(Destination destination, final String message) {
        if (null == destination) {
            destination = jmsTemplate.getDefaultDestination();
        }
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
        System.out.println("springJMS send text message...");
    }

    /**
     * Sends a map message to the given destination; the text is stored under the key "msgId".
     */
    public void sendMapMessage(Destination destination, final String message) {
        if (null == destination) {
            destination = jmsTemplate.getDefaultDestination();
        }
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                MapMessage mapMessage = session.createMapMessage();
                mapMessage.setString("msgId", message);
                return mapMessage;
            }
        });
        System.out.println("springJMS send map message...");
    }

    /**
     * Sends a serialized object to the given destination.
     *
     * @param object must be serializable
     */
    public void sendObjectMessage(Destination destination, final Serializable object) {
        if (null == destination) {
            destination = jmsTemplate.getDefaultDestination();
        }
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage(object);
            }
        });
        System.out.println("springJMS send object message...");
    }

    /**
     * Sends a bytes message to the given destination.
     */
    public void sendBytesMessage(Destination destination, final byte[] bytes) {
        if (null == destination) {
            destination = jmsTemplate.getDefaultDestination();
        }
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                BytesMessage bytesMessage = session.createBytesMessage();
                bytesMessage.writeBytes(bytes);
                return bytesMessage;
            }
        });
        System.out.println("springJMS send bytes message...");
    }

    /**
     * Sends a stream message to the default queue.
     * Note: the destination parameter is not used; the template's default destination is.
     */
    public void sendStreamMessage(Destination destination) {
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                StreamMessage message = session.createStreamMessage();
                message.writeString("stream string");
                message.writeInt(11111);
                return message;
            }
        });
        System.out.println("springJMS send Stream message...");
    }
}

- Queue message listener, QueueMessageListener.java:

package com.gxf.listener;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.activemq.command.ActiveMQDestination;

public class QueueMessageListener implements MessageListener {

    // Called automatically whenever a message arrives
    @Override
    public void onMessage(Message message) {
        try {
            ActiveMQDestination queue = (ActiveMQDestination) message.getJMSDestination();

            // Messages from queue1
            if (queue.getPhysicalName().equalsIgnoreCase("queue1")) {
                System.out.println("Listener consumed a message from queue " + queue.getPhysicalName() + ":");
                // Text message
                if (message instanceof TextMessage) {
                    TextMessage tm = (TextMessage) message;
                    System.out.println("from get textMessage:\t" + tm.getText());
                }
                // Map message
                if (message instanceof MapMessage) {
                    MapMessage mm = (MapMessage) message;
                    System.out.println("from get MapMessage:\t" + mm.getString("msgId"));
                }
            }

            // Messages from queue2
            if (queue.getPhysicalName().equalsIgnoreCase("queue2")) {
                System.out.println("Listener consumed a message from queue " + queue.getPhysicalName() + ":");
                // Text message
                if (message instanceof TextMessage) {
                    TextMessage tm = (TextMessage) message;
                    System.out.println("from get textMessage:\t" + tm.getText());
                }
                // Map message
                if (message instanceof MapMessage) {
                    MapMessage mm = (MapMessage) message;
                    System.out.println("from get MapMessage:\t" + mm.getString("msgId"));
                }
            }
        } catch (JMSException e1) {
            e1.printStackTrace();
        }

        // Object message (handled for any queue)
        if (message instanceof ObjectMessage) {
            System.out.println("from get ObjectMessage:\t");
        }

        // Bytes message (handled for any queue)
        if (message instanceof BytesMessage) {
            System.out.println("from get BytesMessage:\t");
            byte[] b = new byte[1024];
            int len = -1;
            BytesMessage bm = (BytesMessage) message;
            try {
                while ((len = bm.readBytes(b)) != -1) {
                    System.out.println(new String(b, 0, len));
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        // Stream message (handled for any queue; the original log line said "BytesMessage" here)
        if (message instanceof StreamMessage) {
            System.out.println("from get StreamMessage:\t");
            StreamMessage sm = (StreamMessage) message;
            try {
                System.out.println(sm.readString());
                System.out.println(sm.readInt());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
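The listener above branches on the queue's physical name with one if block per queue. One possible way to keep that manageable as queues are added is a lookup table from queue name to handler. The sketch below shows the idea in plain Java with no JMS types; the class QueueDispatcher and its methods are hypothetical and are not part of the original project. Matching is case-insensitive, as with equalsIgnoreCase in the listener.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical dispatcher: routes a payload to the handler registered for its queue name.
public class QueueDispatcher {

    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    /** Registers the business logic to run for messages from the given queue. */
    public void register(String queueName, Consumer<String> handler) {
        handlers.put(queueName.toLowerCase(Locale.ROOT), handler);
    }

    /** Runs the handler for the queue, if any; returns false when no handler is registered. */
    public boolean dispatch(String queueName, String payload) {
        Consumer<String> handler = handlers.get(queueName.toLowerCase(Locale.ROOT));
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        QueueDispatcher dispatcher = new QueueDispatcher();
        dispatcher.register("queue1", p -> log.add("queue1 handled " + p));
        dispatcher.register("queue2", p -> log.add("queue2 handled " + p));

        dispatcher.dispatch("QUEUE1", "hello");   // matched case-insensitively
        dispatcher.dispatch("queue2", "world");
        dispatcher.dispatch("queue3", "ignored"); // no handler registered, returns false

        log.forEach(System.out::println);
    }
}
```

In a listener like the one above, onMessage would presumably pass the destination's physical name and the extracted message body to dispatch, leaving the per-queue logic in the registered handlers.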

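- Start the project and open /main to send messages:

The console prints the messages sent to each queue and the messages the listener received from each queue:

Queue queue1 had 14 messages sent and consumed; queue2 had 10:

With that, the required functionality is in place.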
