javascript
利用SpringBoot+RabbitMQ,实现一个邮件推送服务
一、流程圖
本文內(nèi)容主要圍繞這個流程圖展開,利用 RabbitMQ 消息隊列,實現(xiàn)生產(chǎn)者與消費者解耦,所以有必要先貼出來,涵蓋了 RabbitMQ 很多知識點,如:
消息發(fā)送確認(rèn)機制
消費確認(rèn)機制
消息的重新投遞
消費冪等性, 等等
二、實現(xiàn)思路
1.在虛擬機創(chuàng)建一個CentOS7上,并安裝 RabbitMQ
2.開放QQ郵箱或者其它郵箱授權(quán)碼,用于發(fā)送郵件
3.創(chuàng)建郵件發(fā)送項目并編寫代碼
4.發(fā)送郵件測試
5.消息發(fā)送失敗處理
三、RabbitMQ安裝
RabbitMQ 基于 erlang 進(jìn)行通信,相比其它的軟件,安裝有些麻煩,不過本例采用rpm方式安裝,任何新手都可以完成安裝,過程如下!
3.1、安裝前命令準(zhǔn)備
輸入如下命令,完成安裝前的環(huán)境準(zhǔn)備。
yum?install?lsof??build-essential?openssl?openssl-devel?unixODBC?unixODBC-devel?make?gcc?gcc-c++?kernel-devel?m4?ncurses-devel?tk?tc?xz?wget?vim3.2、下載 RabbitMQ、erlang、socat 的安裝包
本次下載的是RabbitMQ-3.6.5版本,采用rpm一鍵安裝,適合新手直接上手。
先創(chuàng)建一個rabbitmq目錄,本例的目錄路徑為/usr/app/rabbitmq,然后在目錄下執(zhí)行如下命令,下載安裝包!
下載erlang
下載socat
下載rabbitMQ
最終目錄文件如下:
3.3、安裝軟件包
下載完之后,按順序依次安裝軟件包,這個很重要哦~
安裝erlang
安裝socat
安裝rabbitmq
安裝完成之后,修改rabbitmq的配置,默認(rèn)配置文件在/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin目錄下。
vim?/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app修改loopback_users節(jié)點的值!
最后只需通過如下命令,啟動服務(wù)即可!
rabbitmq-server?start?&運行腳本之后,如果報錯,例如下圖!
解決辦法如下:
vim?/etc/rabbitmq/rabbitmq-env.conf在文件里添加一行,如下配置!
NODENAME=rabbit@localhost然后,再保存!再次以下命令啟動服務(wù)!
rabbitmq-server?start?&通過如下命令,查詢服務(wù)是否啟動成功!
lsof?-i:5672如果出現(xiàn)5672已經(jīng)被監(jiān)聽,說明已經(jīng)啟動成功!
3.4、啟動可視化的管控臺
輸入如下命令,啟動控制臺!
rabbitmq-plugins?enable?rabbitmq_management用瀏覽器打開http://ip:15672,這里的ip就是 CentOS 系統(tǒng)的 ip,結(jié)果如下:
賬號、密碼,默認(rèn)為guest,如果出現(xiàn)無法訪問,檢測防火墻是否開啟,如果開啟將其關(guān)閉即可!
登錄之后的監(jiān)控平臺,界面如下:
四、郵箱授權(quán)碼的獲取
獲取郵箱授權(quán)碼的目的,主要是為了通過代碼進(jìn)行發(fā)送郵件,例如 QQ 郵箱授權(quán)碼獲取方式,如下圖:
點擊【開啟】按鈕,然后發(fā)送短信,即可獲取授權(quán)碼,該授權(quán)碼就是配置文件spring.mail.password需要的密碼!
五、項目介紹
springboot版本:2.1.5.RELEASE
RabbitMQ版本:3.6.5
SendMailUtil:發(fā)送郵件工具類
ProduceServiceImpl:生產(chǎn)者,發(fā)送消息
ConsumerMailService:消費者,消費消息,發(fā)送郵件
六、代碼實現(xiàn)
6.1、創(chuàng)建項目
在 IDEA 下創(chuàng)建一個名稱為smail的 Springboot 項目,pom文件中加入amqp和mail。
<dependencies><!--spring?boot核心--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--spring?boot?測試--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--springmvc?web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--開發(fā)環(huán)境調(diào)試--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional></dependency><!--mail?支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency><!--amqp?支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--?commons-lang3?--><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.4</version></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.10</version></dependency> </dependencies>6.2、配置rabbitMQ、mail
在application.properties文件中,配置amqp和mail!
#rabbitmq spring.rabbitmq.host=192.168.0.103 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 開啟confirms回調(diào) P -> Exchange spring.rabbitmq.publisher-confirms=true # 開啟returnedMessage回調(diào) Exchange -> Queue spring.rabbitmq.publisher-returns=true # 設(shè)置手動確認(rèn)(ack) Queue -> C spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.simple.prefetch=100# mail spring.mail.default-encoding=UTF-8 spring.mail.host=smtp.qq.com spring.mail.username=1370887518@qq.com spring.mail.password=獲取的郵箱授權(quán)碼 spring.mail.from=1370887518@qq.com spring.mail.properties.mail.smtp.auth=true spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true其中,spring.mail.password第四步中獲取的授權(quán)碼,同時username和from要一致!
6.3、RabbitConfig配置類
@Configuration @Slf4j public?class?RabbitConfig?{//?發(fā)送郵件public?static?final?String?MAIL_QUEUE_NAME?=?"mail.queue";public?static?final?String?MAIL_EXCHANGE_NAME?=?"mail.exchange";public?static?final?String?MAIL_ROUTING_KEY_NAME?=?"mail.routing.key";@Autowiredprivate?CachingConnectionFactory?connectionFactory;@Beanpublic?RabbitTemplate?rabbitTemplate()?{RabbitTemplate?rabbitTemplate?=?new?RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(converter());//?消息是否成功發(fā)送到ExchangerabbitTemplate.setConfirmCallback((correlationData,?ack,?cause)?->?{if?(ack)?{log.info("消息成功發(fā)送到Exchange");}?else?{log.info("消息發(fā)送到Exchange失敗,?{},?cause:?{}",?correlationData,?cause);}});//?觸發(fā)setReturnCallback回調(diào)必須設(shè)置mandatory=true,?否則Exchange沒有找到Queue就會丟棄掉消息,?而不會觸發(fā)回調(diào)rabbitTemplate.setMandatory(true);//?消息是否從Exchange路由到Queue,?注意:?這是一個失敗回調(diào),?只有消息從Exchange路由到Queue失敗才會回調(diào)這個方法rabbitTemplate.setReturnCallback((message,?replyCode,?replyText,?exchange,?routingKey)?->?{log.info("消息從Exchange路由到Queue失敗:?exchange:?{},?route:?{},?replyCode:?{},?replyText:?{},?message:?{}",?exchange,?routingKey,?replyCode,?replyText,?message);});return?rabbitTemplate;}@Beanpublic?Jackson2JsonMessageConverter?converter()?{return?new?Jackson2JsonMessageConverter();}@Beanpublic?Queue?mailQueue()?{return?new?Queue(MAIL_QUEUE_NAME,?true);}@Beanpublic?DirectExchange?mailExchange()?{return?new?DirectExchange(MAIL_EXCHANGE_NAME,?true,?false);}@Beanpublic?Binding?mailBinding()?{return?BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);} }6.4、Mail 郵件實體類
@Getter @Setter @NoArgsConstructor @AllArgsConstructor public?class?Mail?{@Pattern(regexp?=?"^([a-z0-9A-Z]+[-|\\.]?)+[a-z0-9A-Z]@([a-z0-9A-Z]+(-[a-z0-9A-Z]+)?\\.)+[a-zA-Z]{2,}$",?message?=?"郵箱格式不正確")private?String?to;@NotBlank(message?=?"標(biāo)題不能為空")private?String?title;@NotBlank(message?=?"正文不能為空")private?String?content;private?String?msgId;//?消息id }6.5、SendMailUtil郵件發(fā)送類
@Component @Slf4j public?class?SendMailUtil?{@Value("${spring.mail.from}")private?String?from;@Autowiredprivate?JavaMailSender?mailSender;/***?發(fā)送簡單郵件**?@param?mail*/public?boolean?send(Mail?mail)?{String?to?=?mail.getTo();//?目標(biāo)郵箱String?title?=?mail.getTitle();//?郵件標(biāo)題String?content?=?mail.getContent();//?郵件正文SimpleMailMessage?message?=?new?SimpleMailMessage();message.setFrom(from);message.setTo(to);message.setSubject(title);message.setText(content);try?{mailSender.send(message);log.info("郵件發(fā)送成功");return?true;}?catch?(MailException?e)?{log.error("郵件發(fā)送失敗,?to:?{},?title:?{}",?to,?title,?e);return?false;}} }6.6、ProduceServiceImpl 生產(chǎn)者類
@Service public?class?ProduceServiceImpl?implements?ProduceService?{@Autowiredprivate?RabbitTemplate?rabbitTemplate;@Overridepublic?boolean?send(Mail?mail)?{//創(chuàng)建uuidString?msgId?=?UUID.randomUUID().toString().replaceAll("-",?"");mail.setMsgId(msgId);//發(fā)送消息到rabbitMQCorrelationData?correlationData?=?new?CorrelationData(msgId);rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME,?RabbitConfig.MAIL_ROUTING_KEY_NAME,?MessageHelper.objToMsg(mail),?correlationData);return?true;} }6.7、ConsumerMailService 消費者類
@Component @Slf4j public?class?ConsumerMailService?{@Autowiredprivate?SendMailUtil?sendMailUtil;@RabbitListener(queues?=?RabbitConfig.MAIL_QUEUE_NAME)public?void?consume(Message?message,?Channel?channel)?throws?IOException?{//將消息轉(zhuǎn)化為對象String?str?=?new?String(message.getBody());Mail?mail?=?JsonUtil.strToObj(str,?Mail.class);log.info("收到消息:?{}",?mail.toString());MessageProperties?properties?=?message.getMessageProperties();long?tag?=?properties.getDeliveryTag();boolean?success?=?sendMailUtil.send(mail);if?(success)?{channel.basicAck(tag,?false);//?消費確認(rèn)}?else?{channel.basicNack(tag,?false,?true);}} }6.8、TestController 控制層類
@RestController @RequestMapping("/test") @Slf4j public?class?TestController?{@Autowiredprivate?ProduceService?testService;@PostMapping("send")public?boolean?sendMail(Mail?mail)?{return?testService.send(mail);} }七、測試服務(wù)
啟動 SpringBoot 服務(wù)之后,用 postman 模擬請求接口。
查看控制臺信息。
查詢接受者郵件信息。
郵件發(fā)送成功!
八、消息發(fā)送失敗處理
雖然,上面案例可以成功的實現(xiàn)消息的發(fā)送,但是上面的流程很脆弱,例如:rabbitMQ 突然蹦了、郵件發(fā)送失敗了、重啟 rabbitMQ 服務(wù)器出現(xiàn)消息重復(fù)消費,應(yīng)該怎處理呢?
很顯然,我們需要對原有的邏輯進(jìn)行升級改造,因此我們需要引入數(shù)據(jù)庫來記錄消息的發(fā)送情況。
8.1、創(chuàng)建消息投遞日志表
CREATE?TABLE?`msg_log`?(`msg_id`?varchar(255)?NOT?NULL?DEFAULT?''?COMMENT?'消息唯一標(biāo)識',`msg`?text?COMMENT?'消息體,?json格式化',`exchange`?varchar(255)?NOT?NULL?DEFAULT?''?COMMENT?'交換機',`routing_key`?varchar(255)?NOT?NULL?DEFAULT?''?COMMENT?'路由鍵',`status`?int(11)?NOT?NULL?DEFAULT?'0'?COMMENT?'狀態(tài):?0投遞中?1投遞成功?2投遞失敗?3已消費',`try_count`?int(11)?NOT?NULL?DEFAULT?'0'?COMMENT?'重試次數(shù)',`next_try_time`?datetime?DEFAULT?NULL?COMMENT?'下一次重試時間',`create_time`?datetime?DEFAULT?NULL?COMMENT?'創(chuàng)建時間',`update_time`?datetime?DEFAULT?NULL?COMMENT?'更新時間',PRIMARY?KEY?(`msg_id`),UNIQUE?KEY?`unq_msg_id`?(`msg_id`)?USING?BTREE )?ENGINE=InnoDB?DEFAULT?CHARSET=utf8mb4?COMMENT='消息投遞日志';8.2、編寫 MsgLog 相關(guān)服務(wù)類
public?interface?MsgLogService?{/***?插入消息日志*?@param?msgLog*/void?insert(MsgLog?msgLog);/***?更新消息狀態(tài)*?@param?msgId*?@param?status*/void?updateStatus(String?msgId,?Integer?status);/***?查詢消息*?@param?msgId*?@return*/MsgLog?selectByMsgId(String?msgId); }8.3、改寫服務(wù)邏輯
在生產(chǎn)服務(wù)類中,新增數(shù)據(jù)寫入。
同時,在RabbitConfig服務(wù)配置,當(dāng)消息發(fā)送成功之后,新增更新消息狀態(tài)邏輯。
改造消費者ConsumerMailService,每次消費的時候,從數(shù)據(jù)庫中查詢,如果消息已經(jīng)被消費,不用再重復(fù)發(fā)送數(shù)據(jù)!
這樣即可保證,如果 rabbitMQ 服務(wù)器,即使重啟之后重新推送消息,通過數(shù)據(jù)庫判斷,也不會重復(fù)消費進(jìn)而發(fā)生業(yè)務(wù)異常!
8.4、利用定數(shù)任務(wù)對消息投遞失敗進(jìn)行補償
當(dāng) rabbitMQ 服務(wù)器突然掛掉之后,生成者就無法正常進(jìn)行投遞數(shù)據(jù),此時因為消息已經(jīng)被記錄到數(shù)據(jù)庫,因此我們可以利用定數(shù)任務(wù)查詢出沒有投遞成功的消息,進(jìn)行補償投遞。
利用定數(shù)任務(wù),對投遞失敗的消息進(jìn)行補償投遞,基本可以保證消息 100% 消費成功!
九、總結(jié)
本文主要是通過發(fā)送郵件這個業(yè)務(wù)案例,來講解 Springboot 與 rabbitMQ 技術(shù)的整合和使用!
當(dāng)然解決這個業(yè)務(wù)需求的技術(shù)方案還有很多,例如 Springboot 與 rocketMQ 也可以實現(xiàn)這個需求,這個會在后期的文章講解!
同時,Springboot +?rabbitMQ?這種架構(gòu)方案更適合于集群應(yīng)用,如果是單體應(yīng)用,直接通過服務(wù)類操作即可實現(xiàn)郵件推送!
本篇主要是Springboot 與 rabbitMQ整合和基本使用教程,希望小伙伴能有所收獲!
十、參考
1、簡書 - wangzaiplus - springboot + rabbitmq發(fā)送郵件(保證消息100%投遞成功并被消費)
有道無術(shù),術(shù)可成;有術(shù)無道,止于術(shù)
歡迎大家關(guān)注Java之道公眾號
好文章,我在看??
總結(jié)
以上是生活随笔為你收集整理的利用SpringBoot+RabbitMQ,实现一个邮件推送服务的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 粉丝福利,送10个程序员专用机械键盘
- 下一篇: 三问Spring事务:解决什么问题?如何