Redis发布订阅模式
使用銀行卡消費的時候,銀行往往會通過微信、短信或郵件通知用戶這筆交易的信息,這便是一種發布訂閱模式,這里的發布是交易信息的發布,訂閱則是各個渠道。這在實際工作中十分常用,Redis 支持這樣的一個模式。
發布訂閱模式首先需要消息源,也就是要有消息發布出來,比如例子中的銀行通知。首先是銀行的記賬系統,收到了交易的命令,成功記賬后,它就會把消息發送出來,這個時候,訂閱者就可以收到這個消息進行處理了,觀察者模式就是這個模式的典型應用了。下面用圖描述這樣的一個過程。
這里建立了一個消息渠道,短信系統、郵件系統和微信系統都在監聽這個渠道,一旦記賬系統把交易消息發送到消息渠道,則監聽這個渠道的各個系統就可以拿到這個消息,這樣就能處理各自的任務了。它也有利于系統的拓展,比如現在新增一個彩信平臺,只要讓彩信平臺去監聽這個消息渠道便能得到對應的消息了。
我們可以知道以下兩點:
要有發送的消息渠道,讓記賬系統能夠發送消息。要有訂閱者(短信、郵件、微信等系統)訂閱這個渠道的消息。同樣的,Redis 也是如此。首先來注冊一個訂閱的客戶端,這個時候使用 SUBSCRIBE 命令。
比如監聽一個叫作 chat 的渠道,這個時候我們需要先打開一個客戶端,這里記為客戶端 1,然后輸入命令:
SUBSCRIBE chat這個時候客戶端 1 就會訂閱了一個叫作 chat 渠道的消息了。之后打開另外一個客戶端,記為客戶端 2,輸入命令:
publish chat "let's go!!"這個時候客戶端 2 就向渠道 chat 發送消息:
"let's go!!"我們觀察客戶端 1,就可以發現已經收到了消息,并有對應的信息打印出來。Redis 的發布訂閱過程如下圖所示。
當發布消息的時候,對應的客戶端已經獲取到了這個信息。
下面在 Spring 的工作環境中展示如何配置發布訂閱模式。首先提供接收消息的類,它將實現 org.springframework.data.redis.connection.MessageListener 接口,并實現接口定義的方法 public void onMessage(Message message,byte[]pattern),Redis 發布訂閱監聽類代碼如下所示。
/*** imports ***/ public class RedisMessageListener implements MessageListener {private RedisTemplate redisTemplate;/*** 此處省略redisTemplate的 setter和getter方法 ***/@Overridepublic void onMessage(Message message, byte[] bytes) {// 獲取消息byte[] body = message.getBody();// 使用值序列化器轉換String msgBody = (String) getRedisTemplate().getValueSerializer().deserialize(body);System.err.println(msgBody);// 獲取 channelbyte[] channel = message.getChannel();// 使用字符串序列化器轉換String channelStr = (String) getRedisTemplate().getStringSerializer().deserialize(channel);System.err.println(channelStr);// 渠道名稱轉換String bytesStr = new String(bytes);System.err.println(bytesStr);} }為了在 Spring 中使用這個類,需要對其進行配置。
<bean id="redisMsgListener" class="com.redis.listener.RedisMessageListener"><property name="redisTemplate" ref="redisTemplate"/> </bean>這樣就在 Spring 上下文中定義了監聽類。
有了監聽類還不能進行測試。為了進行測試,要給一個監聽容器,在 Spring 中已有類 org.springframework.data.redis.listener.RedisMessageListenerContainer。它可以用于監聽 Redis 的發布訂閱消息,下面的配置就是為了實現這個功能,大家可以通過注釋來了解它的配置要點。
<bean id="topicContainer"class="org.springframework.data.redis.listener.RedisMessageListenerContainer" destroy-method="destroy"><!--Redis連接工廠 --><property name="connectionFactory" ref="connectionFactory" /><!--連接池,這里只要線程池生存,才能繼續監聽 --><property name="taskExecutor"><beanclass="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler"><property name="poolSize" value="3" /></bean></property><!--消息監聽Map --><property name="messageListeners"><map><!-- 配置監聽者,key-ref和bean id定義一致 --><entry key-ref="redisMsgListener"><!--監聽類 --><bean class="org.springframework.data.redis.listener.ChannelTopic"><constructor-arg value="chat" /></bean></entry></map></property> </bean>這里配置了線程池,這個線程池將會持續的生存以等待消息傳入,而這里配置了容器用 id 為 redisMsgListener的Bean 進行對渠道 chat 的監聽。當消息通過渠道 chat 發送的時候,就會使用 id 為 redisMsgListener 的 Bean 進行處理消息。
通過以下代碼測試 Redis 發布訂閱。
public static void main(String[] args) {ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");RedisTemplate redisTemplate = applicationContext.getBean(RedisTemplate.class);String channel = "chat";redisTemplate.convertAndSend(channel, "I am lazy!!"); }convertAndSend 方法就是向渠道 chat 發送消息的,當發送后對應的監聽者就能監聽到消息了。運行它,后臺就會打出對應的消息。
總結
以上是生活随笔為你收集整理的Redis发布订阅模式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C语言各种类型数据的输出显示
- 下一篇: MySQL选择数据库