日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ发布订阅实战-实现延时重试队列

發布時間:2025/6/17 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ发布订阅实战-实现延时重试队列 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

RabbitMQ是一款使用Erlang開發的開源消息隊列。本文假設讀者對RabbitMQ是什么已經有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網的?RabbitMQ Tutorials?入門教程開始學習。

本文將會講解如何使用RabbitMQ實現延時重試和失敗消息隊列,實現可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當達到一定的重試次數后,將消息投遞到失敗消息隊列,等待人工介入處理。在這里我會帶領大家一步一步的實現一個帶有失敗重試功能的發布訂閱組件,使用該組件后可以非常簡單的實現消息的發布訂閱,在進行業務開發的時候,業務開發人員可以將主要精力放在業務邏輯實現上,而不需要花費時間去理解RabbitMQ的一些復雜概念。

本文將會持續修正和更新,最新內容請參考我的?GITHUB?上的?程序猿成長計劃?項目,歡迎 Star,更多精彩內容請?follow me。

概要

我們將會實現如下功能

  • 結合RabbitMQ的Topic模式和Work Queue模式實現生產方產生消息,消費方按需訂閱,消息投遞到消費方的隊列之后,多個worker同時對消息進行消費
  • 結合RabbitMQ的?Message TTL?和?Dead Letter Exchange?實現消息的延時重試功能
  • 消息達到最大重試次數之后,將其投遞到失敗隊列,等待人工介入處理bug后,重新將其加入隊列消費

