javascript
SpringAMQP--WorkQueue模型
WorkQueue
?
Work queues,也被稱為(Task queues),任務模型。簡單來說就是讓多個消費者綁定到一個隊列,共同消費隊列中的消息。
當消息處理比較耗時的時候,可能生產消息的速度會遠遠大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。
此時就可以使用work 模型,多個消費者共同處理消息處理,速度就能大大提高了。
消息發送
這次我們循環發送,模擬大量消息堆積現象。
在publisher服務中的SpringAmqpTest類中添加一個測試方法:
/*** workQueue* 向隊列中不停發送消息,模擬消息堆積。*/ @Test public void testWorkQueue() throws InterruptedException {// 隊列名稱String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 發送消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);} }消息接收
要模擬多個消費者綁定同一個隊列,我們在consumer服務的SpringRabbitListener中添加2個新的方法:
@RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消費者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20); }@RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消費者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200); }注意到這個消費者sleep了1000秒,模擬任務耗時。
測試
啟動ConsumerApplication后,在執行publisher服務中剛剛編寫的發送測試方法testWorkQueue。
可以看到消費者1很快完成了自己的25條消息。消費者2卻在緩慢的處理自己的25條消息。
也就是說消息是平均分配給每個消費者,并沒有考慮到消費者的處理能力。這樣顯然是有問題的。
能者多勞
在spring中有一個簡單的配置,可以解決這個問題。我們修改consumer服務的application.yml文件,添加配置:
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息總結
Work模型的使用:
-
多個消費者綁定到一個隊列,同一條消息只會被一個消費者處理
-
通過設置prefetch來控制消費者預取的消息數量
總結
以上是生活随笔為你收集整理的SpringAMQP--WorkQueue模型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SpringAMQP--入门案例的消息接
- 下一篇: SpringAMQP--发布订阅模型介绍