Redis实现发布与订阅(转)
簡介
Redis發布與發布功能(Pub/Sub)是基于事件座位基本的通信機制,是目前應用比較普遍的通信模型,它的目的主要是解除消息的發布者與訂閱者之間的耦合關系。
Redis作為消息發布和訂閱之間的服務器,起到橋梁的作用,在Redis里面有一個channel的概念,也就是頻道,發布者通過指定發布到某個頻道,然后只要有訂閱者訂閱了該頻道,該消息就會發送給訂閱者,原理圖如下所示:
Redis同時也可以使用list類型實現消息隊列(消息隊列的實現以及應用場景會在下一篇文章繼續講解)。
Redis的發布與訂閱的功能應用還是比較廣泛的,它的應用場景有很多。比如:最常見的就是實現實時聊天的功能,還是有就是博客的粉絲文章的推送,當博主推送原創文章的時候,就會將文章實時推送給博主的粉絲。
簡介完Redis的發布于訂閱功能,下面就要來實操一下,包括linux命令的實操和java代碼的實現。
命令實操
這里就假設各位讀者都已經安裝好自己的虛擬機環境和Redis了,若是沒有安裝好的,可以參考這一篇博文:https://www.cnblogs.com/ zuidongfeng/p/8032505.html
我這里是已經安裝好了Redis了,直接啟動我們的Redis,我已經設置好了開機啟動,上面的那篇博文有講解怎么設置開機啟動。
發布消息
Redis中發布消息的命令是publish,具體使用如下所示:
PUBLISH test “haha”:test表示頻道的名稱,haha表示發布的內容,這樣就完成了一個一個消息的發布,后面的返回(integer)0表示0人訂閱。
訂閱頻道
于此同時再啟動一個窗口,這個窗口作為訂閱者,訂閱者的命令subscribe,使用SUBSCRIBE test就表示訂閱了test這個頻道
訂閱后返回的結果中由三條信息,第一個表示類型、第二個表示訂閱的頻道,第三個表示訂閱的數量。接著在第一個窗口進行發布消息:
可以看到發布者發布的消息,訂閱者都會實時的接收到,并發訂閱者收到的信息中也會出現三條信息,分別表示:返回值的類型、頻道名稱、消息內容。
取消訂閱
若是想取消之前的訂閱可以使用unsubscribe命令,格式為:
unsubscribe 頻道名稱 // 取消之前訂閱的test頻道 unsubscribe test輸入命令后,返回以下結果:
[root@pinyoyougou-docker src]# ./redis-cli
127.0.0.1:6379> UNSUBSCRIBE test
它分別表示:返回值的類型、頻道的名稱、該頻道訂閱的數量。
按模式訂閱
除了直接以特定的名城進行訂閱,還可以按照模式進行訂閱,模式的方式進行訂閱可以一次訂閱多個頻道,按照模式進行訂閱的命令為psubscribe,具體格式如下:
psubscribe 模式
// 表示訂閱名稱以ldc開頭的頻道
psubscribe ldc*
輸入上面的命令后,返回如下結果:
127.0.0.1:6379> PSUBSCRIBE ldc*
Reading messages… (press Ctrl-C to quit)
這個也是非常簡單,分別表示:返回的類型(表示按模式訂閱類型)、訂閱的模式、訂閱數。
取消按模式訂閱
假如你想取消之前的按模式訂閱,可以使用punsubscribe來取消,具體格式:
punsubscribe 模式
// 取消頻道名稱按照ldc開頭的頻道
punsubscribe ldc*
他的返回值,如下所示:
127.0.0.1:6379> PUNSUBSCRIBE ldc*
這個就不多說了,表示的意思和上面的一樣,可以看到上面的命令都是有規律的訂閱SUBSCRIBE,取消就是UNSUBSCRIBE,前面加前綴UN,按模式訂閱也是。
查看訂閱消息
(1)你想查看某一個模式下訂閱數是大于零的頻道,可以使用如下格式的命令進行操作:
pubsub channels 模式
// 查看頻道名稱以ldc模式開頭的訂閱數大于零的頻道
pubsub channels ldc*
(2)假如你想查看某一個頻道的訂閱數,可以使用如下命令:
pubsub numsub 頻道名稱
(3)查看按照模式的訂閱數,可以使用如下命令進行操作:
pubsub numpat
到這里以上的命令操作就基本結束了,下面就來代碼實戰。
代碼實練
(1)首先第一步想要操作Redis,再SpringBoot項目中引入jedis的依賴,畢竟jedis是官方推薦使用操作Redis的工具。
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version> </dependency>(2)然后創建發布者Publisher,用于消息的發布,具體代碼如下:
package com.ldc.org.myproject.demo.redis;import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool;/*** 發布者* @author liduchang**/ public class Publisher extends Thread{// 連接池 private final JedisPool jedisPool;// 發布頻道名稱private String name;public Publisher(JedisPool jedisPool, String name) {super();this.jedisPool = jedisPool;this.name = name;}@Overridepublic void run() {// 獲取要發布的消息BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));// 獲取連接Jedis resource = jedisPool.getResource();while (true) {String message = null;try {message = reader.readLine();if (!"exit".equals(message)) {// 發布消息resource.publish(name, "發布者:"+Thread.currentThread().getName()+"發布消息:"+message);} else {break;}} catch (IOException e) {e.printStackTrace();}}} }(3)接著創建訂閱類Subscriber,并且繼承JedisPubSub 類,重寫onMessage、onSubscribe、onUnsubscribe三個方法,這三個方法的調用時機在注釋上都有說明,具體的實現代碼如下:
package com.ldc.org.myproject.demo.redis;import com.fasterxml.jackson.core.sym.Name; import redis.clients.jedis.JedisPubSub;/*** 訂閱者* @author liduchang*/ public class Subscriber extends JedisPubSub {//訂閱頻道名稱private String name;public Subscriber(String name) {this.name = name;}/*** 訂閱者收到消息時會調用*/@Overridepublic void onMessage(String channel, String message) {// TODO Auto-generated method stubsuper.onMessage(channel, message);System.out.println("頻道:"+channel+" 接受的消息為:"+message);}/*** 訂閱了頻道會被調用*/@Overridepublic void onSubscribe(String channel, int subscribedChannels) {System.out.println("訂閱了頻道:"+channel+" 訂閱數為:"+subscribedChannels);}/*** 取消訂閱頻道會被調用*/@Overridepublic void onUnsubscribe(String channel, int subscribedChannels) {System.out.println("取消訂閱的頻道:"+channel+" 訂閱的頻道數量為:"+subscribedChannels);} }(4)這次創建的才是真正的訂閱者SubThread,上面的Subscriber是指為了測試實訂閱的時候或者發布消息,能夠有信息輸出:
package com.ldc.org.myproject.demo.redis;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;/*** 訂閱者線程* @author liduchang**/public class SubThread extends Thread {private final JedisPool jedisPool;private final Subscriber subscriber;private String name;public SubThread(JedisPool jedisPool,Subscriber subscriber,String name) {super();this.jedisPool = jedisPool;this.subscriber = subscriber;this.name = name;}@Overridepublic void run() {Jedis jedis = null;try {jedis = jedisPool.getResource();// 訂閱頻道為namejedis.subscribe(subscriber, name);} catch (Exception e) {System.err.println("訂閱失敗");e.printStackTrace();} finally {if (jedis!=null) {// jedis.close();//歸還連接到redis池中jedisPool.returnResource(jedis);}}}}(5)后面就是測試了,分別測試發布與訂閱的測試,發布者為TestPublisher,訂閱者為
TestSubscriber:package com.ldc.org.myproject.demo.redis;import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import redis.clients.jedis.JedisPool;public class TestPublisher {public static void main(String[] args) throws InterruptedException {JedisPool jedisPool = new JedisPool("192.168.163.155");// 向ldc頻道發布消息Publisher publisher = new Publisher(jedisPool, "ldc");publisher.start();} }訂閱者
package com.ldc.org.myproject.demo.redis;import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;import redis.clients.jedis.JedisPool;public class TestSubscriber1 {public static void main(String[] args) throws InterruptedException {JedisPool jedisPool = new JedisPool("192.168.163.155",6379);Subscriber subscriber = new Subscriber("黎杜");// 訂閱ldc頻道SubThread thread= new SubThread(jedisPool, subscriber, "ldc");thread.start();Thread.sleep(600000);// 取消訂閱subscriber.unsubscribe("ldc");} }這里為了測試方便就直接創建線程的方式,更好的話可以使用線程池的方式通過線程池的submit方法來執行線程,若是不用了可以使用shutdown方式關閉。
總結
以上是生活随笔為你收集整理的Redis实现发布与订阅(转)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 缓存与库先删哪个(转自网络,侵删)
- 下一篇: mysql 下载地址及安装教程