rabbitMQ教程 spring整合rabbitMQ代码实例
生活随笔
收集整理的這篇文章主要介紹了
rabbitMQ教程 spring整合rabbitMQ代码实例
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
轉載自 https://www.cnblogs.com/tohxyblog/p/7256554.html
一、開啟rabbitMQ服務,導入MQ jar包和gson jar包(Spring AMQP默認使用的是jackson,但是效率不如Gson,所以我們用gson)
?
二、發送端配置,在spring配置文件中配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Connection settings. If the broker runs on a remote server, create a new user and
         use its credentials: by default "guest" is not allowed to log in remotely. -->
    <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"
                               password="guest" port="5672" virtual-host="/"
                               channel-cache-size="5" />

    <!-- Admin: declares the exchanges and queues defined in this file automatically,
         so they do not have to be created by hand. -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Queue declaration -->
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="spring.queue.tag" />

    <!-- Exchange declaration and its queue binding (binding key) -->
    <rabbit:direct-exchange name="spring.queue.exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Message converter: turns the objects produced by the sender into JSON before they are
         stored in the queue. Spring AMQP defaults to a Jackson-based converter; per the
         author, Gson is faster, so a Gson-based implementation is plugged in instead. -->
    <bean id="jsonMessageConverter" class="sendMQ.Gson2JsonMessageConverter" />

    <!-- Rabbit template declaration -->
    <rabbit:template id="amqpTemplate" exchange="spring.queue.exchange"
                     routing-key="spring.queue.tag.key" connection-factory="connectionFactory"
                     message-converter="jsonMessageConverter" />
</beans>
發送端代碼:GSON配置
package sendMQ;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.MessageConversionException;

import com.google.gson.Gson;

/**
 * JSON message converter backed by Gson.
 *
 * <p>Outgoing objects are serialized to JSON text and encoded with the converter's default
 * charset; incoming messages whose content type contains {@code "json"} are decoded back into
 * the class resolved from the message properties.
 */
