日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ源码解析:Filtersrv

發布時間:2024/4/13 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ源码解析:Filtersrv 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

???關注微信公眾號:【芋艿的后端小屋】有福利:

  • RocketMQ / MyCAT / Sharding-JDBC 所有源碼分析文章列表
  • RocketMQ / MyCAT / Sharding-JDBC 中文注釋源碼 GitHub 地址
  • 您對于源碼的疑問每條留言將得到認真回復。甚至不知道如何讀源碼也可以請教噢。
  • 新的源碼解析文章實時收到通知。每周更新一篇左右。

    • 1. 概述
    • 2. Filtersrv 注冊到 Broker
    • 3. 過濾類
      • 3.1 Consumer 訂閱時設置 過濾類代碼
      • 3.2 Consumer 上傳 過濾類代碼
      • 3.3 Filter 編譯 過濾類代碼
    • 4. 過濾消息
      • 4.1 Consumer 從 Filtersrv 拉取消息
      • 4.2 Filtersrv 從 Broker 拉取消息
    • 5. Filtersrv 高可用

    1. 概述

    Filtersrv ,負責自定義規則過濾 Consumer 從 Broker 拉取的消息。

    Filtersrv.png

    為什么 Broker 不提供過濾消息的功能呢?我們來看看官方的說法:

    • Broker 端消息過濾
      在 Broker 中,按照 Consumer 的要求做過濾,優點是減少了對于 Consumer 無用消息的網絡傳輸。 缺點是增加了 Broker 的負擔,實現相對復雜。
      (1). 淘寶 Notify 支持多種過濾方式,包含直接按照消息類型過濾,靈活的語法表達式過濾,幾乎可以滿足最苛刻的過濾需求。
      (2). 淘寶 RocketMQ 支持按照簡單的 Message Tag 過濾,也支持按照 Message Header、body 進行過濾。
      (3). CORBA Notification 規范中也支持靈活的語法表達式過濾。
    • Consumer 端消息過濾
      這種過濾方式可由應用完全自定義實現,但是缺點是很多無用的消息要傳輸到 Consumer 端。

    就是在這種考慮下,Filtersrv 出現了。減少了 Broker 的負擔,又減少了 Consumer 接收無用的消息。當然缺點也是有的,多了一層 Filtersrv 網絡開銷。

    2. Filtersrv 注冊到 Broker

    • ? 一個 Filtersrv 對應一個 Broker。
    • ? 一個 Broker 可以對應多個 Filtersrv。Filtersrv 的高可用通過啟動多個 Filtersrv 實現。
    • ? Filtersrv 注冊失敗時,主動退出關閉。

    核心代碼如下:

    1: // ??????【FiltersrvController.java】2: public boolean initialize() {3: // ....(省略代碼)4: 5: // 固定間隔注冊到Broker6: this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {7: 8: @Override9: public void run() {10: FiltersrvController.this.registerFilterServerToBroker();11: }12: }, 15, 10, TimeUnit.SECONDS); // TODO edit by 芋艿:initialDelay時間太短,可能導致初始化失敗。從3=》1513: 14: // ....(省略代碼)15: }16: 17: /**18: * 注冊Filtersrv 到 Broker19: * !!!如果注冊失敗,關閉Filtersrv20: */21: public void registerFilterServerToBroker() {22: try {23: RegisterFilterServerResponseHeader responseHeader =24: this.filterServerOuterAPI.registerFilterServerToBroker(25: this.filtersrvConfig.getConnectWhichBroker(), this.localAddr());26: this.defaultMQPullConsumer.getDefaultMQPullConsumerImpl().getPullAPIWrapper()27: .setDefaultBrokerId(responseHeader.getBrokerId());28: 29: if (null == this.brokerName) {30: this.brokerName = responseHeader.getBrokerName();31: }32: 33: log.info("register filter server<{}> to broker<{}> OK, Return: {} {}",34: this.localAddr(),35: this.filtersrvConfig.getConnectWhichBroker(),36: responseHeader.getBrokerName(),37: responseHeader.getBrokerId());38: } catch (Exception e) {39: log.warn("register filter server Exception", e);40: 41: log.warn("access broker failed, kill oneself");42: System.exit(-1); // 異常退出43: }44: }復制代碼

    3. 過濾類

    Filtersrv過濾類

    3.1 Consumer 訂閱時設置 過濾類代碼

    • ? Consumer 針對每個 Topic 可以訂閱不同的 過濾類代碼。
    1: // ??????【DefaultMQPushConsumer.java】2: @Override3: public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {4: this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);5: }復制代碼

    3.2 Consumer 上傳 過濾類代碼

    • ? Consumer 心跳注冊到 Broker 的同時,上傳 過濾類代碼 到 Broker 對應的所有 Filtersrv。
    1: // ??????【MQClientInstance.java】2: /**3: * 發送心跳到Broker,上傳過濾類源碼到Filtersrv4: */5: public void sendHeartbeatToAllBrokerWithLock() {6: if (this.lockHeartbeat.tryLock()) {7: try {8: this.sendHeartbeatToAllBroker();9: this.uploadFilterClassSource();10: } catch (final Exception e) {11: log.error("sendHeartbeatToAllBroker exception", e);12: } finally {13: this.lockHeartbeat.unlock();14: }15: } else {16: log.warn("lock heartBeat, but failed.");17: }18: }19: 20: /**21: * 上傳過濾類到Filtersrv22: */23: private void uploadFilterClassSource() {24: Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();25: while (it.hasNext()) {26: Entry<String, MQConsumerInner> next = it.next();27: MQConsumerInner consumer = next.getValue();28: if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {29: Set<SubscriptionData> subscriptions = consumer.subscriptions();30: for (SubscriptionData sub : subscriptions) {31: if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {32: final String consumerGroup = consumer.groupName();33: final String className = sub.getSubString();34: final String topic = sub.getTopic();35: final String filterClassSource = sub.getFilterClassSource();36: try {37: this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);38: } catch (Exception e) {39: log.error("uploadFilterClassToAllFilterServer Exception", e);40: }41: }42: }43: }44: }45: }復制代碼

    3.3 Filter 編譯 過濾類代碼

    • ? Filtersrv 處理 Consumer 上傳的 過濾類代碼,并進行編譯使用。

    核心代碼如下:

    1: // ??????【FilterClassManager.java】2: /**3: * 注冊過濾類4: *5: * @param consumerGroup 消費分組6: * @param topic Topic7: * @param className 過濾類名8: * @param classCRC 過濾類源碼CRC9: * @param filterSourceBinary 過濾類源碼10: * @return 是否注冊成功11: */12: public boolean registerFilterClass(final String consumerGroup, final String topic,13: final String className, final int classCRC, final byte[] filterSourceBinary) {14: final String key = buildKey(consumerGroup, topic);15: // 判斷是否要注冊新的過濾類16: boolean registerNew = false;17: FilterClassInfo filterClassInfoPrev = this.filterClassTable.get(key);18: if (null == filterClassInfoPrev) {19: registerNew = true;20: } else {21: if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {22: if (filterClassInfoPrev.getClassCRC() != classCRC && classCRC != 0) { // 類有變化23: registerNew = true;24: }25: }26: }27: // 注冊新的過濾類28: if (registerNew) {29: synchronized (this.compileLock) {30: filterClassInfoPrev = this.filterClassTable.get(key);31: if (null != filterClassInfoPrev && filterClassInfoPrev.getClassCRC() == classCRC) {32: return true;33: }34: try {35: FilterClassInfo filterClassInfoNew = new FilterClassInfo();36: filterClassInfoNew.setClassName(className);37: filterClassInfoNew.setClassCRC(0);38: filterClassInfoNew.setMessageFilter(null);39: 40: if (this.filtersrvController.getFiltersrvConfig().isClientUploadFilterClassEnable()) {41: String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET);42: // 編譯新的過濾類43: Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource);44: // 創建新的過濾類對象45: Object newInstance = newClass.newInstance();46: filterClassInfoNew.setMessageFilter((MessageFilter) newInstance);47: filterClassInfoNew.setClassCRC(classCRC);48: }49: 50: this.filterClassTable.put(key, filterClassInfoNew);51: } catch (Throwable e) {52: String info = String.format("FilterServer, registerFilterClass Exception, consumerGroup: %s topic: %s className: %s",53: consumerGroup, topic, className);54: log.error(info, e);55: return false;56: }57: }58: }59: 60: return true;61: }復制代碼

    4. 過濾消息

    Filtersrv.png

    4.1 Consumer 從 Filtersrv 拉取消息

    • ? Consumer 拉取 使用過濾類方式訂閱 的消費消息時,從 Broker 對應的 Filtersrv 列表隨機選擇一個拉取消息。如果選擇不到 Filtersrv,則無法拉取消息。因此,Filtersrv 一定要做高可用。
    1: // ??????【PullAPIWrapper.java】2: /**3: * 拉取消息核心方法4: *5: * @param mq 消息嘟列6: * @param subExpression 訂閱表達式7: * @param subVersion 訂閱版本號8: * @param offset 拉取隊列開始位置9: * @param maxNums 批量拉 取消息數量10: * @param sysFlag 拉取系統標識11: * @param commitOffset 提交消費進度12: * @param brokerSuspendMaxTimeMillis broker掛起請求最大時間13: * @param timeoutMillis 請求broker超時時間14: * @param communicationMode 通訊模式15: * @param pullCallback 拉取回調16: * @return 拉取消息結果。只有通訊模式為同步時,才返回結果,否則返回null。17: * @throws MQClientException 當尋找不到 broker 時,或發生其他client異常18: * @throws RemotingException 當遠程調用發生異常時19: * @throws MQBrokerException 當 broker 發生異常時。只有通訊模式為同步時才會發生該異常。20: * @throws InterruptedException 當發生中斷異常時21: */22: protected PullResult pullKernelImpl(23: final MessageQueue mq,24: final String subExpression,25: final long subVersion,26: final long offset,27: final int maxNums,28: final int sysFlag,29: final long commitOffset,30: final long brokerSuspendMaxTimeMillis,31: final long timeoutMillis,32: final CommunicationMode communicationMode,33: final PullCallback pullCallback34: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {35: // // ....(省略代碼)36: // 請求拉取消息37: if (findBrokerResult != null) {38: // ....(省略代碼)39: // 若訂閱topic使用過濾類,使用filtersrv獲取消息40: String brokerAddr = findBrokerResult.getBrokerAddr();41: if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {42: brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);43: }44: 45: PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(46: brokerAddr,47: requestHeader,48: timeoutMillis,49: communicationMode,50: pullCallback);51: 52: return pullResult;53: }54: 55: // Broker信息不存在,則拋出異常56: throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);57: }58: 59: /**60: * 計算filtersrv地址。如果有多個filtersrv,隨機選擇一個。61: *62: * @param topic Topic63: * @param brokerAddr broker地址64: * @return filtersrv地址65: * @throws MQClientException 當filtersrv不存在時66: */67: private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)68: throws MQClientException {69: ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();70: if (topicRouteTable != null) {71: TopicRouteData topicRouteData = topicRouteTable.get(topic);72: List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);73: if (list != null && !list.isEmpty()) {74: return list.get(randomNum() % list.size());75: }76: }77: throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "78: + topic, null);79: }復制代碼

    4.2 Filtersrv 從 Broker 拉取消息

    • ? Filtersrv 拉取消息后,會建議 Consumer 向 Broker主節點 拉取消息。
    • ? Filtersrv 可以理解成一個 Consumer,向 Broker 拉取消息時,實際使用的 DefaultMQPullConsumer.java 的方法和邏輯。
    1: // ??????【DefaultRequestProcessor.java】2: /**3: * 拉取消息4: *5: * @param ctx 拉取消息context6: * @param request 拉取消息請求7: * @return 響應8: * @throws Exception 當發生異常時9: */10: private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {11: final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);12: final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();13: final PullMessageRequestHeader requestHeader =14: (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);15: 16: final FilterContext filterContext = new FilterContext();17: filterContext.setConsumerGroup(requestHeader.getConsumerGroup());18: 19: response.setOpaque(request.getOpaque());20: 21: DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();22: 23: // 校驗Topic過濾類是否完整24: final FilterClassInfo findFilterClass = this.filtersrvController.getFilterClassManager().findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());25: if (null == findFilterClass) {26: response.setCode(ResponseCode.SYSTEM_ERROR);27: response.setRemark("Find Filter class failed, not registered");28: return response;29: }30: if (null == findFilterClass.getMessageFilter()) {31: response.setCode(ResponseCode.SYSTEM_ERROR);32: response.setRemark("Find Filter class failed, registered but no class");33: return response;34: }35: 36: // 設置下次請求從 Broker主節點。37: responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);38: 39: MessageQueue mq = new MessageQueue();40: mq.setTopic(requestHeader.getTopic());41: mq.setQueueId(requestHeader.getQueueId());42: mq.setBrokerName(this.filtersrvController.getBrokerName());43: long offset = requestHeader.getQueueOffset();44: int maxNums = requestHeader.getMaxMsgNums();45: 46: final PullCallback pullCallback = new PullCallback() {47: 48: @Override49: public void onSuccess(PullResult pullResult) {50: responseHeader.setMaxOffset(pullResult.getMaxOffset());51: responseHeader.setMinOffset(pullResult.getMinOffset());52: responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());53: response.setRemark(null);54: 55: switch (pullResult.getPullStatus()) {56: case FOUND:57: response.setCode(ResponseCode.SUCCESS);58: 59: List<MessageExt> msgListOK = new ArrayList<MessageExt>();60: try {61: for (MessageExt msg : pullResult.getMsgFoundList()) {62: // 使用過濾類過濾消息63: boolean match = findFilterClass.getMessageFilter().match(msg, filterContext);64: if (match) {65: msgListOK.add(msg);66: }67: }68: 69: if (!msgListOK.isEmpty()) {70: returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK);71: return;72: } else {73: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);74: }75: } catch (Throwable e) {76: final String error =77: String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",78: requestHeader.getConsumerGroup(), requestHeader.getTopic());79: log.error(error, e);80: 81: response.setCode(ResponseCode.SYSTEM_ERROR);82: response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));83: returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);84: return;85: }86: 87: break;88: case NO_MATCHED_MSG:89: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);90: break;91: case NO_NEW_MSG:92: response.setCode(ResponseCode.PULL_NOT_FOUND);93: break;94: case OFFSET_ILLEGAL:95: response.setCode(ResponseCode.PULL_OFFSET_MOVED);96: break;97: default:98: break;99: } 100: 101: returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null); 102: } 103: 104: @Override 105: public void onException(Throwable e) { 106: response.setCode(ResponseCode.SYSTEM_ERROR); 107: response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e)); 108: returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null); 109: return; 110: } 111: }; 112: 113: // 拉取消息 114: pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback); 115: return null; 116: }復制代碼

    5. Filtersrv 高可用

    Filtersrv過可用

    總結

    以上是生活随笔為你收集整理的RocketMQ源码解析:Filtersrv的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。