javascript
SpringBoot 整合 Redis 实现消息队列
寫這篇文章的原因還是得歸咎于👉 上一篇博客寫了👉Docker搭建Redis Cluster 集群環境
我自己是認為對于每個知識點,光看了不操作是沒有用的(遺忘太快…),多少得在手上用上幾回才可以,才能對它加深印象。
昨天搭建了Redis Cluster 集群環境,今天就來拿它玩一玩Redis 消息隊列吧
于是便有了這個Redis 實現消息隊列的Demo,
很喜歡一句話:”八小時內謀生活,八小時外謀發展“。
共勉.😁
Docker搭建Redis集群
SpringBoot 整合 Redis 實現消息隊列
- 一、前言
- 概念
- 作用:
- 應用場景:
- 二、前期準備
- 2.1、項目結構
- 2.2、依賴的jar包
- 2.3、yml配置文件
- 三、編碼
- 3.1、config層
- 3.2、信息實體類
- 3.3、MyThread類
- 3.4、消費者
- 3.5、生產者
- 四、測試
- 五、自言自語
一、前言
概念
消息隊列:“消息隊列”是在消息的傳輸過程中保存消息的容器。
其實就是個 生產者--->消息隊列<---消費者 的模型。集群就是蠻多蠻多而已。
作用:
主要解決應用耦合,異步消息,流量削鋒等問題
應用場景:
異步處理,應用解耦(拆分多系統),流量削峰(秒殺活動、請求量過大)和消息通訊(發布公告、日志)四個場景。
此處只演示了最簡單的一個圖哈。
舉例子:異步消息
使用消息隊列后
消息中間件其實市面上已經有很多,如RabbitMq,RocketMq、ActiveMq、Kafka等,我拿Redis來做消息隊列,其本意是1)為了熟悉Redis;2)Redis 確實可以來做簡單的消息隊列(狗頭保命)
二、前期準備
就是需要個Redis,其他的倒是沒啥特殊的啦。😁
2.1、項目結構
一普通的SpringBoot的項目…😊
2.2、依賴的jar包
jar 也都是一些正常的jar包哈,沒啥新奇玩意。😜
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.2</version><relativePath/> <!-- lookup parent from repository --> </parent> <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.4.3</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.72</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency> </dependencies>2.3、yml配置文件
分單機和集群,主要是上一篇文章帶的…🙄😶
單機配置文件
spring:redis:database: 0port: 6379host: localhostpassword:lettuce:pool:# 連接池最大連接數(使用負值表示沒有限制)max-active: 1024# 連接池最大阻塞等待時間(使用負值表示沒有限制)max-wait: 10000# 連接池中的最大空閑連接max-idle: 200# 連接池中的最小空閑連接min-idle: 0# 連接超時時間(毫秒)timeout: 10000redis集群配置文件
server:port: 8089 spring:application:name: springboot-redisredis:password: 1234cluster:nodes:- IP地址:6379- IP地址:6380- IP地址:6381- IP地址:6382- IP地址:6383- IP地址:6384max-redirects: 3 # 獲取失敗 最大重定向次數lettuce:pool:max-active: 1000 #連接池最大連接數(使用負值表示沒有限制)max-idle: 10 # 連接池中的最大空閑連接min-idle: 5 # 連接池中的最小空閑連接#===========jedis配置方式============================================= # jedis: # pool: # max-active: 1000 # 連接池最大連接數(使用負值表示沒有限制) # max-wait: -1ms # 連接池最大阻塞等待時間(使用負值表示沒有限制) # max-idle: 10 # 連接池中的最大空閑連接 # min-idle: 5 # 連接池中的最小空閑連接 #三、編碼
3.1、config層
沒有什么特殊的配置,🤗
import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.data.redis.RedisProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer;/*** redis 配置類* 1. 設置RedisTemplate序列化/返序列化** @author cuberxp* @since 1.0.0* Create time 2020/1/23 0:06*/ @Configuration @ConditionalOnClass(RedisOperations.class) @EnableConfigurationProperties(RedisProperties.class) public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();//設置value hashValue值的序列化Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<Object>(Object.class);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);serializer.setObjectMapper(om);redisTemplate.setValueSerializer(serializer);redisTemplate.setHashValueSerializer(serializer);//key hasKey的序列化redisTemplate.setKeySerializer(stringRedisSerializer);redisTemplate.setHashKeySerializer(stringRedisSerializer);redisTemplate.setConnectionFactory(redisConnectionFactory);redisTemplate.afterPropertiesSet();return redisTemplate;} }3.2、信息實體類
加個實體類,模擬傳遞信息中需要用到的實體類。
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;import java.io.Serializable;/*** @author crush*/ @Data @AllArgsConstructor @NoArgsConstructor public class AnnouncementMessage implements Serializable {private static final long serialVersionUID = 8632296967087444509L;private String id;/*** 內容 */private String content; }3.3、MyThread類
隨項目啟動而啟動。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component;/*** @Author: crush* @Date: 2021-08-06 22:17* version 1.0* ApplicationRunner:* 用于指示 bean 在包含在SpringApplication時應該運行的SpringApplication 。 * 通俗說就是 在這個項目運行的時候,它也會自動運行起來。*/ @Component public class MyThread implements ApplicationRunner {@AutowiredMessageConsumerService messageConsumerService;@Overridepublic void run(ApplicationArguments args) throws Exception {messageConsumerService.start();} }3.4、消費者
import java.util.concurrent.TimeUnit; import com.crush.queue.entity.AnnouncementMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service;/*** ApplicationRunner 實現這個接口可以跟隨項目啟動而啟動* @author crush*/ @Service public class MessageConsumerService extends Thread {@Autowiredprivate RedisTemplate<String,Object> redisTemplate;private volatile boolean flag = true;private String queueKey="queue";private Long popTime=1000L;@Overridepublic void run() {try {AnnouncementMessage message;// 為了能一直循環而不結束while(flag && !Thread.currentThread().isInterrupted()) {message = (AnnouncementMessage) redisTemplate.opsForList().rightPop(queueKey,popTime,TimeUnit.SECONDS);System.out.println("接收到了" + message);}} catch (Exception e) {System.err.println(e.getMessage());}}public boolean isFlag() {return flag;}public void setFlag(boolean flag) {this.flag = flag;}}3.5、生產者
import com.crush.queue.entity.AnnouncementMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service;@Service public class MessageProducerService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;private String queueKey="queue";public Long sendMeassage(AnnouncementMessage message) {System.out.println("發送了" + message);return redisTemplate.opsForList().leftPush(queueKey, message);}}四、測試
就是簡單寫了一個測試代碼。😝
import com.crush.queue.entity.AnnouncementMessage; import com.crush.queue.service.MessageConsumerService; import com.crush.queue.service.MessageProducerService; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; /*** @Author: crush* @Date: 2021-08-06 17:11* version 1.0*/ @SpringBootTest public class MessageQueueTest {@Autowiredprivate MessageProducerService producer;@Autowiredprivate MessageConsumerService consumer;/*** 這個測時 的先啟動主啟動累,* 然后消費者可以一直在監聽。*/@Testpublic void testQueue2() {producer.sendMeassage(new AnnouncementMessage("1", "aaaa"));producer.sendMeassage(new AnnouncementMessage("2", "bbbb"));try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}} }注:這只是一個小demo ,很多細節都沒有去考慮,只是一次對Redis做消息隊列的初探,大家見諒。
五、自言自語
一次由搭建Redis Cluster集群開啟的博客,終于結束了,算了好像還沒,感覺下次可以多寫點實用的。😂🤣
不知道大家學習是什么樣的,博主自己的感覺就是學了的東西,要通過自己去梳理一遍,或者說是去實踐一遍,我覺得這樣子,無論是對于理解還是記憶,都會更加深刻。
如若有不足之處,請不嗇賜教!!😁
有疑惑之處,也可以留言或私信,定會第一時間回復。👩?💻
這篇文章就到這里啦,下篇文章再見。👉一篇文章用Redis 實現消息隊列(還在寫)
總結
以上是生活随笔為你收集整理的SpringBoot 整合 Redis 实现消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 史上最详细Docker搭建Redis C
- 下一篇: Spring Boot 使用Actuat