日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

使用rabbitMQ实现数据同步

發布時間:2024/4/13 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用rabbitMQ实现数据同步 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

思路分析

發送方:商品微服務

  • 什么時候發?

    當商品服務對商品進行寫操作:增、刪、改的時候,需要發送一條消息,通知其它服務。

  • 發送什么內容?

    對商品的增刪改時其它服務可能需要新的商品數據,但是如果消息內容中包含全部商品信息,數據量太大,而且并不是每個服務都需要全部的信息。因此我們只發送商品id,其它服務可以根據id查詢自己需要的信息。

接收方:搜索微服務、靜態頁微服務

接收消息后如何處理?

  • 搜索微服務:

    • 增/改:添加新的數據到索引庫

    • 刪:刪除索引庫數據

  • 靜態頁微服務:

    • 增/改:創建新的靜態頁

    • 刪:刪除原來的靜態頁

商品服務發送消息

我們先在商品微服務learn-item-service中實現發送消息。

引入依賴

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>

配置文件

我們在application.yml中添加一些有關RabbitMQ的配置:

spring:rabbitmq:host: 192.168.56.101username: learnpassword: learnvirtual-host: /learntemplate:exchange: learn.item.exchangepublisher-confirms: true
  • template:有關AmqpTemplate的配置

    • exchange:缺省的交換機名稱,此處配置后,發送消息如果不指定交換機就會使用這個

  • publisher-confirms:生產者確認機制,確保消息會正確發送,如果發送失敗會有錯誤回執,從而觸發重試

改造GoodsService

在GoodsService中封裝一個發送消息到mq的方法:(需要注入AmqpTemplate模板)

private void sendMessage(Long id, String type){// 發送消息try {this.amqpTemplate.convertAndSend("item." + type, id);} catch (Exception e) {logger.error("{}商品消息發送異常,商品id:{}", type, id, e);} }

這里沒有指定交換機,因此默認發送到了配置中的:leyou.item.exchange

注意:這里要把所有異常都try起來,不能讓消息的發送影響到正常的業務邏輯

?

然后在新增的時候調用:

修改的時候調用:

搜索服務接收消息

搜索服務接收到消息后要做的事情:

  • 增:添加新的數據到索引庫

  • 刪:刪除索引庫數據

  • 改:修改索引庫數據

因為索引庫的新增和修改方法是合二為一的,因此我們可以將這兩類消息一同處理,刪除另外處理。

引入依賴

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>

添加配置

spring:rabbitmq:host: 192.168.56.101username: learnpassword: learnvirtual-host: /learn

這里只是接收消息而不發送,所以不用配置template相關內容。

編寫監聽器

代碼:

@Component public class GoodsListener {@Autowiredprivate SearchService searchService;/*** 處理insert和update的消息** @param id* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "learn.create.index.queue", durable = "true"),exchange = @Exchange(value = "learn.item.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {"item.insert", "item.update"}))public void listenCreate(Long id) throws Exception {if (id == null) {return;}// 創建或更新索引this.searchService.createIndex(id);}/*** 處理delete的消息** @param id*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "learn.delete.index.queue", durable = "true"),exchange = @Exchange(value = "learn.item.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = "item.delete"))public void listenDelete(Long id) {if (id == null) {return;}// 刪除索引this.searchService.deleteIndex(id);} }

編寫創建和刪除索引方法

這里因為要創建和刪除索引,我們需要在SearchService中拓展兩個方法,創建和刪除索引:

public void createIndex(Long id) throws IOException {Spu spu = this.goodsClient.querySpuById(id);// 構建商品Goods goods = this.buildGoods(spu);// 保存數據到索引庫this.goodsRepository.save(goods); }public void deleteIndex(Long id) {this.goodsRepository.deleteById(id); }

創建索引的方法可以從之前導入數據的測試類中拷貝和改造。

?

靜態頁服務接收消息

商品靜態頁服務接收到消息后的處理:

  • 增:創建新的靜態頁

  • 刪:刪除原來的靜態頁

  • 改:創建新的靜態頁并覆蓋原來的

不過,我們編寫的創建靜態頁的方法也具備覆蓋以前頁面的功能,因此:增和改的消息可以放在一個方法中處理,刪除消息放在另一個方法處理。

引入依賴

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>

添加配置

spring:rabbitmq:host: 192.168.56.101username: learnpassword: learnvirtual-host: /learn

這里只是接收消息而不發送,所以不用配置template相關內容。

?

編寫監聽器

代碼:

@Component public class GoodsListener {@Autowiredprivate GoodsHtmlService goodsHtmlService;@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "learn.create.web.queue", durable = "true"),exchange = @Exchange(value = "learn.item.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {"item.insert", "item.update"}))public void listenCreate(Long id) throws Exception {if (id == null) {return;}// 創建頁面goodsHtmlService.createHtml(id);}@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "learn.delete.web.queue", durable = "true"),exchange = @Exchange(value = "learn.item.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = "item.delete"))public void listenDelete(Long id) {if (id == null) {return;}// 刪除頁面goodsHtmlService.deleteHtml(id);} }

添加刪除頁面方法

public void deleteHtml(Long id) {File file = new File("C:\\project\\nginx-1.14.0\\html\\item\\", id + ".html");file.deleteOnExit(); }

測試

3.5.1.查看RabbitMQ控制臺

重新啟動項目,并且登錄RabbitMQ管理界面:http://192.168.56.101:15672

可以看到,交換機已經創建出來了:

隊列也已經創建完畢:

并且隊列都已經綁定到交換機:

總結

以上是生活随笔為你收集整理的使用rabbitMQ实现数据同步的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。