日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Redis实现发布与订阅(转)

發布時間:2025/3/19 数据库 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Redis实现发布与订阅(转) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

簡介
Redis發布與發布功能(Pub/Sub)是基于事件座位基本的通信機制,是目前應用比較普遍的通信模型,它的目的主要是解除消息的發布者與訂閱者之間的耦合關系。

Redis作為消息發布和訂閱之間的服務器,起到橋梁的作用,在Redis里面有一個channel的概念,也就是頻道,發布者通過指定發布到某個頻道,然后只要有訂閱者訂閱了該頻道,該消息就會發送給訂閱者,原理圖如下所示:


Redis同時也可以使用list類型實現消息隊列(消息隊列的實現以及應用場景會在下一篇文章繼續講解)。

Redis的發布與訂閱的功能應用還是比較廣泛的,它的應用場景有很多。比如:最常見的就是實現實時聊天的功能,還是有就是博客的粉絲文章的推送,當博主推送原創文章的時候,就會將文章實時推送給博主的粉絲。

簡介完Redis的發布于訂閱功能,下面就要來實操一下,包括linux命令的實操和java代碼的實現。

命令實操
這里就假設各位讀者都已經安裝好自己的虛擬機環境和Redis了,若是沒有安裝好的,可以參考這一篇博文:https://www.cnblogs.com/ zuidongfeng/p/8032505.html

我這里是已經安裝好了Redis了,直接啟動我們的Redis,我已經設置好了開機啟動,上面的那篇博文有講解怎么設置開機啟動。

發布消息
Redis中發布消息的命令是publish,具體使用如下所示:

PUBLISH test “haha”:test表示頻道的名稱,haha表示發布的內容,這樣就完成了一個一個消息的發布,后面的返回(integer)0表示0人訂閱。

訂閱頻道
于此同時再啟動一個窗口,這個窗口作為訂閱者,訂閱者的命令subscribe,使用SUBSCRIBE test就表示訂閱了test這個頻道

訂閱后返回的結果中由三條信息,第一個表示類型、第二個表示訂閱的頻道,第三個表示訂閱的數量。接著在第一個窗口進行發布消息:



可以看到發布者發布的消息,訂閱者都會實時的接收到,并發訂閱者收到的信息中也會出現三條信息,分別表示:返回值的類型、頻道名稱、消息內容。

取消訂閱

若是想取消之前的訂閱可以使用unsubscribe命令,格式為:

unsubscribe 頻道名稱 // 取消之前訂閱的test頻道 unsubscribe test

輸入命令后,返回以下結果:

