日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

redis之mq实现发布订阅模式

發布時間:2025/3/21 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 redis之mq实现发布订阅模式 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.


https://github.com/smltq/spring-boot-demo/blob/master/mq-redis

概述
Redis不僅可作為緩存服務器,還可用作消息隊列,本示例演示如何使用redis實現發布/訂閱消息隊列。

在Redis中,發布者沒有將消息發送給特定訂閱者的程序。相反,發布的消息被描述為通道,而不知道(如果有的話)可能有哪些訂閱者。
訂閱者表示對一個或多個主題感興趣,只接收感興趣的消息,而不知道(如果有的話)發布者是什么。
發布者和訂閱者的這種解耦可以實現更大的可伸縮性和更動態的網絡拓撲。
代碼實現
redis實現mq的存儲方式很多,可以使用list,zset及stream,這些數據的存儲結構決定了怎么消費問題(消息是一次使用、允許多次使用、允許多端消息等),比如使用list,我們可以使用leftPush插入消息,使用rightPop消費消息,實現一條消息一次消息,可以參考與以示例代碼:

@Test
public void testMq() {
for (int i = 0; i < 10; i++) {
redisTemplate.opsForList().leftPush(“task-queue”, “data” + i);
log.info(“插入了一個新的任務==>{}”, “data” + i);
}
String taskId = redisTemplate.opsForList().rightPop(“task-queue”).toString();
log.info(“處理成功,清除任務==>{}”, taskId);
}

1.配置代碼RedisConfig.java

package demo.data.mqRedis.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {

@Autowired private RedisTemplate redisTemplate;/*** redisTemplate 序列化使用的jdkSerializeable, 存儲二進制字節碼, 所以自定義序列化類,方便調試redis** @param redisConnectionFactory* @return*/ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();//使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());//使用StringRedisSerializer來序列化和反序列化redis的keredisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setHashKeySerializer(new StringRedisSerializer());//開啟事務redisTemplate.setEnableTransactionSupport(true);redisTemplate.setConnectionFactory(redisConnectionFactory);return redisTemplate; }@Bean MessageListenerAdapter messageListener() {return new MessageListenerAdapter(new RedisMessageSubscriber()); }@Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(listenerAdapter, topic());return container; }@Bean MessagePublisher redisPublisher() {return new RedisMessagePublisher(redisTemplate, topic()); }@Bean ChannelTopic topic() {return new ChannelTopic("messageQueue"); }

}

2.定義消息發布接口MessagePublisher.java

package demo.data.mqRedis.config;

public interface MessagePublisher {
void publish(String message);
}

3.發布方實現RedisMessagePublisher.java

package demo.data.mqRedis.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;

/**

  • 消息發布方
    */
    public class RedisMessagePublisher implements MessagePublisher {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private ChannelTopic topic;

    public RedisMessagePublisher(
    RedisTemplate<String, Object> redisTemplate, ChannelTopic topic) {
    this.redisTemplate = redisTemplate;
    this.topic = topic;
    }

    public void publish(String message) {
    redisTemplate.convertAndSend(topic.getTopic(), message);
    }
    }
    4.消息接收方RedisMessageSubscriber.java
    package demo.data.mqRedis.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

/**

  • 消息訂閱方
    */
    @Service
    @Slf4j
    public class RedisMessageSubscriber implements MessageListener {

    public static List messageList = new ArrayList<>();

    public void onMessage(Message message, byte[] pattern) {
    messageList.add(message.toString());
    log.info(“訂閱方接收到了消息==>{}”, message.toString());
    }
    }

5.最后貼上application.yml配置

spring:
redis:
host: 127.0.0.1
port: 6379
password:

查看運行結果
1.編寫測試用例試發布消息TestRedisMQ.java

package demo.data.mqRedis;

import demo.data.mqRedis.config.RedisMessagePublisher;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.UUID;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class TestRedisMQ {

@Autowired RedisMessagePublisher redisMessagePublisher;@Test public void testMq() {String message = "Message " + UUID.randomUUID();redisMessagePublisher.publish(message); }

}

2.運行結果
2019-09-05 15:51:33.931 INFO 10772 — [ container-2] d.d.m.config.RedisMessageSubscriber : 訂閱方接收到了消息==>“Message c95959bf-6c30-4801-bc80-0e1e3c9f81bc”
訂閱方成功接收到消息了

總結

以上是生活随笔為你收集整理的redis之mq实现发布订阅模式的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。