public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter {

    private static final Log log = LogFactory.getLog(Gson2JsonMessageConverter.class);

    /** Fallback mapper used when no class mapper has been configured on this converter. */
    private static final ClassMapper DEFAULT_CLASS_MAPPER = new DefaultClassMapper();

    private static final Gson GSON = new Gson();

    public Gson2JsonMessageConverter() {
        super();
    }

    /**
     * Serializes {@code object} to JSON and wraps it in a {@link Message}.
     *
     * <p>Sets the content type, content encoding and content length on
     * {@code messageProperties}, and lets the class mapper record the payload's class there.
     *
     * @param object            payload to serialize
     * @param messageProperties properties to populate and attach to the message
     * @return the message carrying the JSON bytes
     * @throws MessageConversionException if the JSON text cannot be encoded
     */
    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        byte[] bytes;
        try {
            String jsonString = GSON.toJson(object);
            bytes = jsonString.getBytes(getDefaultCharset());
        } catch (IOException e) {
            throw new MessageConversionException("Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(getDefaultCharset());
        messageProperties.setContentLength(bytes.length);
        // Use the same mapper as fromMessage so the recorded type can be read back.
        resolveClassMapper().fromClass(object.getClass(), messageProperties);
        return new Message(bytes, messageProperties);
    }

    /**
     * Converts an incoming message back into an object.
     *
     * <p>Only messages whose content type contains {@code "json"} are deserialized; for any
     * other message (or when deserialization yields {@code null}) the raw body bytes are
     * returned.
     *
     * @param message the received message
     * @return the deserialized object, or the raw {@code byte[]} body as a fallback
     * @throws MessageConversionException if the body cannot be decoded with the message's
     *                                    encoding
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        Object content = null;
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
            String contentType = properties.getContentType();
            if (contentType != null && contentType.contains("json")) {
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
                    encoding = getDefaultCharset();
                }
                try {
                    Class<?> targetClass = resolveClassMapper().toClass(properties);
                    content = convertBytesToObject(message.getBody(), encoding, targetClass);
                } catch (IOException e) {
                    throw new MessageConversionException("Failed to convert Message content", e);
                }
            } else {
                log.warn("Could not convert incoming message with content-type [" + contentType + "]");
            }
        }
        if (content == null) {
            content = message.getBody();
        }
        return content;
    }

    /**
     * Returns the class mapper configured on this converter, or the shared default one.
     *
     * <p>NOTE(review): the inherited getter appears to return {@code null} when no mapper was
     * set (depends on the Spring AMQP version) - confirm against the version in use.
     */
    private ClassMapper resolveClassMapper() {
        ClassMapper configured = getClassMapper();
        return configured != null ? configured : DEFAULT_CLASS_MAPPER;
    }

    /** Decodes {@code body} with {@code encoding} and parses it as JSON into {@code clazz}. */
    private Object convertBytesToObject(byte[] body, String encoding, Class<?> clazz)
            throws UnsupportedEncodingException {
        String contentAsString = new String(body, encoding);
        return GSON.fromJson(contentAsString, clazz);
    }
}
發送類接口:
public interface MQProducer {

    /**
     * Sends a message to the specified queue.
     *
     * @param queueKey key identifying the target queue
     * @param object   message payload
     */
    void sendDataToQueue(String queueKey, Object object);
}
實現類:test是測試用的。
package sendMQ;

import java.util.HashMap;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * {@link MQProducer} implementation that publishes through the injected {@link AmqpTemplate}.
 *
 * <p>The class doubles as a JUnit test (see {@link #test()}) so the sender can be started
 * directly from an IDE; the test method exists for demonstration only.
 */
@RunWith(value = SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:/spring-common.xml"})
@Component
public class MQProducerImpl implements MQProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Converts {@code object} and sends it using {@code queueKey} as the routing key.
     *
     * <p>When {@code queueKey} is {@code null} the template's default routing key is used.
     * Failures are reported on standard output and not propagated to the caller.
     *
     * @param queueKey routing key for the message, or {@code null} for the template default
     * @param object   message payload
     */
    @Override
    public void sendDataToQueue(String queueKey, Object object) {
        System.out.println("--" + amqpTemplate);
        try {
            if (queueKey == null) {
                amqpTemplate.convertAndSend(object);
            } else {
                // Previously the key was ignored and every message used the default routing key.
                amqpTemplate.convertAndSend(queueKey, object);
            }
            System.out.println("------------消息發送成功");
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    /**
     * Demo sender: publishes the same sample message every two seconds until the running
     * thread is interrupted.
     */
    @Test
    public void test() {
        Map<String, Object> msg = new HashMap<>();
        msg.put("data", "hello,456");
        while (!Thread.currentThread().isInterrupted()) {
            amqpTemplate.convertAndSend(msg);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition ends the demo instead of spinning on.
                Thread.currentThread().interrupt();
            }
        }
    }
}
?
?
?
?
接收端配置:
<!-- Connection settings -->
<rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest" port="5672" virtual-host="/" channel-cache-size="5" />

<rabbit:admin connection-factory="connectionFactory"/>

<!-- Queue declaration -->
<rabbit:queue durable="true" auto-delete="false" exclusive="false" name="spring.queue.tag"/>

<!-- Exchange declaration and its queue binding (binding key) -->
<rabbit:direct-exchange name="spring.queue.exchange" durable="true" auto-delete="false">
    <rabbit:bindings>
        <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

<bean id="receiveMessageListener" class="receiveMQ.QueueListenter" />

<!-- Queue listener (observer style): when a message arrives, the listener object
     registered on the corresponding queue is notified. -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >
    <rabbit:listener queues="spring.queue.tag" ref="receiveMessageListener" />
</rabbit:listener-container>
接收端代碼:
package receiveMQ;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
 * Message listener that prints the body of every received message to standard output.
 */
public class QueueListenter implements MessageListener {

    /**
     * Decodes the message body as UTF-8 text and prints it, prefixed with a dashed marker.
     *
     * @param msg the received message
     */
    @Override
    public void onMessage(Message msg) {
        // The Charset overload cannot fail, so no checked-exception handling is needed.
        System.out.print("-------------------" + new String(msg.getBody(), StandardCharsets.UTF_8));
    }
}
接收端測試啟動:
package receiveMQ;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Entry point for the receiving side: loads {@code spring-common.xml} from the classpath.
 */
public class ConsumerMain {

    /**
     * Creates the application context and registers a JVM shutdown hook for it.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("spring-common.xml");
        // Without this the context is never closed; the hook closes it when the JVM exits.
        context.registerShutdownHook();
    }
}
上面代碼均有注釋,應該不難看懂,復制即可使用,實現了MQ的簡單功能。
說明:可以配置多個接收端,spring默認採用負載均衡機制,各個接收端輪流接收消息(每次一條),這些擴展功能待后面有時間再講解
總結
以上是生活随笔為你收集整理的rabbitMQ教程 spring整合rabbitMQ代码实例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring Boot——RabbitM
- 下一篇: memcached和redis的区别