日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

分布式消息中间件-Rocketmq

發布時間:2024/4/13 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 分布式消息中间件-Rocketmq 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

簡述

現在生產中用的最多的消息隊列有Activemq,rabbitmq,kafka,rocketmq等。

JMS規范

rocketmq雖然不完全基于jms規范,但是他參考了jms規范和 CORBA Notification 規范等, 可以說是青出于藍而勝于藍,

什么是jms呢

jms其實就是類似于jdbc的一套接口規范,但不同的是他是面向的消息服務,提供一套標準API接口, 大部分廠商都會參考jms規范,不過我們后面要講到的rocketmq卻沒有嚴格遵守jms規范,后面我們會講到。一些常見的jms廠商有:IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ, 還有APACHE開源的ActiveMQ。這里面Activemq這個也是我接觸到的第一個mq,現在市場份額也是很大的, 京東商城采用的就是這個。

基本概念

發送者( Sender)也就是消息的生產者,俗的將就是創建并發送消息的JMS客戶端。接收者( Receiver)也就是消息消費者,接收訂制消息的并按照相應的業務邏輯進行處理,最終將結果反饋給mq的服務端。點對點( Point-to-Point(P2P) )點對點就是一對一的關系,一個消息發出只有一個接受者所處理。每個消息都被發送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留著消息,直到他們被消費或超時。發布訂閱( Publish/Subscribe(Pub/Sub) )1、客戶端將消息發送到主題。多個發布者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。消息隊列(Queue) 一個容納那些被發送的等待閱讀的消息的區域。與隊列名字所暗示的意思不同, 消息的接受順序并不一定要與消息的發送順序相同。一旦一個消息被閱讀,該消息將被從隊列中移走。主題(Topic)一種支持發送消息給多個訂閱者的機制。發布者(Publisher) 同生產者訂閱者(Subscriber) 針對同一主題的多個消費者

對象模型

(1) ConnectionFactory創建Connection對象的工廠,針對兩種不同的jms消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種(很顯然是基于點對點和和發布訂閱的兩種方式分別創建連接工廠的)。 可以通過JNDI來查找ConnectionFactory對象。(2) DestinationDestination的意思是消息生產者的消息發送目標或者說消息消費者的消息來源。對于消息生產者來說, 它的Destination是某個隊列(Queue)或某個主題(Topic);對于消息消費者來說, 它的Destination也是某個隊列或主題(即消息來源)。所以, Destination實際上就是兩種類型的對象:Queue、Topic可以通過JNDI來查找Destination。(3) ConnectionConnection表示在客戶端和JMS系統之間建立的鏈接(對TCP/IP socket的包裝)。 Connection可以產生一個或多個Session。跟ConnectionFactory一樣, Connection也有兩種類型:QueueConnection和TopicConnection。(4) SessionSession是我們操作消息的接口。可以通過session創建生產者、消費者、消息等。 Session提供了事務的功能。當我們需要使用session發送/接收多個消息時, 可以將這些發送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。(5) 消息的生產者消息生產者由Session創建,并用于將消息發送到Destination。 同樣,消息生產者分兩種類型:QueueSender和TopicPublisher。 可以調用消息生產者的方法(send或publish方法)發送消息。(6) 消息消費者 消息消費者由Session創建,用于接收被發送到Destination的消息。 兩種類型:QueueReceiver和TopicSubscriber。 可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來創建。 當然,也可以session的creatDurableSubscriber方法來創建持久化的訂閱者。(7) MessageListener消息監聽器。如果注冊了消息監聽器,一旦消息到達,將自動調用監聽器的onMessage方法。 我們后面消息消費還會看到。

消息消費

在JMS中,消息的產生和消息是異步的。對于消費來說,JMS的消息者可以通過兩種方式來消費消息。○ 同步 訂閱者或接收者調用receive方法來接收消息,receive方法在能夠接收到消息之前(或超時之前)將一直阻塞○ 異步 訂閱者或接收者可以注冊為一個消息監聽器。當消息到達之后,系統自動調用監聽器的onMessage方法。

activemq的部分代碼來簡單說明一下上面說道的一些JMS規范

public void init(){try {//創建一個鏈接工廠(用戶名,密碼,broker的url地址)connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);//從工廠中創建一個鏈接connection = connectionFactory.createConnection();//開啟鏈接connection.start();//創建一個會話session = connection.createSession(true,Session.SESSION_TRANSACTED);} catch (JMSException e) {e.printStackTrace();}}公共部分:也就是說不管你是消息的生產者還是消息的消費者都需要這些步驟 1.首先我們需要創建一個連接工廠,當然這里我們需要輸入用戶性和密碼還有就是broker的url 2.然后我們根據連接工廠創建了一個連接,此刻這個工廠并沒有和broker建立連接 3.調用start方法就和broker建立了連接,這里我大概解釋一下broker 4.創建一個session,上面我們提到過所有的消息操作都是與session進行的public void sendMsg(String queueName){try {//創建一個消息隊列(此處也就是在創建Destination)Queue queue = session.createQueue(queueName);//消息生產者MessageProducer messageProducer = null;if(threadLocal.get()!=null){messageProducer = threadLocal.get();}else{messageProducer = session.createProducer(queue);threadLocal.set(messageProducer);}while(true){Thread.sleep(1000);int num = count.getAndIncrement();//創建一條消息TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+"productor:生產消息,count:"+num);//發送消息messageProducer.send(msg);//提交事務session.commit();}} catch (JMSException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}

生產:配置完上面的公共部分我們就迫不及待的把消息生產出來吧,我這邊說的是點對點的方式

1.通過session創建一個Destination,我這邊直接就用了queue了 2.接下來我們需要創建一個消息的生產者 3.我這邊就循環每1s發送一條消息 4.這邊看到我們的消息也是用session來創建的,這里面我們用的是文本的消息類型 5.發送消息 6.提交這次發送,至此我們的消息就發送到了broker上了,用過activemq的同學都知道, activemq提供了一個很好用的界面可以查到你的消息的狀態,包括是否消費等消費:消費我們上面也提到了兩種方式,同步和異步,我這邊準備了兩份代碼分別說明了一下public void doMessage(String queueName){try {//創建DestinationQueue queue = session.createQueue(queueName);MessageConsumer consumer = null;while(true){Thread.sleep(1000);TextMessage msg = (TextMessage) consumer.receive();if(msg!=null) {msg.acknowledge();System.out.println(Thread.currentThread().getName()+": Consumer:我是消費者,我正在消費Msg"+msg.getText()+"--->"+count.getAndIncrement());}else {break;}}} catch (JMSException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}

同步:可以看到消息會一直阻塞到有消息才會繼續

?

?

總結

以上是生活随笔為你收集整理的分布式消息中间件-Rocketmq的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。