Redis-13Redis发布订阅
文章目錄
- 概述
- 消息多播
- PubSub發布者訂閱者模型
- 客戶端操作
- Spring配置發布訂閱模式
- pubsub不足之處
- 代碼
概述
當使用銀行卡消費的時候,銀行往往會通過微信、短信或郵件通知用戶這筆交易的信
息,這便是一種發布訂閱模式, 1這里的發布是交易信息的發布,訂閱則是各個渠道。這在實際工作中十分常用, Redis 支持這樣的一個模式。
Redis 發布訂閱(pub/sub)是一種消息通信模式:發送者(pub)發送消息,訂閱者(sub)接收消息。觀察者模式就是這個模式的典型應用。
Redis 客戶端可以訂閱任意數量的頻道。
下圖展示了頻道 channel1 , 以及訂閱這個頻道的三個客戶端 —— client2 、 client5 和 client1 之間的關系:
當有新消息通過 PUBLISH 命令發送給頻道 channel1 時, 這個消息就會被發送給訂閱它的三個客戶端:
消息多播
消息多播允許生產者生產一次消息,中間件負責將消息復制到多個消息隊列,每個消息隊列由相應的消費組進行消費。
它是分布式系統常用的一種解耦方式,用于將多個消費組的邏輯進行拆分。
支持了消息多播,多個消費組的邏輯就可以放到不同的子系統中。
如果是普通的消息隊列,就得將多個不同的消費組邏輯串接起來放在一個子系統中,進行連續消費。
PubSub發布者訂閱者模型
為了支持消息多播,Redis單獨使用了一個模塊來支持消息多播,這個模塊的名字叫著 PubSub,也就是 PublisherSubscriber,發布者訂閱者模型。
客戶端操作
首先來注冊一個訂閱的客戶端 , 這個時候使用 SUBSCRIBE命令 。
比如監昕一個叫作 talk 的渠道 , 這個時候我們需要先打開一個客戶端 ,這里記為客戶
端1 ,然后輸入命令
這個時候客戶端 1 就會訂閱了一個叫作 talk渠道的消息了
打開另外一個客戶端 ,記為客戶端 2訂閱 talk渠道的消息
127.0.0.1:6379> SUBSCRIBE talk Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "talk" 3) (integer) 1最后打開另外一個客戶端,發布消息給這兩個訂閱者
127.0.0.1:6379> PUBLISH talk "redis world !!!" (integer) 2 127.0.0.1:6379>觀察客戶端 1 和客戶端2 ,就可以發現已經收到了消息 , 井有對應的信息打印出來。
Spring配置發布訂閱模式
首先提供接收消息的類 , 它將實現 org.springframework.data.redis.connection.MessageListener 接口, 并實現接口定義的方法 public void onMessage(Message message, byte[] pattern)
package com.artisan.redis.publish;import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate;public class RedisMessageListener implements MessageListener {private RedisTemplate redisTemplate;public RedisTemplate getRedisTemplate() {return redisTemplate;}public void setRedisTemplate(RedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}@Overridepublic void onMessage(Message message, byte[] bytes) {// 獲取消息byte[] body = message.getBody();// 使用值序列化器轉換String msgBody = (String) getRedisTemplate().getValueSerializer().deserialize(body);System.out.println("RedisMessageListener:" + msgBody);// 獲取 channelbyte[] channel = message.getChannel();// 使用字符串序列化器轉換String channelStr = (String) getRedisTemplate().getStringSerializer().deserialize(channel);System.out.println("RedisMessageListener:" + channelStr);// 渠道名稱轉換String bytesStr = new String(bytes);System.out.println("RedisMessageListener:" + bytesStr);}} package com.artisan.redis.publish;import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate;public class RedisMessageListener2 implements MessageListener {private RedisTemplate redisTemplate;public RedisTemplate getRedisTemplate() {return redisTemplate;}public void setRedisTemplate(RedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}@Overridepublic void onMessage(Message message, byte[] bytes) {// 獲取消息byte[] body = message.getBody();// 使用值序列化器轉換String msgBody = (String) getRedisTemplate().getValueSerializer().deserialize(body);System.out.println("RedisMessageListener2:" + msgBody);// 獲取 channelbyte[] channel = message.getChannel();// 使用字符串序列化器轉換String channelStr = (String) getRedisTemplate().getStringSerializer().deserialize(channel);System.out.println("RedisMessageListener2:" + channelStr);// 渠道名稱轉換String bytesStr = new String(bytes);System.out.println("RedisMessageListener2:" + bytesStr);}}為了在 Spring 中使用這兩個監聽類,需要對其進行配置。這樣就在 Spring 上下文中定義了監昕類。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"xmlns:p="http://www.springframework.org/schema/p"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"><context:property-placeholder location="classpath:redis/redis.properties" /><!--2,注意新版本2.3以后,JedisPoolConfig的property name,不是maxActive而是maxTotal,而且沒有maxWait屬性,建議看一下Jedis源碼或百度。 --><!-- redis連接池配置 --><bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"><!--最大空閑數 --><property name="maxIdle" value="${redis.maxIdle}" /><!--連接池的最大數據庫連接數 --><property name="maxTotal" value="${redis.maxTotal}" /><!--最大建立連接等待時間 --><property name="maxWaitMillis" value="${redis.maxWaitMillis}" /><!--逐出連接的最小空閑時間 默認1800000毫秒(30分鐘) --><property name="minEvictableIdleTimeMillis" value="${redis.minEvictableIdleTimeMillis}" /><!--每次逐出檢查時 逐出的最大數目 如果為負數就是 : 1/abs(n), 默認3 --><property name="numTestsPerEvictionRun" value="${redis.numTestsPerEvictionRun}" /><!--逐出掃描的時間間隔(毫秒) 如果為負數,則不運行逐出線程, 默認-1 --><property name="timeBetweenEvictionRunsMillis" value="${redis.timeBetweenEvictionRunsMillis}" /><property name="testOnBorrow" value="true"></property><property name="testOnReturn" value="true"></property><property name="testWhileIdle" value="true"></property></bean><!--redis連接工廠 --><bean id="jedisConnectionFactory"class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"destroy-method="destroy"><property name="poolConfig" ref="jedisPoolConfig"></property><!--IP地址 --><property name="hostName" value="${redis.host.ip}"></property><!--端口號 --><property name="port" value="${redis.port}"></property><!--如果Redis設置有密碼 --><property name="password" value="${redis.password}" /> <!--客戶端超時時間單位是毫秒 --><property name="timeout" value="${redis.timeout}"></property><property name="usePool" value="true" /><!--<property name="database" value="0" /> --></bean><!-- 鍵值序列化器設置為String 類型 --><bean id="stringRedisSerializer" class="org.springframework.data.redis.serializer.StringRedisSerializer"/><!-- redis template definition --><bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"p:connection-factory-ref="jedisConnectionFactory"p:keySerializer-ref="stringRedisSerializer"p:valueSerializer-ref="stringRedisSerializer"></bean><!-- 自定義 發布訂閱監聽類 --><bean id="redisMessageListener" class="com.artisan.redis.publish.RedisMessageListener"p:redisTemplate-ref="redisTemplate"/><bean id="redisMessageListener2" class="com.artisan.redis.publish.RedisMessageListener2"p:redisTemplate-ref="redisTemplate"/> <!-- 監聽容器 --> <bean id="topicContainer"class="org.springframework.data.redis.listener.RedisMessageListenerContainer"destroy-method="destroy"><!--Redis 連接工廠 --><property name="connectionFactory" ref="jedisConnectionFactory"></property><!-- 連接池,這里只要線程池生存 , 才能繼續監昕 --><property name="taskExecutor"><beanclass="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><property name="corePoolSize" value="3"></property></bean></property><!-- 消息監聽 Map --><property name="messageListeners"><map><!--一配置監聽者, key-ref 和 bean id 定義一致 --><entry key-ref="redisMessageListener"><!--監聽類 --><bean class="org.springframework.data.redis.listener.ChannelTopic"><constructor-arg value="talk" /></bean></entry><entry key-ref="redisMessageListener2"><!--監聽類 --><bean class="org.springframework.data.redis.listener.ChannelTopic"><constructor-arg value="talk" /></bean></entry></map></property></bean> </beans>有了監聽類還不能進行測試。為了進行測試 , 要給一個監昕容器 , 在 Spring 中己有類org.springframework.data . redi s. li stener.RedisMessageListenerContainer。它可 以用于監聽 Redis的發布訂閱消息,上面配置的topicContainer就是為了實現這個功能。
這里配置了線程池,這個線程池將會持續的生存 以等待消息傳入 , 而這里配置了容器用id 為 redisMessageListener 和 redisMessageListener2的 Bean 進行對渠道 talk的監聽 。當消息通過渠道 talk發送的時候,就會使用 id 為 redisMessageListener和redisMessageListener2 的 Bean 進行處理消息。
測試類
package com.artisan.redis.publish;import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.data.redis.core.RedisTemplate;public class PublishSubscribeTest {public static void main(String[] args) {ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:spring/spring-redis-publish.xml");RedisTemplate redisTemplate = ctx.getBean(RedisTemplate.class);String channel = "talk";redisTemplate.convertAndSend(channel, "artisan-talk");} }convertAndSend 方法就是向渠道 talk發送消息的, 當發送后對應的監聽者就能監聽到消息了。運行它,后臺就會打出對應的消息:
INFO : org.springframework.context.support.ClassPathXmlApplicationContext - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@73a8dfcc: startup date [Thu Sep 27 23:55:12 CST 2018]; root of context hierarchy INFO : org.springframework.beans.factory.xml.XmlBeanDefinitionReader - Loading XML bean definitions from class path resource [spring/spring-redis-publish.xml] INFO : org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 2147483647 RedisMessageListener:artisan-talk RedisMessageListener2:artisan-talk RedisMessageListener2:talk RedisMessageListener:talk RedisMessageListener:talk RedisMessageListener2:talk客戶端中肯定也有對應的輸出,如果打開了客戶端的話
pubsub不足之處
PubSub 的生產者傳遞過來一個消息,Redis 會直接找到相應的消費者傳遞過去。如果一個消費者都沒有,那么消息直接丟棄。
如果開始有三個消費者,一個消費者突然掛掉了,生產者會繼續發送消息,另外兩個消費者可以持續收到消息。但是掛掉的消費者重新連上的時候,這斷連期間生產者發送的消息,對于這個消費者來說就是徹底丟失了。
如果 Redis 停機重啟,PubSub 的消息是不會持久化的,畢竟 Redis 宕機就相當于一個消費者都沒有,所有的消息直接被丟棄。
正是因為 PubSub 有這些缺點,它幾乎找不到合適的應用場景。Redis5.0 新增了 Stream 數據結構,這個功能給 Redis 帶來了持久化消息隊列,從此 PubSub 可以消失了。
代碼
代碼托管到了 https://github.com/yangshangwei/redis_learn
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的Redis-13Redis发布订阅的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Redis-12Redis 流水线( p
- 下一篇: Redis-15Redis基础配置文件