日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

使用jedis实现Redis消息队列(MQ)的发布(publish)和消息监听(subscribe)

發布時間:2024/4/15 数据库 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用jedis实现Redis消息队列(MQ)的发布(publish)和消息监听(subscribe) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言:

本文基于jedis 2.9.0.jar、commons-pool2-2.4.2.jar以及json-20160810.jar

其中jedis連接池需要依賴commons-pool2包,json包用于對象實例和json字符串的相互轉換

1、jedis的消息隊列方法簡述

1.1、發布消息方法

(其中,channel是對應消息通道,message是對應消息體)

jedis.publish(channel, message);

1.2、監聽消息方法

(其中,jedisPubSub用于處理監聽到的消息,channels是對應的通道

jedis.subscribe(jedisPubSub, channels);

2、發布消息

/*** 從jedis連接池獲取jedis操作實例* @return*/public static Jedis getJedis() {return RedisPoolManager.getJedis();}/*** 推入消息到redis消息通道* * @param String* channel* @param String* message*/public static void publish(String channel, String message) {Jedis jedis = null;try {jedis = getJedis();jedis.publish(channel, message);} finally {jedis.close();}}/*** 推入消息到redis消息通道* * @param byte[]* channel* @param byte[]* message*/public void publish(byte[] channel, byte[] message) {Jedis jedis = null;try {jedis = getJedis();jedis.publish(channel, message);} finally {jedis.close();}}

3、監聽消息

3.1、監聽消息主體方法

/*** 監聽消息通道* @param jedisPubSub - 監聽任務* @param channels - 要監聽的消息通道*/public static void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {Jedis jedis = null;try {jedis = getJedis();jedis.subscribe(jedisPubSub, channels);} finally {jedis.close();}}/*** 監聽消息通道* @param jedisPubSub - 監聽任務* @param channels - 要監聽的消息通道*/public static void subscribe(JedisPubSub jedisPubSub, String... channels) {Jedis jedis = null;try {jedis = getJedis();jedis.subscribe(jedisPubSub, channels);} finally {jedis.close();}}


3.2、處理監聽到的消息任務

class Tasker implements Runnable {private String[] channel = null;//監聽的消息通道private JedisPubSub jedisPubSub = null;//消息處理任務public Tasker(JedisPubSub jedisPubSub, String ...channel) {this.jedisPubSub = jedisPubSub;this.channel = channel;}@Overridepublic void run() {// 監聽channel通道的消息RedisMQ.subscribe(jedisPubSub, channel);}}

3.3、處理監聽到的消息主體類實現

package cn.eguid.livePushServer.redisManager;import java.util.Map;import org.json.JSONObject;import cc.eguid.livepush.PushManager; import redis.clients.jedis.JedisPubSub;public class RedisMQHandler extends JedisPubSub{PushManager pushManager = null;public RedisMQHandler(PushManager pushManager) {super();this.pushManager = pushManager;}@Override// 接收到消息后進行分發執行public void onMessage(String channel, String message) {JSONObject jsonObj = new JSONObject(message);System.out.println(channel+","+message);if ("push".equals(channel)) {Map<String,Object> map=jsonObj.toMap();System.out.println("接收到一條推流消息,準備推流:"+map); // String appName=pushManager.push(map);//推流完成后還需要發布一個成功消息到返回隊列} else if ("close".equals(channel)) {String appName=jsonObj.getString("appName");System.out.println("接收到一條關閉消息,準備關閉應用:"+appName); // pushManager.closePush(appName);}} }

4、測試消息隊列發布和監聽

public static void main(String[] args) throws InterruptedException {PushManager pushManager= new PushManagerImpl();Thread t1 = new Thread(new Tasker(new RedisMQHandler (pushManager), "push"));Thread t2 = new Thread(new Tasker(new RedisMQHandler (pushManager), "close"));t1.start();t2.start();LivePushEntity livePushInfo=new LivePushEntity();livePushInfo.setAppName("test1");JSONObject json=new JSONObject(livePushInfo);publish("push",json.toString());publish("close", json.toString());Thread.sleep(2000);publish("push", json.toString());publish("close",json.toString());Thread.sleep(2000);publish("push", json.toString());publish("close",json.toString());}


轉載于:https://www.cnblogs.com/eguid/p/6821593.html

總結

以上是生活随笔為你收集整理的使用jedis实现Redis消息队列(MQ)的发布(publish)和消息监听(subscribe)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。