當前位置:
首頁 >
前端技术
> javascript
>内容正文
javascript
(需求实战_终章) SpringBoot2.x 整合RabbitMQ
生活随笔
收集整理的這篇文章主要介紹了
(需求实战_终章) SpringBoot2.x 整合RabbitMQ
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
文章目錄
- 1. maven依賴
- 2. MainConfig
- 3. application.properties
- 4. 發(fā)送字符串 生產(chǎn)者
- 5. 發(fā)送對象 生產(chǎn)者
- 6. 接收字符串客戶端
- 7. 接收對象客戶端
- 8.confirem 確認機制
- 9. return確認機制
- 10. MQ消息發(fā)送工具類封裝
- 11. 分布式id
- 12. 時間工具類
- 13. 對象
1. maven依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>2. MainConfig
package com.gblfy.springboot.config;import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration;@Configuration @ComponentScan({"com.gblfy.springboot.*"}) public class MainConfig { }3. application.properties
#應用端口信息 server.port=80 #RabbitMQ 連接信息 #IP地址 spring.rabbitmq.addresses=127.0.0.1 #RabbitMQ 端口 spring.rabbitmq.port=5672 #用戶名 spring.rabbitmq.username=admin #密碼 spring.rabbitmq.password=admin #虛擬主機 spring.rabbitmq.virtual-host=/admin #連接超時時間 spring.rabbitmq.connection-timeout=15000spring.profiles.active=devapplication-dev.properties
#服務端 RabbitMQ 配置 #消息發(fā)送至交換機消息確認模式 是否確認回調(diào) spring.rabbitmq.publisher-confirms=true #消息發(fā)送至交換機消息確認模式 是否確認消息返回回調(diào) spring.rabbitmq.publisher-returns=true #消息手工簽收 spring.rabbitmq.template.mandatory=true#消費端 RabbitMQ 配置 #手動簽收 spring.rabbitmq.listener.simple.acknowledge-mode=manual #指定最小的消費者數(shù)量 spring.rabbitmq.listener.simple.concurrency=5 #指定最大的消費者數(shù)量 spring.rabbitmq.listener.simple.max-concurrency=10 #接收字符串類型MQ消息spring.rabbitmq.listener.str.queue.name=queue-1 spring.rabbitmq.listener.str.queue.durable=true spring.rabbitmq.listener.str.exchange.name=exchange-1 spring.rabbitmq.listener.str.exchange.durable=true spring.rabbitmq.listener.str.exchange.type=topic spring.rabbitmq.listener.str.exchange.ignoreDeclarationExceptions=true spring.rabbitmq.listener.str.key=cus-str.##接收object類型MQ消息 spring.rabbitmq.listener.order.queue.name=queue-2 spring.rabbitmq.listener.order.queue.durable=true spring.rabbitmq.listener.order.exchange.name=exchange-2 spring.rabbitmq.listener.order.exchange.durable=true spring.rabbitmq.listener.order.exchange.type=topic spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true spring.rabbitmq.listener.order.key=cus-obj.#YML
#----------------------------服務端(公有)配置---------------------------- spring:rabbitmq:addresses: 192.168.0.XXX #RabbitMQ服務端地址username: admin #用戶名password: admin #密碼port: 5672 #端口virtual-host: /admin #虛擬主機connection-timeout: 15000 #超時時間 #----------------------------生產(chǎn)端端配置----------------------------publisher-confirm-type: correlated #確認消息已發(fā)送至交換機,選擇交換類型為交互publisher-returns: true #在消息沒有被路由到指定的queue時將消息返回,而不是丟棄template:mandatory: true #是否手動簽收listener:simple:acknowledge-mode: manual #手動簽收concurrency: 5 #默認線程數(shù)max-concurrency: 10 #最大線程數(shù) #----------------------------消費端配置---------------------------- #----------------------------對象類型監(jiān)聽----------------------------order:exchange:durable: true #是否持久化ignoreDeclarationExceptions: truename: exchange-2 #交換機名稱type: topic #消息類型key: cmiip-obj.# #消息路由key的路由規(guī)則queue:durable: true #是否持久化name: queue-2 #隊列名稱 #----------------------------字符串類型監(jiān)聽----------------------------str:exchange:durable: true #是否持久化ignoreDeclarationExceptions: truename: exchange-1 #交換機名稱type: topic #消息類型key: cmiip-str.# #消息路由key的路由規(guī)則queue:durable: true #是否持久化name: queue-1 #隊列名稱4. 發(fā)送字符串 生產(chǎn)者
package com.gblfy.springboot.controller;import com.gblfy.springboot.utils.MQSendMsgUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;@RestController public class MQSendStrMsgController {@Autowiredprivate MQSendMsgUtils mqSendMsgUtils;/*** 發(fā)送字符串類型消息** @param exchangeName 交換機名稱* @param queueRouterKey 路由key* @param msg 報文* @return*/@GetMapping("/mQSendStrMsg")public String mQSendStrMsg(@RequestParam(value = "exchangeName") String exchangeName,@RequestParam(value = "queueRouterKey") String queueRouterKey,@RequestParam(value = "msg") String msg) {mqSendMsgUtils.snedStrMQMsg(exchangeName, queueRouterKey, msg);return "發(fā)送字符串消息成功!";}//測試連接:http://localhost/mQSendStrMsg?exchangeName=exchange-1&queueRouterKey=cus&msg=測試2 }5. 發(fā)送對象 生產(chǎn)者
package com.gblfy.springboot.controller;import com.gblfy.springboot.entity.Order; import com.gblfy.springboot.utils.MQSendMsgUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;@RestController public class MQSendObjMsgController {@Autowiredprivate MQSendMsgUtils mqSendMsgUtils;@GetMapping("/mQSendObjMsg")public String mQSendStrMsg2(@RequestParam(value = "exchangeName") String exchangeName,@RequestParam(value = "queueRouterKey") String queueRouterKey) {//模擬發(fā)送order對象Order order = new Order().builder().reqXml("我是請求報文").serviceName("接口名稱").resXml("我是響應報文").build();//模擬接口描述String serviceName = "TJHL";String queueDesc = "紐約理賠發(fā)送退單接口";//模擬接口類型String queueType = "WEBSERVICE";//調(diào)用MQ工具類發(fā)送消息mqSendMsgUtils.snedObjMqMsg(exchangeName, order, queueRouterKey, serviceName, queueDesc, queueType);return "發(fā)送對象消息成功!";}//測試連接:http://localhost/mQSendObjMsg?exchangeName=exchange-2&queueRouterKey=cus }6. 接收字符串客戶端
package com.gblfy.springboot.conusmer;import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.stereotype.Component;@Component public class CusStrQueueMsgHandler implements ChannelAwareMessageListener {//打印日志 實時定位private final static Logger log = LoggerFactory.getLogger(CusObjQueueMsgHandler.class);/*** 接收字符串類型MQ消息** @param message* @param channel* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.str.queue.name}",durable = "${spring.rabbitmq.listener.str.queue.durable}"),exchange = @Exchange(value = "${spring.rabbitmq.listener.str.exchange.name}",durable = "${spring.rabbitmq.listener.str.exchange.durable}",type = "${spring.rabbitmq.listener.str.exchange.type}",ignoreDeclarationExceptions = "${spring.rabbitmq.listener.str.exchange.ignoreDeclarationExceptions}"),key = "${spring.rabbitmq.listener.str.key}"))@RabbitHandler@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//TODO 接收消息成功 創(chuàng)建一個消費端軌跡表來存儲消息的軌跡數(shù)據(jù)String jsonMsg = new String(message.getBody());log.info("響應報文 mResXml: {}", jsonMsg);// 同一時刻服務器只會發(fā)一條消息給消費者channel.basicQos(1);// 反饋消息的消費狀態(tài)channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//TODO 保存數(shù)據(jù)到數(shù)據(jù)庫} }7. 接收對象客戶端
package com.gblfy.springboot.conusmer;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.gblfy.springboot.entity.Order; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.stereotype.Component;import java.util.Map;@Component public class CusObjQueueMsgHandler implements ChannelAwareMessageListener {//打印日志 實時定位private final static Logger log = LoggerFactory.getLogger(CusObjQueueMsgHandler.class);/*** 接收對象類型MQ消息** @param message* @param channel* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",durable = "${spring.rabbitmq.listener.order.queue.durable}"),exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",durable = "${spring.rabbitmq.listener.order.exchange.durable}",type = "${spring.rabbitmq.listener.order.exchange.type}",ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),key = "${spring.rabbitmq.listener.order.key}"))@RabbitHandler@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//TODO 接收消息成功 創(chuàng)建一個消費端軌跡表來存儲消息的軌跡數(shù)據(jù)String jsonMsg = new String(message.getBody());// 同一時刻服務器只會發(fā)一條消息給消費者channel.basicQos(1);// 反饋消息的消費狀態(tài)channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//通過 判斷路由routingKey是否等于trace相同即可//fastjson解析MQ接收的json字符串 轉(zhuǎn)換成RequestInfo對象JSONObject jsonObject = JSON.parseObject(jsonMsg);Order orderInfo = JSON.toJavaObject(jsonObject, Order.class);log.info("接口名稱 serviceName: {}", orderInfo.getServiceName());log.info("請求報文 mReqXml: {}", orderInfo.getReqXml());log.info("響應報文 mResXml: {}", orderInfo.getResXml());MessageProperties messageProperties = message.getMessageProperties();log.info("交換機名稱 : {}", messageProperties.getReceivedExchange());log.info("路由key名稱 : {}", messageProperties.getReceivedRoutingKey());log.info("內(nèi)容類型 : {}", messageProperties.getContentType());log.info("內(nèi)容編碼 : {}", messageProperties.getContentEncoding());log.info("標簽 : {}", messageProperties.getDeliveryTag());// 2. 接收接口信息Map<String, Object> headers = message.getMessageProperties().getHeaders();log.info("隊列唯一標識ID: {}", headers.get("QUEUE_MSG_ID"));log.info("隊列名稱: {}", headers.get("QUEUE_NAME"));log.info("隊列類型: {}", headers.get("QUEUE_TYPE"));log.info("隊列描述: {}", headers.get("QUEUE_DESC"));//TODO 保存數(shù)據(jù)到數(shù)據(jù)庫} }8.confirem 確認機制
package com.gblfy.springboot.confirms;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.stereotype.Component;@Component("confirmCallback") public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {//日志輸出private final static Logger log = LoggerFactory.getLogger(ConfirmCallBackListener.class);/*** 生產(chǎn)者消息發(fā)送成功與失敗確認機制* <p>* 1. ack* true : 標志生產(chǎn)者將消息發(fā)出成功* false: 標志生產(chǎn)者將消息發(fā)出失敗* 2. ack :true 意味著消息發(fā)送成功 有2種場景* 第一種:生產(chǎn)者將消息成功發(fā)送到指定隊列中,等待消費者消費消息* 第兩種:生產(chǎn)者將消息發(fā)送成功,但是,由于無法路由到指定的消息* 隊列,這種場景的消息,會被return機制監(jiān)聽到,后續(xù)進行* 補償機制,做消息補發(fā)處理* </p>** @param correlationData 隊列消息的唯一標識ID,消息做補償機制會用到* @param ack ack 消息是否發(fā)送成功的標識* @param cause 消息發(fā)送失敗的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("消息隊列標識ID: {}", correlationData.getId());log.info("發(fā)送消息狀態(tài): {}", ack);//TODO 消息發(fā)送交換機成功 保存軌跡記錄if (!ack) {//TODO 消息發(fā)送交換機失敗 保存軌跡記錄log.info("異常處理....");}} } /*** !ack 場景結果示例:* <p>* correlationData: CorrelationData [id=a37285dc-5dd6-4e22-8cc4-5c0fbf67b568]* ack: false* 異常處理....* 消息: CorrelationData [id=a37285dc-5dd6-4e22-8cc4-5c0fbf67b568],* nack,失敗原因是:channel error;* protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'FIS-TRACE-COMMON-EXCHANGE' in vhost '/admin',* class-id=60, method-id=40)*/9. return確認機制
package com.gblfy.springboot.returns;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;@Component("returnCallback") public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {//打印日志 實時定位private final static Logger log = LoggerFactory.getLogger(ReturnCallBackListener.class);/*** 消息無法路由 觸發(fā)消息 return機制* <p></p>* 1. 消費者在消息沒有被路由到合適隊列情況下會被return監(jiān)聽,而不會自動刪除* 2. 會監(jiān)聽到生產(chǎn)者發(fā)送消息的關鍵信息* 3. 根據(jù)關鍵信息,后續(xù)進行補償機制,做消息補發(fā)處理* </p>** @param message 消息實體* @param replyCode 應答碼312* @param replyText NO_ROUTE* @param exchange 交換機* @param routingKey 路由routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("DeliveryTag: {}", message.getMessageProperties().getDeliveryTag());log.info("ContentType: {}", message.getMessageProperties().getContentType());log.info("ContentEncoding: {}", message.getMessageProperties().getContentEncoding());log.info("消息發(fā)送的指定交換機: {}", exchange);log.info("隊列路由的routingKey: {}", routingKey);log.info("隊列的響應碼replyCode: {}", replyCode);log.info("隊列的響應信息: {}", replyText);//TODO 消息發(fā)送交換機成功 路由失敗 保存軌跡記錄} } /*** 場景結果示例:* return exchange: FIS-TRACE-COMMON-EXCHANGE, routingKey: fis-str.user, replyCode: 312, replyText: NO_ROUTE* correlationData: CorrelationData [id=30d924db-77b4-41df-bbe6-9a8f0eb3fe7a]* ack: true* 消息: CorrelationData [id=30d924db-77b4-41df-bbe6-9a8f0eb3fe7a],已經(jīng)被ack成功*/10. MQ消息發(fā)送工具類封裝
package com.gblfy.springboot.utils;import com.fasterxml.jackson.databind.ObjectMapper; import com.gblfy.springboot.confirms.ConfirmCallBackListener; import com.gblfy.springboot.consts.MQPrefixConst; import com.gblfy.springboot.entity.Order; import com.gblfy.springboot.returns.ReturnCallBackListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;/*** MQ發(fā)送 不同類型消息 公用工具類* <p>* MQ發(fā)送消息模式采用 訂閱模式(topic)中的通配符模式* order.* 區(qū)配一個詞* order.# 區(qū)配一個或者多個詞* <p>** @author gblfy* @date 2020-04-16*/ @Component public class MQSendMsgUtils {private final static Logger log = LoggerFactory.getLogger(MQSendMsgUtils.class);//引入json工具類private static final ObjectMapper MAPPER = new ObjectMapper();@Autowiredprivate RabbitTemplate rabbitTemplate;//注入發(fā)送消息模板@Autowiredprivate ConfirmCallBackListener confirmCallback;@Autowiredprivate ReturnCallBackListener returnCallback;/*** 發(fā)送MQ STRING類型消息 第1種** @param exchangeName 指定交換機名稱* @param type 路由routingKey* @param msg MQ STRING類型消息*/public void snedStrMQMsg(String exchangeName, String type, String msg) {try {/*** CorrelationData 說明:* 1. correlationId 作為生產(chǎn)端和消息綁定消息隊列全局唯一標識* 2. 當生產(chǎn)端發(fā)送的消息無法路由到指定的消息隊列時,此種場* 景的消息會被生產(chǎn)端會return確認機制監(jiān)聽到,對消息做補* 償機制處理*///通過雪花算法生成全局唯一ID,用于消息發(fā)送失敗,后期做消息補償處理CorrelationData correlationId = new CorrelationData(String.valueOf(SnowflakeIdWorker.generateId()));// Confirm 消息確認策略rabbitTemplate.setConfirmCallback(confirmCallback);// Return 消息確認策略rabbitTemplate.setReturnCallback(returnCallback);//發(fā)送消息到MQ的交換機,通知其他系統(tǒng)rabbitTemplate.convertAndSend(exchangeName, MQPrefixConst.CUS_MQ_STR_PRE + type, msg.getBytes(), correlationId);} catch (Exception e) {e.printStackTrace();}}public void snedObjMqMsg(String exchangeName, Order order, String queueRouteKey, String queueName, String queueDesc, String queueType) {try {/*** CorrelationData 說明:* 1. correlationId 作為生產(chǎn)端和消息綁定消息隊列全局唯一標識* 2. 當生產(chǎn)端發(fā)送的消息無法路由到指定的消息隊列時,此種場* 景的消息會被生產(chǎn)端會return確認機制監(jiān)聽到,對消息做補* 償機制處理*/// Confirm 消息確認策略rabbitTemplate.setConfirmCallback(confirmCallback);// Return 消息確認策略rabbitTemplate.setReturnCallback(returnCallback);//1.對象處理String jsonStrObj = MAPPER.writeValueAsString(order);// 2. 通過雪花算法生成全局唯一ID,用于消息發(fā)送失敗,后期做消息補償處理CorrelationData correlationId = new CorrelationData(String.valueOf(SnowflakeIdWorker.generateId()));// MQ 添加額外參數(shù)設置 用于定位該消息屬于什么接口Message message = addExtraParameters(jsonStrObj, correlationId, queueName, queueDesc, queueType);// 3.發(fā)送數(shù)據(jù)消息到指定的MQ交換機,通知其他系統(tǒng)rabbitTemplate.convertAndSend(exchangeName,MQPrefixConst.CUS_MQ_OBJ_PRE + queueRouteKey, message, correlationId);} catch (Exception e) {e.printStackTrace();}}/*** MQ 添加額外參數(shù)設置** @param jsonStrObj json處理前的數(shù)據(jù)對象* @param queueDesc 隊列描述* @param queueType 隊列類型* @return*/public Message addExtraParameters(String jsonStrObj, CorrelationData correlationId, String queueName, String queueDesc, String queueType) {MessageProperties messageProperties = new MessageProperties();//這里注意一定要修改contentType為 application/jsonmessageProperties.setContentType("application/json");messageProperties.setContentEncoding("UTF-8");messageProperties.getHeaders().put("QUEUE_NAME", queueName);messageProperties.getHeaders().put("QUEUE_DESC", queueDesc);messageProperties.getHeaders().put("QUEUE_TYPE", queueType);messageProperties.getHeaders().put("QUEUE_MSG_ID", correlationId.getId());messageProperties.getHeaders().put("SEND_DATE", MQTimeUtils.CURRENT_DATE_TIME);Message message = new Message(jsonStrObj.getBytes(), messageProperties);return message;} }11. 分布式id
package com.gblfy.springboot.utils;import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.SystemUtils;import java.net.Inet4Address; import java.net.UnknownHostException;/*** Twitter_Snowflake<br>* SnowFlake的結構如下(每部分用-分開):<br>* 0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000 <br>* 1位標識,由于long基本類型在Java中是帶符號的,最高位是符號位,正數(shù)是0,負數(shù)是1,所以id一般是正數(shù),最高位是0<br>* 41位時間截(毫秒級),注意,41位時間截不是存儲當前時間的時間截,而是存儲時間截的差值(當前時間截 - 開始時間截)* 得到的值),這里的的開始時間截,一般是我們的id生成器開始使用的時間,由我們程序來指定的(如下下面程序IdWorker類的startTime屬性)。41位的時間截,可以使用69年,年T = (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69<br>* 10位的數(shù)據(jù)機器位,可以部署在1024個節(jié)點,包括5位datacenterId和5位workerId<br>* 12位序列,毫秒內(nèi)的計數(shù),12位的計數(shù)順序號支持每個節(jié)點每毫秒(同一機器,同一時間截)產(chǎn)生4096個ID序號<br>* 加起來剛好64位,為一個Long型。<br>* SnowFlake的優(yōu)點是,整體上按照時間自增排序,并且整個分布式系統(tǒng)內(nèi)不會產(chǎn)生ID碰撞(由數(shù)據(jù)中心ID和機器ID作區(qū)分),并且效率較高,經(jīng)測試,SnowFlake每秒能夠產(chǎn)生26萬ID左右。*/ public class SnowflakeIdWorker {// ==============================Fields===========================================/*** 開始時間截 (2015-01-01)*/private final long twepoch = 1489111610226L;/*** 機器id所占的位數(shù)*/private final long workerIdBits = 5L;/*** 數(shù)據(jù)標識id所占的位數(shù)*/private final long dataCenterIdBits = 5L;/*** 支持的最大機器id,結果是31 (這個移位算法可以很快的計算出幾位二進制數(shù)所能表示的最大十進制數(shù))*/private final long maxWorkerId = -1L ^ (-1L << workerIdBits);/*** 支持的最大數(shù)據(jù)標識id,結果是31*/private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);/*** 序列在id中占的位數(shù)*/private final long sequenceBits = 12L;/*** 機器ID向左移12位*/private final long workerIdShift = sequenceBits;/*** 數(shù)據(jù)標識id向左移17位(12+5)*/private final long dataCenterIdShift = sequenceBits + workerIdBits;/*** 時間截向左移22位(5+5+12)*/private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;/*** 生成序列的掩碼,這里為4095 (0b111111111111=0xfff=4095)*/private final long sequenceMask = -1L ^ (-1L << sequenceBits);/*** 工作機器ID(0~31)*/private long workerId;/*** 數(shù)據(jù)中心ID(0~31)*/private long dataCenterId;/*** 毫秒內(nèi)序列(0~4095)*/private long sequence = 0L;/*** 上次生成ID的時間截*/private long lastTimestamp = -1L;private static SnowflakeIdWorker idWorker;static {idWorker = new SnowflakeIdWorker(getWorkId(), getDataCenterId());}//==============================Constructors=====================================/*** 構造函數(shù)** @param workerId 工作ID (0~31)* @param dataCenterId 數(shù)據(jù)中心ID (0~31)*/public SnowflakeIdWorker(long workerId, long dataCenterId) {if (workerId > maxWorkerId || workerId < 0) {throw new IllegalArgumentException(String.format("workerId can't be greater than %d or less than 0", maxWorkerId));}if (dataCenterId > maxDataCenterId || dataCenterId < 0) {throw new IllegalArgumentException(String.format("dataCenterId can't be greater than %d or less than 0", maxDataCenterId));}this.workerId = workerId;this.dataCenterId = dataCenterId;}// ==============================Methods==========================================/*** 獲得下一個ID (該方法是線程安全的)** @return SnowflakeId*/public synchronized long nextId() {long timestamp = timeGen();//如果當前時間小于上一次ID生成的時間戳,說明系統(tǒng)時鐘回退過這個時候應當拋出異常if (timestamp < lastTimestamp) {throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));}//如果是同一時間生成的,則進行毫秒內(nèi)序列if (lastTimestamp == timestamp) {sequence = (sequence + 1) & sequenceMask;//毫秒內(nèi)序列溢出if (sequence == 0) {//阻塞到下一個毫秒,獲得新的時間戳timestamp = tilNextMillis(lastTimestamp);}}//時間戳改變,毫秒內(nèi)序列重置else {sequence = 0L;}//上次生成ID的時間截lastTimestamp = timestamp;//移位并通過或運算拼到一起組成64位的IDreturn ((timestamp - twepoch) << timestampLeftShift)| (dataCenterId << dataCenterIdShift)| (workerId << workerIdShift)| sequence;}/*** 阻塞到下一個毫秒,直到獲得新的時間戳** @param lastTimestamp 上次生成ID的時間截* @return 當前時間戳*/protected long tilNextMillis(long lastTimestamp) {long timestamp = timeGen();while (timestamp <= lastTimestamp) {timestamp = timeGen();}return timestamp;}/*** 返回以毫秒為單位的當前時間** @return 當前時間(毫秒)*/protected long timeGen() {return System.currentTimeMillis();}private static Long getWorkId() {try {String hostAddress = Inet4Address.getLocalHost().getHostAddress();int[] ints = StringUtils.toCodePoints(hostAddress);int sums = 0;for (int b : ints) {sums += b;}return (long) (sums % 32);} catch (UnknownHostException e) {// 如果獲取失敗,則使用隨機數(shù)備用return RandomUtils.nextLong(0, 31);}}private static Long getDataCenterId() {int[] ints = StringUtils.toCodePoints(SystemUtils.getHostName());int sums = 0;for (int i : ints) {sums += i;}return (long) (sums % 32);}/*** 靜態(tài)工具類** @return*/public static Long generateId() {long id = idWorker.nextId();return id;}//==============================Test=============================================/*** 測試*/public static void main(String[] args) {System.out.println(System.currentTimeMillis());long startTime = System.nanoTime();for (int i = 0; i < 50000; i++) {long id = SnowflakeIdWorker.generateId();System.out.println(id);}System.out.println((System.nanoTime() - startTime) / 1000000 + "ms");} }12. 時間工具類
package com.gblfy.springboot.utils;import org.springframework.stereotype.Component;import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date;@Component public class MQTimeUtils {//格式化時間 日期格式public static final DateFormat DATE_TIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");public static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd");// 日期格式public static final DateFormat TIME_FORMAT = new SimpleDateFormat("HH:mm:ss");// 日期格式//當前日期+時間 用于定位 消息隊列服務端和和生產(chǎn)發(fā)送消息時間 確認什么類型的什么接口public static final String CURRENT_DATE_TIME = DATE_TIME_FORMAT.format(new Date());/*** 獲取當前日期 類型Date*/public static Date getCurrentDate() {Date currentDate = null;try {currentDate = DATE_FORMAT.parse(DATE_FORMAT.format(new Date()));} catch (ParseException e) {e.printStackTrace();}return currentDate;}/*** 獲取當前日期 類型String*/public static String getCurrentDateToStr() {String currentDateToStr = null;try {currentDateToStr = DATE_FORMAT.format(new Date());} catch (Exception e) {e.printStackTrace();}return currentDateToStr;}/*** 獲取當前時間 類型String*/public static String getCurrenTimeToStr() {String currentTimeToStr = null;try {currentTimeToStr = TIME_FORMAT.format(new Date());} catch (Exception e) {e.printStackTrace();}return currentTimeToStr;}public static void main(String[] args) {System.out.println(MQTimeUtils.getCurrentDate());System.out.println(MQTimeUtils.getCurrentDateToStr());} }13. 對象
package com.gblfy.springboot.entity;import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor;import java.io.Serializable;@Data @AllArgsConstructor @NoArgsConstructor @Builder public class Order implements Serializable {private String serviceName;private String reqXml;private String resXml; }總結
以上是生活随笔為你收集整理的(需求实战_终章) SpringBoot2.x 整合RabbitMQ的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux下删除目录及其子目录下某种类型
- 下一篇: PMP考试必看的答题技巧分享