日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

大数据互联网架构阶段 QuartZ定时任务+RabbitMQ消息队列

發布時間:2024/4/30 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 大数据互联网架构阶段 QuartZ定时任务+RabbitMQ消息队列 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

QuartZ定時任務+RabbitMQ消息隊列

一 、QuartZ定時任務解決訂單系統遺留問題

  • 情景分析:
  • 在電商項目中 , 訂單生成后 , 數據庫商品數量-1 , 但是用戶遲遲不進行支付操作 , 這樣不僅導致沒有生成效益, 而且導致后來的用戶無法購買的問題 。 這種情況的訂單成為惡意訂單
  • 解決方案:
  • 早期: 虛擬商品數量 , 數量減到0時 ,仍然可以減。 但是這種辦法顯然沒有根本的解決問題
  • 現在引入定時任務的概念 ,超時未支付 , 則訂單失效, 商品回庫 。 如:12306一般為15分鐘 , 天貓是1天 。
  • 解決:
  • 可以使用java中自帶的api–Timer , 但是操作比較繁瑣 。
  • 電商項目中引入第三方插件:石英鐘quartZ
  • 定時任務邏輯: 設置24小時支付超時
  • 如何判斷一個訂單是否超時?
  • 訂單中有status(訂單狀態 1未付款 2已付款 3未發貨4已發貨5交易成功 6交易失敗)字段和createTime(訂單生成時間)字段 , 檢索訂單表中創建時間距離現在大于24 , 并且status為1 的數據 , update status為6。
  • 如果判斷超時 , 后續如何處理?
  • update status為6(訂單失效) , 對應的商品數量回退歸還庫存
  • 石英鐘插件
  • 核心組件
  • JobDetail+job
  • 繼承自石英鐘 的父類,啟動容器后,一旦加載了JobDetail的實例,其中JobDetail下的多個job邏輯需要編寫代碼(是整個使用石英鐘過程唯一編寫代碼的位置);加載這個實例后,石英鐘會為這個任務組注冊一個調度器(scheduler)
  • 調度器(Scheduler)
  • 負責調度對應一個JobDetail的時間出發器,trigger
  • 觸發器(Trigger)
  • 管理出發當前一個石英鐘邏輯的JobDetail的組件
  • 時間的計算表達式;何時觸發任務執行是由觸發器計算管理的;
  • 分為兩種觸發器:
  • simple觸發器:只能簡單的完成circle時間邏輯;每隔一段時間進行任務觸發;
  • cron觸發器:功能比較強大,可以完成simple的任務,還可以控制時間精度;每周五上午5點觸發一次,每個月第一個周末的上午五點;
  • 石英鐘調度過程
  • 創建JobDetail實例–>注冊調度器–>調度觸發器–>計算時間觸發邏輯–>觸發任務代碼–>執行job代碼
  • 將quartz引入電商項目中

  • 添加依賴

    <dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.2.1</version> </dependency>
  • 添加配置文件applicationContext-scheduler.xml , 與spring框架整合

    <?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsdhttp://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsdhttp://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd"><!-- 定義任務bean --><bean name="paymentOrderJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean"><!-- 指定具體的job類 --><property name="jobClass" value="com.jt.store.order.job.PaymentOrderJob" /><!-- 指定job的名稱 --><property name="name" value="paymentOrder" /><!-- 指定job的分組 --><property name="group" value="Order" /><!-- 必須設置為true,如果為false,當沒有活動的觸發器與之關聯時會在調度器中刪除該任務 --><property name="durability" value="true"/><!-- 指定spring容器的key,如果不設定在job中的jobmap中是獲取不到spring容器的 --><property name="applicationContextJobDataKey" value="applicationContext"/></bean><!-- 定義觸發器 --><bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean"><property name="jobDetail" ref="paymentOrderJobDetail" /><!-- 每一分鐘執行一次 --><property name="cronExpression" value="0 0/1 * * * ?" /></bean><!-- 定義調度器 --><bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean"><property name="triggers"><list><ref bean="cronTrigger" /></list></property></bean></beans>
  • 自定義一個job執行類 , 需要繼承石英鐘的父類 , 編寫完成之后需要修改配置文件中的自定義job類的全類名

    public class PayMentOrderJob extends QuartzJobBean{@Overrideprotected void executeInternal(JobExecutionContext context) throws JobExecutionException {//通過加載文件獲取spring的環境信息ApplicationContext applicationContext=(ApplicationContext) context.getJobDetail().getJobDataMap().get("applicationContext");//獲取對應mapper中的bean,來執行對應的sql語句//傳遞的參數就是過期時間;需要你在代碼中計算過期時間吧?//執行自己需要執行的邏輯 applicationContext.getBean(OrderMapper.class).paymentOrderScan(new DateTime().minusDays(1).toDate());//System.out.println("hahahahahahah");}}
  • 二、 RabbitMQ消息隊列

  • 電商項目中引入niginx 、 redis 、 amoeba 之后 , 電商項目 的架構性能得到提升 , 其中:
  • nginx 解決了日常中的高并發
  • redis 使用內存緩存數據 , 分擔了數據庫的一部分壓力
  • amoeba 實現數據庫讀寫分離 , 保護了數據庫這最后一道關卡
  • 但是在超負荷請求的情況下(雙十一) , 請求量陡增 , 以上三個技術無法處理。
  • 例如:淘寶1000W/s,處理日常請求不在話下;雙十一3000W/s;2000w/s請求,被拒絕了;
  • 引入隊列的概念:也是一種緩存,可以在隊列中等待被處理;游戲登錄:服務器爆滿,您是第3984位等待者
  • java中其實自帶了一套消息隊列的api— JMS , 但是性能極低
  • 后來第三方消息隊列插件出現, 于是sun公司提出一套AMQP標準 , 只要滿足AMQP原則 , 就是一套性能超高的消息隊列 , RabbitMQ就是符合AMQP標準的消息隊列
  • RabbitMQ
  • 基于erlang語言: 是一種支持高并發的語言
  • RabbitMQ的六種工作模式:
  • simple簡單模式
  • 消息產生著(p)將消息放入隊列
  • 消息的消費者(consumer) 監聽(while) 消息隊列,如果隊列中有消息,就消費掉,消息被拿走后,自動從隊列中刪除(隱患 消息可能沒有被消費者正確處理,已經從隊列中消失了,造成消息的丟失)應用場景:聊天(中間有一個過度的服務器;p端,c端)
  • work工作模式(資源的競爭)
  • 消息產生者將消息放入隊列消費者可以有多個,消費者1,消費者2,同時監聽同一個隊列,消息被消費?C1 C2共同爭搶當前的消息隊列內容,誰先拿到誰負責消費消息(隱患,高并發情況下,默認會產生某一個消息被多個消費者共同使用,可以設置一個開關(syncronize,與同步鎖的性能不一樣) 保證一條消息只能被一個消費者使用)
  • 應用場景:紅包;大項目中的資源調度(任務分配系統不需知道哪一個任務執行系統在空閑,直接將任務扔到消息隊列中,空閑的系統自動爭搶)
  • publish/subscribe發布訂閱(共享資源)
  • X代表交換機rabbitMQ內部組件,erlang 消息產生者是代碼完成,代碼的執行效率不高,消息產生者將消息放入交換機,交換機發布訂閱把消息發送到所有消息隊列中,對應消息隊列的消費者拿到消息進行消費
  • 相關場景:郵件群發,群聊天,廣播(廣告)
  • routing路由模式
  • 消息生產者將消息發送給交換機按照路由判斷,路由是字符串(info) 當前產生的消息攜帶路由字符(對象的方法),交換機根據路由的key,只能匹配上路由key對應的消息隊列,對應的消費者才能消費消息;
  • 根據業務功能定義路由字符串
  • 從系統的代碼邏輯中獲取對應的功能字符串,將消息任務扔到對應的隊列中業務場景:error 通知;EXCEPTION;錯誤通知的功能;傳統意義的錯誤通知;客戶通知;利用key路由,可以將程序中的錯誤封裝成消息傳入到消息隊列中,開發者可以自定義消費者,實時接收錯誤;
  • topic 主題模式(路由模式的一種)
  • 星號井號代表通配符
  • 星號代表多個單詞,井號代表一個單詞
  • 路由功能添加模糊匹配
  • 消息產生者產生消息,把消息交給交換機
  • 交換機根據key的規則模糊匹配到對應的隊列,由隊列的監聽消費者接收消息消費
  • RPC 先不做解釋
  • 這里注意三個別名
  • 從publish routing topic(都叫發布訂閱)
  • publish:fanout
  • routing:direct
  • topic:topic
  • rabbitmq的安裝與使用

  • rabbitmq使用erlang語言 開發 ,所以需要erlang 的運行環境 , 先安裝erlang 在虛擬機中執行wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm 下載
  • 創建一個rabbitmq的管理目錄 , 將下載的erlang資源移入管理目錄
  • 安裝 rpm -Uvh erlang-solutions-1.0.1.noarch.rpm U:表示安裝的時候所有的包自動更新
  • yum -y install erlang這里的過程非常緩慢,需要下載將60個rpm包安裝,一旦現在失敗,在執行一次將失敗的包重新下載安裝即可
  • 安裝rabbitmq
  • 上傳rpm
  • 上傳rabbitmq-server-3.6.1-1.noarch.rpm文件到rabbitmq管理目錄
  • 執行安裝
  • 安裝rpm之后很多文件都是散列放置的 ,默認只允許localhost用戶訪問 需要配置開啟用戶遠程訪問 執行cp /usr/share/doc/rabbitmq-server-3.6.1/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config(默認的安裝目錄為/usr/share/doc/rabbitmq-server-3.6.1)
  • 修改配置文件vim /etc/rabbitmq/rabbitmq.config P64行

    %% {loopback_users, []}, 修改1:去掉前面的兩個%%, 修改2:去掉最后面的逗號,保存。

  • 開啟后臺管理插件(rabbitmq web界面管理工具) ,執行rabbitmq-plugins enable rabbitmq_management
  • 開啟15672端口和5672端口

    /sbin/iptables -I INPUT -p tcp --dport 15672 –j ACCEPT #控制臺端口 瀏覽器訪問端口 /sbin/iptables -I INPUT -p tcp --dport 5672 –j ACCEPT???????? #程序訪問端口
  • 保存防火墻修改操作

    /etc/rc.d/init.d/iptables save /etc/init.d/iptables status
  • 開啟rabbitmq服務

    service rabbitmq-server start 開啟 service rabbitmq-server stop 關閉 service rabbitmq-server restart 重啟
  • 設置開機啟動
    chkconfig rabbitmq-server on
  • 使用瀏覽器訪問rabbitmq web控制臺 ip:15672
  • 默認的用戶名和密碼都是guest
  • rabbitmq web控制臺的使用:
  • 不能直接使用guest訪問賬號管理rabbitmq , 需要自定義一個具有對應更能權限的獨立賬號
  • 創建賬號
  • 點擊添加user可以看到在上面的列表中有了丟應的user但是目前沒有綁定的vh
  • 虛擬機相當于redis中的分庫分區(select0-15)。添加VH,這里的vh名稱必須在程序訪問時保持一致,有/
  • 綁定用戶,點擊虛擬機
  • rabbitmq示例代碼

    定義rabbitmq工具類 public class ConnectionUtils {public static Connection getConnettion() throws IOException{ConnectionFactory factory= new ConnectionFactory();//需要連接信息;factory.setHost("106.75.74.254");factory.setPort(5672);factory.setVirtualHost("/jt");factory.setUsername("jtadmin");factory.setPassword("123456");Connection cn = factory.newConnection();return cn;}}simple模式 public class RabbitMQTest {@Testpublic void testSend() throws Exception{/** 1 創建連接工廠* 2 從工廠穿件connection* 3 從連接獲取channel* 4 從channel聲明綁定queue* 5 生產者生產消息放入隊列* 6 釋放資源*/ConnectionFactory factory= new ConnectionFactory();//需要連接信息;factory.setHost("106.75.74.254");factory.setPort(5672);factory.setVirtualHost("/jt");factory.setUsername("jtadmin");factory.setPassword("123456");Connection cn = factory.newConnection();Channel chan = cn.createChannel();String QUEUE_NAME="simple";chan.queueDeclare(QUEUE_NAME, false, false, false, null);//第二個參數是否持久化,第三個參數是否獨占queue,第四個參數不使用達到一定時間是否自動刪除,第五個參數//其他參數;String msg="hello simple mode11111111";//模擬傳送到mq中的消息//把消息發送到queuechan.basicPublish("", QUEUE_NAME, null, msg.getBytes());//第一個參數與路由有關chan.close();cn.close();}@Test//消息消費端public void testRec() throws Exception{/** 1 創建連接工廠* 2 從工廠穿件connection* 3 從連接獲取channel* 4 從channel聲明綁定queue*5 消費者消費消息* 6 釋放資源*/ConnectionFactory factory= new ConnectionFactory();//需要連接信息;factory.setHost("106.75.74.254");factory.setPort(5672);factory.setVirtualHost("/jt");factory.setUsername("jtadmin");factory.setPassword("123456");Connection cn = factory.newConnection();Channel chan = cn.createChannel();String QUEUE_NAME="simple";chan.queueDeclare(QUEUE_NAME, false, false, false, null);//創建一個consumerQueueingConsumer consumer=new QueueingConsumer(chan);chan.basicConsume(QUEUE_NAME, true,consumer);while(true){Delivery delivery=consumer.nextDelivery();String msg=new String(delivery.getBody());System.out.println(msg);}} }work模式 public class WorkTest {@Testpublic void testSend() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();chan.queueDeclare("work111", false, false, false, null);//for循環發送消息for(int i=0;i<100;i++){String msg="work"+i;chan.basicPublish("", "work111", null, msg.getBytes());System.out.println("發送消息第"+i+"條");}chan.close();cn.close();}@Testpublic void testRec1() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();chan.queueDeclare("work111", false, false, false, null);chan.basicQos(1);QueueingConsumer consumer=new QueueingConsumer(chan);chan.basicConsume("work111", false,consumer);//獲取消息while(true){QueueingConsumer.Delivery delivery=consumer.nextDelivery();String msg=new String(delivery.getBody());System.out.println("接收到消息:"+msg);//模擬機器性能Thread.sleep(10);chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}@Testpublic void testRec2() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();chan.queueDeclare("work111", false, false, false, null);chan.basicQos(1);QueueingConsumer consumer=new QueueingConsumer(chan);chan.basicConsume("work111", false,consumer);//獲取消息while(true){QueueingConsumer.Delivery delivery=consumer.nextDelivery();String msg=new String(delivery.getBody());System.out.println("接收到消息:"+msg);//模擬機器性能Thread.sleep(500);chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}} }publish模式 public class PublishTest {@Testpublic void testSend() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();//聲明交換機exchangechan.exchangeDeclare("jt_publish", "fanout");//交換機的名稱,和交換機的種類for(int i=0;i<100;i++){String msg="publish num"+i;chan.basicPublish("jt_publish", "", null, msg.getBytes());System.out.println("發送了第"+i+"條");}}@Testpublic void testRec1() throws Exception{Connection cn= ConnectionUtils.getConnettion();Channel chan=cn.createChannel();//聲明隊列chan.queueDeclare("publish01", false, false, false, null);//綁定交換機chan.exchangeDeclare("jt_publish", "fanout");//綁定隊列到交換機chan.queueBind("publish01", "jt_publish", "");//同一時刻服務器只發送一條消息給消費者消費chan.basicQos(1);//定義隊列的消費者QueueingConsumer cons=new QueueingConsumer(chan);chan.basicConsume("publish01", false,cons);//獲取消息while(true){QueueingConsumer.Delivery deli=cons.nextDelivery();String msg=new String(deli.getBody());System.out.println("接收到消息:"+msg);chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);}}@Testpublic void testRec2() throws Exception{Connection cn= ConnectionUtils.getConnettion();Channel chan=cn.createChannel();//聲明隊列chan.queueDeclare("publish02", false, false, false, null);//綁定交換機chan.exchangeDeclare("jt_publish", "fanout");//綁定隊列到交換機chan.queueBind("publish02", "jt_publish", "");//同一時刻服務器只發送一條消息給消費者消費chan.basicQos(1);//定義隊列的消費者QueueingConsumer cons=new QueueingConsumer(chan);chan.basicConsume("publish02", false,cons);//獲取消息while(true){QueueingConsumer.Delivery deli=cons.nextDelivery();String msg=new String(deli.getBody());System.out.println("接收到消息:"+msg);chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);}} }routing public class RoutingTest {@Testpublic void testSend() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();//聲明交換機exchangechan.exchangeDeclare("jt_routing", "topic");//交換機的名稱,和交換機的種類//生成消息String msg="哈哈哈哈";String routingKey="item.add";chan.basicPublish("jt_routing", routingKey, null, msg.getBytes());System.out.println("發送完成");chan.close();cn.close();}@Testpublic void testRec1() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();chan.exchangeDeclare("jt_routing", "direct");chan.queueDeclare("routing01", false, false, false, null);chan.queueBind("routing01", "jt_routing", "item.add");//生成consumerQueueingConsumer cons=new QueueingConsumer(chan);chan.basicConsume("routing01", false,cons);while(true){QueueingConsumer.Delivery deli=cons.nextDelivery();String msg=new String(deli.getBody());System.out.println("一號接受者接收到消息:"+msg);chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);}}@Testpublic void testRec2() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();chan.exchangeDeclare("jt_routing", "direct");chan.queueDeclare("routing02", false, false, false, null);chan.queueBind("routing02", "jt_routing", "item1.update");//生成consumerQueueingConsumer cons=new QueueingConsumer(chan);chan.basicConsume("routing02", false,cons);while(true){QueueingConsumer.Delivery deli=cons.nextDelivery();String msg=new String(deli.getBody());System.out.println("二號接受者接收到消息:"+msg);chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);}} }topic模式 public class RoutingTest {@Testpublic void testSend() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();//聲明交換機exchangechan.exchangeDeclare("jt_routing", "topic");//交換機的名稱,和交換機的種類//生成消息String msg="哈哈哈哈";String routingKey="item.#";chan.basicPublish("jt_routing", routingKey, null, msg.getBytes());System.out.println("發送完成");chan.close();cn.close();}@Testpublic void testRec1() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();chan.exchangeDeclare("jt_routing", "direct");chan.queueDeclare("routing01", false, false, false, null);chan.queueBind("routing01", "jt_routing", "item.add");//生成consumerQueueingConsumer cons=new QueueingConsumer(chan);chan.basicConsume("routing01", false,cons);while(true){QueueingConsumer.Delivery deli=cons.nextDelivery();String msg=new String(deli.getBody());System.out.println("一號接受者接收到消息:"+msg);chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);}}@Testpublic void testRec2() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();chan.exchangeDeclare("jt_routing", "direct");chan.queueDeclare("routing02", false, false, false, null);chan.queueBind("routing02", "jt_routing", "item1.update");//生成consumerQueueingConsumer cons=new QueueingConsumer(chan);chan.basicConsume("routing02", false,cons);while(true){QueueingConsumer.Delivery deli=cons.nextDelivery();String msg=new String(deli.getBody());System.out.println("二號接受者接收到消息:"+msg);chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);}} }
  • 秒殺業務場景:

    業務場景分析:并發量很高單例rabbitmq的并發量在百萬級以上 有個商品在每天的8點開搶,每天的量5萬臺iphone4s可以引入rabbitmq 在搶單的請求發起后,可以將前臺的搶單信息放到rabbitmq; simple 隊列 100萬個人搶,寫入隊列中的消息可以是什么? 電話號碼 ticket username 生產者,前臺的controller或者service調用將請求中的信息保存到隊列中; 消費者,監聽隊列,一旦有數據,獲取數據 調用SSO查詢用戶信息,把前5萬個消息獲取到,后面的放入rabbitmq的垃圾桶; 并發更高的時候需要使用rabbitmq的分布式(峰會的題目之一)
  • 將rabbit引入電商項目中:

  • 在執行商品添加時 , 將商品存入數據庫后 , 還需要將商品信息放入緩存中 。 其中數據庫操作為主要操作 , 而緩存操作為不主要操作 , 這時為了提升添加商品模塊的性能 , 將緩存操作這類不重要的操作抽取出來 , 放入消息隊列中 , 稍后執行 。 將寫redis操作獨立出去,利用路由模式,在manage后臺系統的生產者將消息放入隊列
  • 示例代碼

  • 生產者端

  • 添加rabbitmq與spring整合 的配置文件

    <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.1.xsd"><!-- 定義RabbitMQ的連接工廠 --><rabbit:connection-factory id="connectionFactory"host="${rabbit.ip}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"virtual-host="${rabbit.vhost}" /><!-- 定義Rabbit模板,指定連接工廠以及定義exchange --><rabbit:template id="amqpTemplate" connection-factory="connectionFactory"exchange="itemDirectExchange" /><!-- MQ的管理,包括隊列、交換器等 --><rabbit:admin connection-factory="connectionFactory" /><!-- 定義交換器,自動聲明,durable持久化 --><rabbit:direct-exchange name="itemDirectExchange" auto-declare="true" durable="true"></rabbit:direct-exchange></beans>
  • 添加rabbitmq的配置文件rabbitmq.properties

    rabbit.ip=106.75.74.254 rabbit.port=5672 rabbit.username=jtadmin rabbit.password=123456 rabbit.vhost=/jt
  • 在spring核心配置文件中配置讀取rabbitmq配置文件的配置

    <!-- 使用spring自帶的占位符替換功能,可以實現注解方式獲取屬性文件中的配置值 --><beanclass="com.jt.common.spring.exetend.ExtendedPropertyPlaceholderConfigurer"><!-- 允許JVM參數覆蓋 --><property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" /><!-- 忽略沒有找到的資源文件 --><property name="ignoreResourceNotFound" value="true" /><!-- 配置資源文件 --><property name="locations"><list><value>classpath:jdbc.properties</value><value>classpath:env.properties</value><value>classpath:redis.properties</value><value>classpath:rabbitmq.properties</value></list></property></bean>
  • 在service層把需要的操作放入消息隊列 (需要先注入rabbitmq模板類) 把商品ID放入到指定的item.add消息隊列中 , 監聽item.add消息隊列消費時會做出特定的動作

    @Service public class ItemService extends BaseService<Item>{@Autowiredprivate ItemMapper itemMapper;@Autowiredprivate ItemDescMapper itemDescMapper;@Autowiredprivate RedisService redisService;private static final ObjectMapper MAPPER=new ObjectMapper();@Autowiredprivate RabbitTemplate rabbitmq;//商品列表查詢public List<Item> queryItemList(){List<Item> itemList=itemMapper.queryItemList();return itemList;}public void saveItem(Item item,String desc) throws Exception {//insert 和insertSelective//insert方法在插入對象時,把對象的所有字段都插入//insertSelective插入非NULL值的所有字段//初始化數據沒有做 status created updateditem.setStatus(1);item.setCreated(new Date());item.setUpdated(item.getCreated());itemMapper.insert(item);//準備插入表格的對象ItemDesc itemDesc=new ItemDesc();//當前的itemDesc的關聯ID怎么獲取?itemDesc.setItemId(item.getId());itemDesc.setCreated(new Date());itemDesc.setUpdated(itemDesc.getCreated());itemDesc.setItemDesc(desc);//插入對象到表格itemDescMapper.insert(itemDesc);/*//添加新增商品的緩存寫入String key="ITEM_"+item.getId();String json=MAPPER.writeValueAsString(item);redisService.set(key, json);*///寫消息到消息隊列rabbitmq.convertAndSend("item.add", item.getId());}
  • 消費者端

  • 引入rabbitmq與spring整合的消費者配置文件

    <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.1.xsd"><!-- 定義RabbitMQ的連接工廠 --><rabbit:connection-factory id="connectionFactory"host="${rabbit.ip}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"virtual-host="${rabbit.vhost}" /><!-- MQ的管理,包括隊列、交換器等 --><rabbit:admin connection-factory="connectionFactory" /><!-- 定義消息隊列 --><rabbit:queue name="jt-web.itemQueue" auto-declare="true"/><!-- 定義交換機,并且完成隊列和交換機的綁定 --><rabbit:direct-exchange name="itemDirectExchange" auto-declare="true"><rabbit:bindings><!-- 前臺系統只接收商品更新的消息,key路由key --><rabbit:binding queue="jt-web.itemQueue" key="item.add"/></rabbit:bindings></rabbit:direct-exchange><!-- 定義監聽 --><rabbit:listener-container connection-factory="connectionFactory"><!-- 監聽一個隊列,當隊列中有消息,就會自動觸發類.方法,傳遞消息就作為方法的參數,根據方法聲明的參數強轉 --><rabbit:listener ref="rabbitItemService" method="addItem" queue-names="jt-web.itemQueue"/></rabbit:listener-container></beans>
  • 引入rabbitmq的配置文件rabbitmq.properties

    rabbit.ip=106.75.74.254 rabbit.port=5672 rabbit.username=jtadmin rabbit.password=123456 rabbit.vhost=/jt
  • 編寫消費者rabbitmq

    @Service public class RabbitItemService {@Autowiredprivate HttpClientService client;@Autowiredprivate RedisService redis;public void addItem(Long itemId) throws Exception{//把消息itemId獲取過來,到后臺拿數據//寫redisString url="http://manage.jt.com/item/"+itemId;String jsonItem = client.doGet(url,"utf-8");redis.set("ITEM_"+itemId, jsonItem);} }
  • 問題:

  • thread和threadLocal
  • 總結

    以上是生活随笔為你收集整理的大数据互联网架构阶段 QuartZ定时任务+RabbitMQ消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。