具體流程見下圖

  • 生產者發布消息到主Exchange
  • 主Exchange根據Routing Key將消息分發到對應的消息隊列
  • 多個消費者的worker進程同時對隊列中的消息進行消費,因此它們之間采用“競爭”的方式來爭取消息的消費
  • 消息消費后,不管成功失敗,都要返回ACK消費確認消息給隊列,避免消息消費確認機制導致重復投遞,同時,如果消息處理成功,則結束流程,否則進入重試階段
  • 如果重試次數小于設定的最大重試次數(3次),則將消息重新投遞到Retry Exchange的重試隊列
  • 重試隊列不需要消費者直接訂閱,它會等待消息的有效時間過期之后,重新將消息投遞給Dead Letter Exchange,我們在這里將其設置為主Exchange,實現延時后重新投遞消息,這樣消費者就可以重新消費消息
  • 如果三次以上都是消費失敗,則認為消息無法被處理,直接將消息投遞給Failed Exchange的Failed Queue,這時候應用可以觸發報警機制,以通知相關責任人處理
  • 等待人工介入處理(解決bug)之后,重新將消息投遞到主Exchange,這樣就可以重新消費了
  • 技術實現

    Linus Torvalds?曾經說過

    Talk is cheap. Show me the code

    我分別用Java和PHP實現了本文所講述的方案,讀者可以通過參考代碼以及本文中的基本步驟來更好的理解

    • rabbitmq-pubsub-php
    • rabbitmq-pubsub-java

    創建Exchange

    為了實現消息的延時重試和失敗存儲,我們需要創建三個Exchange來處理消息。

    • master?主Exchange,發布消息時發布到該Exchange
    • master.retry?重試Exchange,消息處理失敗時(3次以內),將消息重新投遞給該Exchange
    • master.failed?失敗Exchange,超過三次重試失敗后,消息投遞到該Exchange

    所有的Exchange聲明(declare)必須使用以下參數

    參數值說明
    exchange-Exchange名稱
    typetopicExchange 類型
    passivefalse如果Exchange已經存在,則返回成功,不存在則創建
    durabletrue持久化存儲Exchange,這里僅僅是Exchange本身持久化,消息和隊列需要單獨指定其持久化
    no-waitfalse該方法需要應答確認

    Java代碼

    // 聲明Exchange:主體,失敗,重試 channel.exchangeDeclare("master", "topic", true); channel.exchangeDeclare("master.retry", "topic", true); channel.exchangeDeclare("master.failed", "topic", true);

    PHP代碼

    // 普通交換機 $this->channel->exchange_declare('master', 'topic', false, true, false); // 重試交換機 $this->channel->exchange_declare('master.retry', 'topic', false, true, false); // 失敗交換機 $this->channel->exchange_declare('master.failed', 'topic', false, true, false);

    在RabbitMQ的管理界面中,我們可以看到創建的三個Exchange

    消息發布

    消息發布時,使用basic_publish方法,參數如下

    參數值說明
    message-發布的消息對象
    exchangemaster消息發布到的Exchange
    routing-key-路由KEY,用于標識消息類型
    mandatoryfalse是否強制路由,指定了該選項后,如果沒有訂閱該消息,則會返回路由不可達錯誤
    immediatefalse指定了當消息無法直接路由給消費者時如何處理

    發布消息時,對于message對象,其內容建議使用json編碼后的字符串,同時消息需要標識以下屬性

    'delivery_mode'=> 2 // 1為非持久化,2為持久化

    Java代碼

    channel.basicPublish("master", routingKey, MessageProperties.PERSISTENT_BASIC, // delivery_modemessage.getBytes() );

    PHP代碼

    $msg = new AMQPMessage($message->serialize(), ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, ]);$this->channel->basic_publish($msg, 'master', $routingKey);

    消息訂閱

    消息訂閱的實現相對復雜一些,需要完成隊列的聲明以及隊列和Exchange的綁定。

    Declare Queue

    對于每一個訂閱消息的服務,都必須創建一個該服務對應的隊列,將該隊列綁定到關注的路由規則,這樣之后,消息生產者將消息投遞給Exchange之后,就會按照路由規則將消息分發到對應的隊列供消費者消費了。

    消費服務需要declare三個隊列

    • [queue_name]?隊列名稱,格式符合?[服務名稱]@訂閱服務標識
    • [queue_name]@retry?重試隊列
    • [queue_name]@failed?失敗隊列

    訂閱服務標識是客戶端自己對訂閱的分類標識符,比如用戶中心服務(服務名稱ucenter),包含兩個訂閱:user和enterprise,這里兩個訂閱的隊列名稱就為?ucenter@user和ucenter@enterprise,其對應的重試隊列為?ucenter@user@retry和ucenter@enterprise@retry。

    Declare隊列時,參數規定規則如下

    參數值說明
    queue-隊列名稱
    passivefalse隊列不存在則創建,存在則直接成功
    durabletrue隊列持久化
    exclusivefalse排他,指定該選項為true則隊列只對當前連接有效,連接斷開后自動刪除
    no-waitfalse該方法需要應答確認
    auto-deletefalse當不再使用時,是否自動刪除

    對于@retry重試隊列,需要指定額外參數

    'x-dead-letter-exchange' => 'master' 'x-message-ttl' => 30 * 1000 // 重試時間設置為30s

    這里的兩個header字段的含義是,在隊列中延遲30s后,將該消息重新投遞到x-dead-letter-exchange對應的Exchange中

    Java代碼

    // 聲明監聽隊列 channel.queueDeclare(queueName, // 隊列名稱true, // durable false, // exclusive false, // autoDelete null // arguments ); channel.queueDeclare(queueName + "@failed", true, false, false, null); Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", exchangeName()); arguments.put("x-message-ttl", 30 * 1000); channel.queueDeclare(queueName + "@retry", true, false, false, arguments);

    PHP代碼

    $this->channel->queue_declare($queueName, false, true, false, false, false); $this->channel->queue_declare($failedQueueName, false, true, false, false, false); $this->channel->queue_declare( $retryQueueName, // 隊列名稱 false, // passive true, // durable false, // exclusive false, // auto_delete false, // nowait new AMQPTable([ 'x-dead-letter-exchange' => 'master', 'x-message-ttl' => 30 * 1000, ]) );

    在RabbitMQ的管理界面中,Queues部分可以看到我們創建的三個隊列

    查看隊列的詳細信息,我們可以看到?queueName@retry?隊列與其它兩個隊列的不同

    Bind Exchange & Queue

    創建完隊列之后,需要將隊列與Exchange綁定(bind),不同隊列需要綁定到之前創建的對應的Exchange上面

    QueueExchange
    [queue_name]master
    [queue_name]@retrymaster.retry
    [queue_name]@failedmaster.failed

    綁定時,需要提供訂閱的路由KEY,該路由KEY與消息發布時的路由KEY對應,區別是這里可以使用通配符同時訂閱多種類型的消息。

    參數值說明
    queue-綁定的隊列
    exchange-綁定的Exchange
    routing-key-訂閱的消息路由規則
    no-waitfalse該方法需要應答確認

    Java代碼

    // 綁定監聽隊列到Exchange channel.queueBind(queueName, "master", routingKey); channel.queueBind(queueName + "@failed", "master.failed", routingKey); channel.queueBind(queueName + "@retry", "master.retry", routingKey);

    PHP代碼

    $this->channel->queue_bind($queueName, 'master', $routingKey); $this->channel->queue_bind($retryQueueName, 'master.retry', $routingKey); $this->channel->queue_bind($failedQueueName, 'master.failed', $routingKey);

    在RabbitMQ的管理界面中,我們可以看到該隊列與Exchange和routing-key的綁定關系

    消息消費實現

    使用?basic_consume?對消息進行消費的時候,需要注意下面參數

    參數值說明
    queue-消費的隊列名稱
    consumer-tag-消費者標識,留空即可
    no_localfalse如果設置了該字段,服務器將不會發布消息到 發布它的客戶端
    no_ackfalse需要消費確認應答
    exclusivefalse排他訪問,設置后只允許當前消費者訪問該隊列
    nowaitfalse該方法需要應答確認

    消費端在消費消息時,需要從消息中獲取消息被消費的次數,以此判斷該消息處理失敗時重試還是發送到失敗隊列。

    Java代碼

    protected Long getRetryCount(AMQP.BasicProperties properties) { Long retryCount = 0L; try { Map<String, Object> headers = properties.getHeaders(); if (headers != null) { if (headers.containsKey("x-death")) { List<Map<String, Object>> deaths = (List<Map<String, Object>>) headers.get("x-death"); if (deaths.size() > 0) { Map<String, Object> death = deaths.get(0); retryCount = (Long) death.get("count"); } } } } catch (Exception e) {} return retryCount; }

    PHP代碼

    protected function getRetryCount(AMQPMessage $msg): int { $retry = 0; if ($msg->has('application_headers')) { $headers = $msg->get('application_headers')->getNativeData(); if (isset($headers['x-death'][0]['count'])) { $retry = $headers['x-death'][0]['count']; } } return (int)$retry; }

    消息消費完成后,需要發送消費確認消息給服務端,使用basic_ack方法

    ack(delivery-tag=消息的delivery-tag標識)

    Java代碼

    // 消息消費處理 Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ... // 注意,由于使用了basicConsume的autoAck特性,因此這里就不需要手動執行 // channel.basicAck(envelope.getDeliveryTag(), false); } }; // 執行消息消費處理 channel.basicConsume( queueName, true, // autoAck consumer );

    PHP代碼

    $this->channel->basic_consume($queueName,'', // customer_tagfalse, // no_local false, // no_ack false, // exclusive false, // nowait function (AMQPMessage $msg) use ($queueName, $routingKey, $callback) { ... $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } );

    如果消息處理中出現異常,應該將該消息重新投遞到重試Exchange,等待下次重試

    basic_publish(msg, 'master.retry', routing-key) ack(delivery-tag) // 不要忘記了應答消費成功消息

    如果判斷重試次數大于3次,仍然處理失敗,則應該講消息投遞到失敗Exchange,等待人工處理

    basic_publish(msg, 'master.failed', routing-key) ack(delivery-tag) // 不要忘記了應答消費成功消息

    一定不要忘記ack消息,因為重試、失敗都是通過將消息重新投遞到重試、失敗Exchange來實現的,如果忘記ack,則該消息在超時或者連接斷開后,會重新被重新投遞給消費者,如果消費者依舊無法處理,則會造成死循環。

    Java代碼

    try {String message = new String(body, "UTF-8"); // 消息處理函數 handler.handle(message, envelope.getRoutingKey()); } catch (Exception e) { long retryCount = getRetryCount(properties); if (retryCount > 3) { // 重試次數大于3次,則自動加入到失敗隊列 channel.basicPublish("master.failed", envelope.getRoutingKey(), MessageProperties.PERSISTENT_BASIC, body); } else { // 重試次數小于3,則加入到重試隊列,30s后再重試 channel.basicPublish("master.retry", envelope.getRoutingKey(), properties, body); } }

    失敗任務重試

    如果任務重試三次仍未成功,則會被投遞到失敗隊列,這時候需要人工處理程序異常,處理完畢后,需要將消息重新投遞到隊列進行處理,這里唯一需要做的就是從失敗隊列訂閱消息,然后獲取到消息后,清空其application_headers頭信息,然后重新投遞到master這個Exchange即可。

    Java代碼

    channel.basicPublish('master', envelope.getRoutingKey(),MessageProperties.PERSISTENT_BASIC,body );

    PHP代碼

    $msg->set('application_headers', new AMQPTable([])); $this->channel->basic_publish($msg,'master', $msg->get('routing_key') );

    怎么使用

    隊列和Exchange以及發布訂閱的關系我們就說完了,那么使用起來是什么效果呢?這里我們以Java代碼為例

    // 發布消息 Publisher publisher = new Publisher(factory.newConnection(), 'master'); publisher.publish("{\"id\":121, \"name\":\"guanyiyao\"}", "user.create"); // 訂閱消息 new Subscriber(factory.newConnection(), Main.EXCHANGE_NAME) .init("user-monitor", "user.*") .subscribe((message, routingKey) -> { // TODO 業務邏輯 System.out.printf(" <%s> message consumed: %s\n", routingKey, message); } );

    總結

    使用RabbitMQ時,實現延時重試和失敗隊列的方式并不僅僅局限于本文中描述的方法,如果讀者有更好的實現方案,歡迎拍磚,在這里我也只是拋磚引玉了。本文中講述的方法還有很多優化空間,讀者也可以試著去改進其實現方案,比如本文中使用了三個Exchagne,是否只使用一個Exchange也能實現本文中所講述的功能。

    本文將會持續修正和更新,最新內容請參考我的?GITHUB?上的?程序猿成長計劃?項目,歡迎 Star,更多精彩內容請?follow me。

    轉載于:https://www.cnblogs.com/rainbowaab/p/9044098.html

    總結

    以上是生活随笔為你收集整理的RabbitMQ发布订阅实战-实现延时重试队列的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。