日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ topic路由

發(fā)布時(shí)間:2024/1/17 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ topic路由 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

為什么80%的碼農(nóng)都做不了架構(gòu)師?>>> ??

?

原創(chuàng)文章,轉(zhuǎn)載請注明出處:http://jameswxx.iteye.com/blog/2096446 這里以消費(fèi)者為例說明。一組消費(fèi)者要消費(fèi)某個(gè)topic,得先知道該topic分布在哪些broker上,某個(gè)broker上的topic分布可能會(huì)變化,一旦變化,生產(chǎn)者和消費(fèi)者應(yīng)該都能被通知到。通知模式有推和拉兩種,客戶端都是采取拉的模式,所以broker如有變化,通知都是有延遲的。 一 什么時(shí)候啟動(dòng)topic路由獲取任務(wù) 兩個(gè)地方: 1 首先是DefaultMQPushConsumerImpl啟動(dòng)時(shí),見DefaultMQPushConsumerImpl的start方法里的 this .updateTopicSubscribeInfoWhenSubscriptionChanged(); 2 另外DefaultMQPushConsumerImpl的start方法也啟動(dòng)了MQClientInstance,MQClientInstance的start方法里調(diào)用了 startScheduledTask()方法,該方法啟動(dòng)了獲取路由的定時(shí)任務(wù)。
? ? ? ?? // 定時(shí)從Name Server獲取Topic路由信息 ???????? this . scheduledExecutorService? .scheduleAtFixedRate( new? Runnable() { ???????????? @Override ???????????? public? void? run() { ???????????????? try? { ??????????????????? MQClientInstance. this? .updateTopicRouteInfoFromNameServer(); ??????????????? } ???????????????? catch? (Exception e) { ???????????????????? log .error(? "ScheduledTask updateTopicRouteInfoFromNameServer exception" , e); ??????????????? } ??????????? } ??????? }, 10,? this . clientConfig? .getPollNameServerInteval(), TimeUnit. MILLISECONDS? );
? 二 每隔多久獲取一次 很簡單,看定時(shí)任務(wù)每隔多久執(zhí)行一次就知道了,這里的間隔參數(shù)是 this . clientConfig? .getPollNameServerInteval()。 ClientConfig的pollNameServerInteval?定義如下: private int pollNameServerInteval = 1000 * 30; DefaultMQPushConsumer繼承了ClientConfig, pollNameServerInteval?默認(rèn)是30秒,顯然,這個(gè)時(shí)間是可以自己定義的,通過 DefaultMQPushConsumer的 setPollNameServerInteval()方法。 ? 三 獲取路由過程 MQClientInstance的 updateTopicRouteInfoFromNameServer()方法,該方法最終會(huì)調(diào)用下面這個(gè)方法,需要注意,對于消費(fèi)者而言, isDefault參數(shù)永遠(yuǎn)是false。
?? public? boolean? updateTopicRouteInfoFromNameServer( final? String topic,? boolean? isDefault, DefaultMQProducer defaultMQProducer) { ???????? try? { ???????????? if? ( this . lockNamesrv? .tryLock( LockTimeoutMillis , TimeUnit. MILLISECONDS? )) { ???????????????? try? { ??????????????????? TopicRouteData topicRouteData; ???????????????????? if? (isDefault && defaultMQProducer !=? null ) { ? ? ? ? ? ? ? ? ? ? ? ?//此處省略不必要的信息,對于消費(fèi)者,分支不會(huì)走到這里來,因?yàn)?/span> isDefault為false,且生產(chǎn)者肯定為空 ??????????????????? } ???????????????????? else? { ??????????????????????? topicRouteData = ???????????????????????????????? this . mQClientAPIImpl? .getTopicRouteInfoFromNameServer(topic, 1000 * 3); ??????????????????? } ? ? ? ? ? ? ? ? ? ??//此處省略無關(guān)語句 ??????????????? } ???????????????? catch? (Exception e) { ???????????????????? if? (!topic.startsWith(MixAll. RETRY_GROUP_TOPIC_PREFIX? ) ??????????????????????????? && !topic.equals(MixAll. DEFAULT_TOPIC? )) { ???????????????????????? log .warn( "updateTopicRouteInfoFromNameServer Exception"? , e); ??????????????????? } ??????????????? } ???????????????? finally? { ???????????????????? this . lockNamesrv? .unlock(); ??????????????? } ??????????? } ???????????? else? { ???????????????? log .warn( "updateTopicRouteInfoFromNameServer tryLock timeout {}ms" ,? LockTimeoutMillis ); ??????????? } ??????? } ???????? catch? (InterruptedException e) { ???????????? log .warn(? "updateTopicRouteInfoFromNameServer Exception" , e); ??????? } ? ???????? return? false? ; ? }
其實(shí)最終都是通過 this ?. mQClientAPIImpl? .getTopicRouteInfoFromNameServer(topic, 1000 * 3);得到的。 ? ? ? 四 客戶端與nameserver的連接關(guān)系 broker與所有nameserver都是長連接,如有變化,則向所有nameserver都發(fā)送消息。但是生產(chǎn)者和消費(fèi)者只是跟某一臺nameserver保持聯(lián)系。 設(shè)定一個(gè)場景, 如果某個(gè)broker的topic配置發(fā)生了變化,它向所有nameserver發(fā)布通知,但是此時(shí)如果某一臺nameserver推送失敗(超時(shí)或者掛掉了),則nameserver集群之間的信息是不完整的,因?yàn)閽斓舻哪桥_nameserver沒有得到最新變化。 由此衍生三個(gè)問題: 1 如果該nameserver不是掛掉,只是那一瞬間沒有響應(yīng),那么待可正常服務(wù)時(shí),剛才那個(gè)borker發(fā)生的變化應(yīng)該能生效,不應(yīng)該被丟棄,否則nameserver之間的數(shù)據(jù)是不同步的。 ??解決方案:broker是定時(shí)向所有nameserver發(fā)送自己的注冊信息的,如果當(dāng)時(shí)某臺nameserver掛掉重啟或者超時(shí),沒關(guān)系,下次仍然會(huì)接受到上次沒接收到的broker信息 2 如果真的掛掉了,但是很快又恢復(fù)了,因?yàn)閎orker和nameserver保持的是長連接,顯然掛掉重新啟動(dòng)后,broker與nameserver的長連接無效了,應(yīng)該能自動(dòng)重連 ?? getAndCreateChannel方法分析 3 只要某個(gè)nameserver不可用,消費(fèi)者應(yīng)該能failover,每次應(yīng)該都檢查長連接是否還有效,若無效則 自動(dòng)連接其他nameserver。 ?? getAndCreateNameserverChannel()方法分析 ? 帶著這個(gè)疑問,看看 this ?. mQClientAPIImpl? .getTopicRouteInfoFromNameServer(topic, 1000 * 3)方法。 這個(gè)方法向nameserver發(fā)起調(diào)用,獲取路由結(jié)果
RemotingCommand request =? RemotingCommand.createRequestCommand(RequestCode.? GET_ALL_TOPIC_LIST_FROM_NAMESERVER? ,? null ); RemotingCommand response =? this ?. remotingClient? .invokeSync( ?null , request, timeoutMillis);
重點(diǎn)在于 remotingClient? .invokeSync方法,如下
@Override ???? public ?RemotingCommand invokeSync(String addr,? final ?RemotingCommand request,? long? timeoutMillis) ???????????? throws ?InterruptedException, RemotingConnectException, RemotingSendRequestException, ??????????? RemotingTimeoutException { ? ? ? ??//這里獲取連接,該方法里面會(huì)做連接的檢查和恢復(fù) ???????? final ?Channel channel =? this ?.getAndCreateChannel(addr); ? ? ? ? ??//最后如果還是不是有效連接,則關(guān)閉連接,拋出異常 ???????? if ?(channel !=? null ?&& channel.isActive()) { ???????????? try ?{ ???????????????? if ?( this? . rpcHook? !=? null ) { ???????????????????? this ?. rpcHook? .doBeforeRequest(addr, request); ??????????????? } ??????????????? RemotingCommand response =? this ?.invokeSyncImpl(channel, request, timeoutMillis); ???????????????? if ?( this? . rpcHook? !=? null ) { ???????????????????? this ?. rpcHook? .doAfterResponse(request, response); ??????????????? } ???????????????? return ?response; ??????????? } ???????????? catch ?(RemotingSendRequestException e) { ???????????????? log ?.warn( "invokeSync: send request exception, so close the channel[{}]" , addr); ???????????????? this ?.closeChannel(addr, channel); ???????????????? throw ?e; ??????????? } ???????????? catch ?(RemotingTimeoutException e) { ???????????????? log ?.warn( "invokeSync: wait response timeout exception, the channel[{}]" , addr); ???????????????? // 超時(shí)異常如果關(guān)閉連接可能會(huì)產(chǎn)生連鎖反應(yīng) ???????????????? // this.closeChannel(?addr, channel); ???????????????? throw ?e; ??????????? } ??????? } ???????? else ?{ ???????????? this ?.closeChannel(addr, channel); ???????????? throw ? new? RemotingConnectException(addr); ??????? } ??? }
這個(gè)方法大體分為兩步,第一步獲取連接,第二步通過連接發(fā)送請求,獲取連接當(dāng)然是 getAndCreateChannel方法了, getAndCreateChannel方法非常重要,它包含了客戶端對nameserver的failover,也包含了自動(dòng)重連功能, 對于客戶端,傳入的addr參數(shù)都是null,所以一直會(huì)走到 getAndCreateNameserverChannel()方法。
?? ?private ?Channel? getAndCreateChannel ( ?final ?String addr)? throws ?InterruptedException { ? ? ? ??//無論是producer還是consumer,傳進(jìn)來的 addr參數(shù)都是null ???????? if ?( null? == addr) ???????????? return ?getAndCreateNameserverChannel(); ? ? ? ? ? //因?yàn)榭蛻舳藗魅氲?/span>addr是null,所以客戶端不會(huì)走到這里來,只有broker才會(huì)走到這里來,因?yàn)閎roker傳入的addr不為null ??????? ChannelWrapper cw =? this ?. channelTables? .get(addr); ???????? if ?(cw !=? null? && cw.isOK()) { ???????????? return ?cw.getChannel(); ??????? } ? ? ? ? ? //注意,如果和某個(gè)addr的連接不OK了,則再向該nameserver發(fā)起重連 ???????? return ? this? .createChannel(addr); ??? }
? createChannel方法很簡單,無非就是創(chuàng)建連接嘛,就不細(xì)看了,分析下 getAndCreateNameserverChannel(),以下是該方法大致過程: 因?yàn)榭蛻舳硕际桥c某一臺nameserver長連接,因此長連接一旦選定,后面不會(huì)變化,除非nameserver掛掉,所以已建立的長連接要保存起來。下面這段邏輯就是如此。
? ? ? ?String addr =? this ?. namesrvAddrChoosed? .get(); ???????? if ?(addr !=? null ) { ??????????? ChannelWrapper cw =? this ?. channelTables? .get(addr); ? ? ? ? ? ? ?//注意這里,雖然長連接已經(jīng)建立了,但是每次調(diào)用時(shí),仍然要通過“ cw !=?null?&& cw.isOK()”檢查連接是否OK。 ? ? ? ? ? ??? if ?(cw !=? null? && cw.isOK()) { ???????????????? return ?cw.getChannel(); ??????????? } ??????? }
如果連接沒有建立或連接已經(jīng)斷開,則繼續(xù)往下,真正創(chuàng)建連接時(shí)需要加鎖的 ? if ( this . lockNamesrvChannel .tryLock( LockTimeoutMillis , TimeUnit. MILLISECONDS )) 下面的代碼都是在這個(gè)if塊里面 這里又執(zhí)行了一邊上面的獲取連接并檢測的代碼,可以連接,因?yàn)橛袝r(shí)候連接只是偶爾不OK的
? ? ?addr =? this .? namesrvAddrChoosed ?.get(); ???????????????? if ?(addr !=? null ) { ??????????????????? ChannelWrapper cw =? this ?. channelTables? .get(addr); ???????????????????? if ?(cw !=? null? && cw.isOK()) { ???????????????????????? return ?cw.getChannel(); ??????????????????? } ??????????????? }
接著往下, 這段代碼非常重要 namesrvIndex指示了當(dāng)前跟哪個(gè)nameserver發(fā)生連接,初始值是個(gè)隨機(jī)數(shù),跟nameserver數(shù)量取模,走到這一步,要么是首次發(fā)起調(diào)用,之前連接還未創(chuàng)建現(xiàn)在要?jiǎng)?chuàng)建了,或者是已創(chuàng)建的連接無效了要連接下一個(gè)nameserver,就是“cw.isOK()”為false。 ?
? ? ? ? ?if ?(addrList !=? null ?&& !addrList.isEmpty()) { ???????????????????? for ?( int? i = 0; i < addrList.size(); i++) { ???????????????????????? int ?index =? this ?. namesrvIndex? .incrementAndGet(); ??????????????????????? index = Math.?abs(index); ??????????????????????? index = index % addrList.size(); ??????????????????????? String newAddr = addrList.get(index); ? ???????????????????????? this ?.namesrvAddrChoosed.set(newAddr); ??????????????????????? Channel channelNew =? this ?.createChannel(newAddr); ???????????????????????? if ?(channelNew !=? null ) ???????????????????????????? return ?channelNew; ??????????????????? } ??????????????? }

?

轉(zhuǎn)載于:https://my.oschina.net/boltwu/blog/473025

創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)

總結(jié)

以上是生活随笔為你收集整理的RocketMQ topic路由的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。