大数据互联网架构阶段 QuartZ定时任务+RabbitMQ消息队列
生活随笔
收集整理的這篇文章主要介紹了
大数据互联网架构阶段 QuartZ定时任务+RabbitMQ消息队列
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
QuartZ定時任務+RabbitMQ消息隊列
一 、QuartZ定時任務解決訂單系統遺留問題
將quartz引入電商項目中
添加依賴
<dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.2.1</version> </dependency>添加配置文件applicationContext-scheduler.xml , 與spring框架整合
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsdhttp://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsdhttp://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd"><!-- 定義任務bean --><bean name="paymentOrderJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean"><!-- 指定具體的job類 --><property name="jobClass" value="com.jt.store.order.job.PaymentOrderJob" /><!-- 指定job的名稱 --><property name="name" value="paymentOrder" /><!-- 指定job的分組 --><property name="group" value="Order" /><!-- 必須設置為true,如果為false,當沒有活動的觸發器與之關聯時會在調度器中刪除該任務 --><property name="durability" value="true"/><!-- 指定spring容器的key,如果不設定在job中的jobmap中是獲取不到spring容器的 --><property name="applicationContextJobDataKey" value="applicationContext"/></bean><!-- 定義觸發器 --><bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean"><property name="jobDetail" ref="paymentOrderJobDetail" /><!-- 每一分鐘執行一次 --><property name="cronExpression" value="0 0/1 * * * ?" /></bean><!-- 定義調度器 --><bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean"><property name="triggers"><list><ref bean="cronTrigger" /></list></property></bean></beans>自定義一個job執行類 , 需要繼承石英鐘的父類 , 編寫完成之后需要修改配置文件中的自定義job類的全類名
public class PayMentOrderJob extends QuartzJobBean{@Overrideprotected void executeInternal(JobExecutionContext context) throws JobExecutionException {//通過加載文件獲取spring的環境信息ApplicationContext applicationContext=(ApplicationContext) context.getJobDetail().getJobDataMap().get("applicationContext");//獲取對應mapper中的bean,來執行對應的sql語句//傳遞的參數就是過期時間;需要你在代碼中計算過期時間吧?//執行自己需要執行的邏輯 applicationContext.getBean(OrderMapper.class).paymentOrderScan(new DateTime().minusDays(1).toDate());//System.out.println("hahahahahahah");}}二、 RabbitMQ消息隊列
rabbitmq的安裝與使用
修改配置文件vim /etc/rabbitmq/rabbitmq.config P64行
%% {loopback_users, []}, 修改1:去掉前面的兩個%%, 修改2:去掉最后面的逗號,保存。開啟15672端口和5672端口
/sbin/iptables -I INPUT -p tcp --dport 15672 –j ACCEPT #控制臺端口 瀏覽器訪問端口 /sbin/iptables -I INPUT -p tcp --dport 5672 –j ACCEPT???????? #程序訪問端口保存防火墻修改操作
/etc/rc.d/init.d/iptables save /etc/init.d/iptables status開啟rabbitmq服務
service rabbitmq-server start 開啟 service rabbitmq-server stop 關閉 service rabbitmq-server restart 重啟chkconfig rabbitmq-server on
rabbitmq示例代碼
定義rabbitmq工具類 public class ConnectionUtils {public static Connection getConnettion() throws IOException{ConnectionFactory factory= new ConnectionFactory();//需要連接信息;factory.setHost("106.75.74.254");factory.setPort(5672);factory.setVirtualHost("/jt");factory.setUsername("jtadmin");factory.setPassword("123456");Connection cn = factory.newConnection();return cn;}}simple模式 public class RabbitMQTest {@Testpublic void testSend() throws Exception{/** 1 創建連接工廠* 2 從工廠穿件connection* 3 從連接獲取channel* 4 從channel聲明綁定queue* 5 生產者生產消息放入隊列* 6 釋放資源*/ConnectionFactory factory= new ConnectionFactory();//需要連接信息;factory.setHost("106.75.74.254");factory.setPort(5672);factory.setVirtualHost("/jt");factory.setUsername("jtadmin");factory.setPassword("123456");Connection cn = factory.newConnection();Channel chan = cn.createChannel();String QUEUE_NAME="simple";chan.queueDeclare(QUEUE_NAME, false, false, false, null);//第二個參數是否持久化,第三個參數是否獨占queue,第四個參數不使用達到一定時間是否自動刪除,第五個參數//其他參數;String msg="hello simple mode11111111";//模擬傳送到mq中的消息//把消息發送到queuechan.basicPublish("", QUEUE_NAME, null, msg.getBytes());//第一個參數與路由有關chan.close();cn.close();}@Test//消息消費端public void testRec() throws Exception{/** 1 創建連接工廠* 2 從工廠穿件connection* 3 從連接獲取channel* 4 從channel聲明綁定queue*5 消費者消費消息* 6 釋放資源*/ConnectionFactory factory= new ConnectionFactory();//需要連接信息;factory.setHost("106.75.74.254");factory.setPort(5672);factory.setVirtualHost("/jt");factory.setUsername("jtadmin");factory.setPassword("123456");Connection cn = factory.newConnection();Channel chan = cn.createChannel();String QUEUE_NAME="simple";chan.queueDeclare(QUEUE_NAME, false, false, false, null);//創建一個consumerQueueingConsumer consumer=new QueueingConsumer(chan);chan.basicConsume(QUEUE_NAME, true,consumer);while(true){Delivery delivery=consumer.nextDelivery();String msg=new String(delivery.getBody());System.out.println(msg);}} }work模式 public class WorkTest {@Testpublic void testSend() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();chan.queueDeclare("work111", false, false, false, null);//for循環發送消息for(int i=0;i<100;i++){String msg="work"+i;chan.basicPublish("", "work111", null, msg.getBytes());System.out.println("發送消息第"+i+"條");}chan.close();cn.close();}@Testpublic void testRec1() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();chan.queueDeclare("work111", false, false, false, null);chan.basicQos(1);QueueingConsumer consumer=new QueueingConsumer(chan);chan.basicConsume("work111", false,consumer);//獲取消息while(true){QueueingConsumer.Delivery delivery=consumer.nextDelivery();String msg=new String(delivery.getBody());System.out.println("接收到消息:"+msg);//模擬機器性能Thread.sleep(10);chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}@Testpublic void testRec2() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();chan.queueDeclare("work111", false, false, false, null);chan.basicQos(1);QueueingConsumer consumer=new QueueingConsumer(chan);chan.basicConsume("work111", false,consumer);//獲取消息while(true){QueueingConsumer.Delivery delivery=consumer.nextDelivery();String msg=new String(delivery.getBody());System.out.println("接收到消息:"+msg);//模擬機器性能Thread.sleep(500);chan.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}} }publish模式 public class PublishTest {@Testpublic void testSend() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();//聲明交換機exchangechan.exchangeDeclare("jt_publish", "fanout");//交換機的名稱,和交換機的種類for(int i=0;i<100;i++){String msg="publish num"+i;chan.basicPublish("jt_publish", "", null, msg.getBytes());System.out.println("發送了第"+i+"條");}}@Testpublic void testRec1() throws Exception{Connection cn= ConnectionUtils.getConnettion();Channel chan=cn.createChannel();//聲明隊列chan.queueDeclare("publish01", false, false, false, null);//綁定交換機chan.exchangeDeclare("jt_publish", "fanout");//綁定隊列到交換機chan.queueBind("publish01", "jt_publish", "");//同一時刻服務器只發送一條消息給消費者消費chan.basicQos(1);//定義隊列的消費者QueueingConsumer cons=new QueueingConsumer(chan);chan.basicConsume("publish01", false,cons);//獲取消息while(true){QueueingConsumer.Delivery deli=cons.nextDelivery();String msg=new String(deli.getBody());System.out.println("接收到消息:"+msg);chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);}}@Testpublic void testRec2() throws Exception{Connection cn= ConnectionUtils.getConnettion();Channel chan=cn.createChannel();//聲明隊列chan.queueDeclare("publish02", false, false, false, null);//綁定交換機chan.exchangeDeclare("jt_publish", "fanout");//綁定隊列到交換機chan.queueBind("publish02", "jt_publish", "");//同一時刻服務器只發送一條消息給消費者消費chan.basicQos(1);//定義隊列的消費者QueueingConsumer cons=new QueueingConsumer(chan);chan.basicConsume("publish02", false,cons);//獲取消息while(true){QueueingConsumer.Delivery deli=cons.nextDelivery();String msg=new String(deli.getBody());System.out.println("接收到消息:"+msg);chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);}} }routing public class RoutingTest {@Testpublic void testSend() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();//聲明交換機exchangechan.exchangeDeclare("jt_routing", "topic");//交換機的名稱,和交換機的種類//生成消息String msg="哈哈哈哈";String routingKey="item.add";chan.basicPublish("jt_routing", routingKey, null, msg.getBytes());System.out.println("發送完成");chan.close();cn.close();}@Testpublic void testRec1() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();chan.exchangeDeclare("jt_routing", "direct");chan.queueDeclare("routing01", false, false, false, null);chan.queueBind("routing01", "jt_routing", "item.add");//生成consumerQueueingConsumer cons=new QueueingConsumer(chan);chan.basicConsume("routing01", false,cons);while(true){QueueingConsumer.Delivery deli=cons.nextDelivery();String msg=new String(deli.getBody());System.out.println("一號接受者接收到消息:"+msg);chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);}}@Testpublic void testRec2() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();chan.exchangeDeclare("jt_routing", "direct");chan.queueDeclare("routing02", false, false, false, null);chan.queueBind("routing02", "jt_routing", "item1.update");//生成consumerQueueingConsumer cons=new QueueingConsumer(chan);chan.basicConsume("routing02", false,cons);while(true){QueueingConsumer.Delivery deli=cons.nextDelivery();String msg=new String(deli.getBody());System.out.println("二號接受者接收到消息:"+msg);chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);}} }topic模式 public class RoutingTest {@Testpublic void testSend() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();//聲明交換機exchangechan.exchangeDeclare("jt_routing", "topic");//交換機的名稱,和交換機的種類//生成消息String msg="哈哈哈哈";String routingKey="item.#";chan.basicPublish("jt_routing", routingKey, null, msg.getBytes());System.out.println("發送完成");chan.close();cn.close();}@Testpublic void testRec1() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();chan.exchangeDeclare("jt_routing", "direct");chan.queueDeclare("routing01", false, false, false, null);chan.queueBind("routing01", "jt_routing", "item.add");//生成consumerQueueingConsumer cons=new QueueingConsumer(chan);chan.basicConsume("routing01", false,cons);while(true){QueueingConsumer.Delivery deli=cons.nextDelivery();String msg=new String(deli.getBody());System.out.println("一號接受者接收到消息:"+msg);chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);}}@Testpublic void testRec2() throws Exception{Connection cn=ConnectionUtils.getConnettion();Channel chan=cn.createChannel();chan.exchangeDeclare("jt_routing", "direct");chan.queueDeclare("routing02", false, false, false, null);chan.queueBind("routing02", "jt_routing", "item1.update");//生成consumerQueueingConsumer cons=new QueueingConsumer(chan);chan.basicConsume("routing02", false,cons);while(true){QueueingConsumer.Delivery deli=cons.nextDelivery();String msg=new String(deli.getBody());System.out.println("二號接受者接收到消息:"+msg);chan.basicAck(deli.getEnvelope().getDeliveryTag(), false);}} }秒殺業務場景:
業務場景分析:并發量很高單例rabbitmq的并發量在百萬級以上 有個商品在每天的8點開搶,每天的量5萬臺iphone4s可以引入rabbitmq 在搶單的請求發起后,可以將前臺的搶單信息放到rabbitmq; simple 隊列 100萬個人搶,寫入隊列中的消息可以是什么? 電話號碼 ticket username 生產者,前臺的controller或者service調用將請求中的信息保存到隊列中; 消費者,監聽隊列,一旦有數據,獲取數據 調用SSO查詢用戶信息,把前5萬個消息獲取到,后面的放入rabbitmq的垃圾桶; 并發更高的時候需要使用rabbitmq的分布式(峰會的題目之一)將rabbit引入電商項目中:
示例代碼
生產者端
添加rabbitmq與spring整合 的配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.1.xsd"><!-- 定義RabbitMQ的連接工廠 --><rabbit:connection-factory id="connectionFactory"host="${rabbit.ip}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"virtual-host="${rabbit.vhost}" /><!-- 定義Rabbit模板,指定連接工廠以及定義exchange --><rabbit:template id="amqpTemplate" connection-factory="connectionFactory"exchange="itemDirectExchange" /><!-- MQ的管理,包括隊列、交換器等 --><rabbit:admin connection-factory="connectionFactory" /><!-- 定義交換器,自動聲明,durable持久化 --><rabbit:direct-exchange name="itemDirectExchange" auto-declare="true" durable="true"></rabbit:direct-exchange></beans>添加rabbitmq的配置文件rabbitmq.properties
rabbit.ip=106.75.74.254 rabbit.port=5672 rabbit.username=jtadmin rabbit.password=123456 rabbit.vhost=/jt在spring核心配置文件中配置讀取rabbitmq配置文件的配置
<!-- 使用spring自帶的占位符替換功能,可以實現注解方式獲取屬性文件中的配置值 --><beanclass="com.jt.common.spring.exetend.ExtendedPropertyPlaceholderConfigurer"><!-- 允許JVM參數覆蓋 --><property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" /><!-- 忽略沒有找到的資源文件 --><property name="ignoreResourceNotFound" value="true" /><!-- 配置資源文件 --><property name="locations"><list><value>classpath:jdbc.properties</value><value>classpath:env.properties</value><value>classpath:redis.properties</value><value>classpath:rabbitmq.properties</value></list></property></bean>在service層把需要的操作放入消息隊列 (需要先注入rabbitmq模板類) 把商品ID放入到指定的item.add消息隊列中 , 監聽item.add消息隊列消費時會做出特定的動作
@Service public class ItemService extends BaseService<Item>{@Autowiredprivate ItemMapper itemMapper;@Autowiredprivate ItemDescMapper itemDescMapper;@Autowiredprivate RedisService redisService;private static final ObjectMapper MAPPER=new ObjectMapper();@Autowiredprivate RabbitTemplate rabbitmq;//商品列表查詢public List<Item> queryItemList(){List<Item> itemList=itemMapper.queryItemList();return itemList;}public void saveItem(Item item,String desc) throws Exception {//insert 和insertSelective//insert方法在插入對象時,把對象的所有字段都插入//insertSelective插入非NULL值的所有字段//初始化數據沒有做 status created updateditem.setStatus(1);item.setCreated(new Date());item.setUpdated(item.getCreated());itemMapper.insert(item);//準備插入表格的對象ItemDesc itemDesc=new ItemDesc();//當前的itemDesc的關聯ID怎么獲取?itemDesc.setItemId(item.getId());itemDesc.setCreated(new Date());itemDesc.setUpdated(itemDesc.getCreated());itemDesc.setItemDesc(desc);//插入對象到表格itemDescMapper.insert(itemDesc);/*//添加新增商品的緩存寫入String key="ITEM_"+item.getId();String json=MAPPER.writeValueAsString(item);redisService.set(key, json);*///寫消息到消息隊列rabbitmq.convertAndSend("item.add", item.getId());}消費者端
引入rabbitmq與spring整合的消費者配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-4.1.xsd"><!-- 定義RabbitMQ的連接工廠 --><rabbit:connection-factory id="connectionFactory"host="${rabbit.ip}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"virtual-host="${rabbit.vhost}" /><!-- MQ的管理,包括隊列、交換器等 --><rabbit:admin connection-factory="connectionFactory" /><!-- 定義消息隊列 --><rabbit:queue name="jt-web.itemQueue" auto-declare="true"/><!-- 定義交換機,并且完成隊列和交換機的綁定 --><rabbit:direct-exchange name="itemDirectExchange" auto-declare="true"><rabbit:bindings><!-- 前臺系統只接收商品更新的消息,key路由key --><rabbit:binding queue="jt-web.itemQueue" key="item.add"/></rabbit:bindings></rabbit:direct-exchange><!-- 定義監聽 --><rabbit:listener-container connection-factory="connectionFactory"><!-- 監聽一個隊列,當隊列中有消息,就會自動觸發類.方法,傳遞消息就作為方法的參數,根據方法聲明的參數強轉 --><rabbit:listener ref="rabbitItemService" method="addItem" queue-names="jt-web.itemQueue"/></rabbit:listener-container></beans>引入rabbitmq的配置文件rabbitmq.properties
rabbit.ip=106.75.74.254 rabbit.port=5672 rabbit.username=jtadmin rabbit.password=123456 rabbit.vhost=/jt編寫消費者rabbitmq
@Service public class RabbitItemService {@Autowiredprivate HttpClientService client;@Autowiredprivate RedisService redis;public void addItem(Long itemId) throws Exception{//把消息itemId獲取過來,到后臺拿數據//寫redisString url="http://manage.jt.com/item/"+itemId;String jsonItem = client.doGet(url,"utf-8");redis.set("ITEM_"+itemId, jsonItem);} }問題:
總結
以上是生活随笔為你收集整理的大数据互联网架构阶段 QuartZ定时任务+RabbitMQ消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 互联网架构阶段 数据库读写分离 Amo
- 下一篇: 大数据互联网架构阶段 全文检索技术