[root@pinyoyougou-docker src]# ./redis-cli
127.0.0.1:6379> UNSUBSCRIBE test

  • “unsubscribe”
  • “test”
  • (integer) 0
  • 它分別表示:返回值的類型、頻道的名稱、該頻道訂閱的數量。

    按模式訂閱

    除了直接以特定的名城進行訂閱,還可以按照模式進行訂閱,模式的方式進行訂閱可以一次訂閱多個頻道,按照模式進行訂閱的命令為psubscribe,具體格式如下:

    psubscribe 模式
    // 表示訂閱名稱以ldc開頭的頻道
    psubscribe ldc*
    輸入上面的命令后,返回如下結果:

    127.0.0.1:6379> PSUBSCRIBE ldc*
    Reading messages… (press Ctrl-C to quit)

  • “psubscribe”
  • “ldc*”
  • (integer) 1
    這個也是非常簡單,分別表示:返回的類型(表示按模式訂閱類型)、訂閱的模式、訂閱數。
  • 取消按模式訂閱

    假如你想取消之前的按模式訂閱,可以使用punsubscribe來取消,具體格式:

    punsubscribe 模式
    // 取消頻道名稱按照ldc開頭的頻道
    punsubscribe ldc*
    他的返回值,如下所示:

    127.0.0.1:6379> PUNSUBSCRIBE ldc*

  • “punsubscribe”
  • “ldc*”
  • (integer) 0
    這個就不多說了,表示的意思和上面的一樣,可以看到上面的命令都是有規律的訂閱SUBSCRIBE,取消就是UNSUBSCRIBE,前面加前綴UN,按模式訂閱也是。
  • 查看訂閱消息

    (1)你想查看某一個模式下訂閱數是大于零的頻道,可以使用如下格式的命令進行操作:

    pubsub channels 模式
    // 查看頻道名稱以ldc模式開頭的訂閱數大于零的頻道
    pubsub channels ldc*
    (2)假如你想查看某一個頻道的訂閱數,可以使用如下命令:

    pubsub numsub 頻道名稱
    (3)查看按照模式的訂閱數,可以使用如下命令進行操作:

    pubsub numpat
    到這里以上的命令操作就基本結束了,下面就來代碼實戰。

    代碼實練

    (1)首先第一步想要操作Redis,再SpringBoot項目中引入jedis的依賴,畢竟jedis是官方推薦使用操作Redis的工具。

    <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version> </dependency>

    (2)然后創建發布者Publisher,用于消息的發布,具體代碼如下:

    package com.ldc.org.myproject.demo.redis;import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool;/*** 發布者* @author liduchang**/ public class Publisher extends Thread{// 連接池 private final JedisPool jedisPool;// 發布頻道名稱private String name;public Publisher(JedisPool jedisPool, String name) {super();this.jedisPool = jedisPool;this.name = name;}@Overridepublic void run() {// 獲取要發布的消息BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));// 獲取連接Jedis resource = jedisPool.getResource();while (true) {String message = null;try {message = reader.readLine();if (!"exit".equals(message)) {// 發布消息resource.publish(name, "發布者:"+Thread.currentThread().getName()+"發布消息:"+message);} else {break;}} catch (IOException e) {e.printStackTrace();}}} }

    (3)接著創建訂閱類Subscriber,并且繼承JedisPubSub 類,重寫onMessage、onSubscribe、onUnsubscribe三個方法,這三個方法的調用時機在注釋上都有說明,具體的實現代碼如下:

    package com.ldc.org.myproject.demo.redis;import com.fasterxml.jackson.core.sym.Name; import redis.clients.jedis.JedisPubSub;/*** 訂閱者* @author liduchang*/ public class Subscriber extends JedisPubSub {//訂閱頻道名稱private String name;public Subscriber(String name) {this.name = name;}/*** 訂閱者收到消息時會調用*/@Overridepublic void onMessage(String channel, String message) {// TODO Auto-generated method stubsuper.onMessage(channel, message);System.out.println("頻道:"+channel+" 接受的消息為:"+message);}/*** 訂閱了頻道會被調用*/@Overridepublic void onSubscribe(String channel, int subscribedChannels) {System.out.println("訂閱了頻道:"+channel+" 訂閱數為:"+subscribedChannels);}/*** 取消訂閱頻道會被調用*/@Overridepublic void onUnsubscribe(String channel, int subscribedChannels) {System.out.println("取消訂閱的頻道:"+channel+" 訂閱的頻道數量為:"+subscribedChannels);} }

    (4)這次創建的才是真正的訂閱者SubThread,上面的Subscriber是指為了測試實訂閱的時候或者發布消息,能夠有信息輸出:

    package com.ldc.org.myproject.demo.redis;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;/*** 訂閱者線程* @author liduchang**/public class SubThread extends Thread {private final JedisPool jedisPool;private final Subscriber subscriber;private String name;public SubThread(JedisPool jedisPool,Subscriber subscriber,String name) {super();this.jedisPool = jedisPool;this.subscriber = subscriber;this.name = name;}@Overridepublic void run() {Jedis jedis = null;try {jedis = jedisPool.getResource();// 訂閱頻道為namejedis.subscribe(subscriber, name);} catch (Exception e) {System.err.println("訂閱失敗");e.printStackTrace();} finally {if (jedis!=null) {// jedis.close();//歸還連接到redis池中jedisPool.returnResource(jedis);}}}}

    (5)后面就是測試了,分別測試發布與訂閱的測試,發布者為TestPublisher,訂閱者為

    TestSubscriber:package com.ldc.org.myproject.demo.redis;import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import redis.clients.jedis.JedisPool;public class TestPublisher {public static void main(String[] args) throws InterruptedException {JedisPool jedisPool = new JedisPool("192.168.163.155");// 向ldc頻道發布消息Publisher publisher = new Publisher(jedisPool, "ldc");publisher.start();} }

    訂閱者

    package com.ldc.org.myproject.demo.redis;import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;import redis.clients.jedis.JedisPool;public class TestSubscriber1 {public static void main(String[] args) throws InterruptedException {JedisPool jedisPool = new JedisPool("192.168.163.155",6379);Subscriber subscriber = new Subscriber("黎杜");// 訂閱ldc頻道SubThread thread= new SubThread(jedisPool, subscriber, "ldc");thread.start();Thread.sleep(600000);// 取消訂閱subscriber.unsubscribe("ldc");} }

    這里為了測試方便就直接創建線程的方式,更好的話可以使用線程池的方式通過線程池的submit方法來執行線程,若是不用了可以使用shutdown方式關閉。

    總結

    以上是生活随笔為你收集整理的Redis实现发布与订阅(转)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。