日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > java >内容正文

java

RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息

發(fā)布時(shí)間:2025/3/21 java 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

  • 概述
  • 集群信息
  • 項(xiàng)目結(jié)構(gòu)
  • 生產(chǎn)者
  • 自定義類
  • 消費(fèi)者
  • 測(cè)試結(jié)果


概述

RocketMQ-初體驗(yàn)RocketMQ(10)-過(guò)濾消息_SQL92表達(dá)式篩選消息 通過(guò)SQL92的方式,消費(fèi)者可以過(guò)濾到自己想要的消息,其實(shí)RocketMQ還提供了一個(gè)更為搶到的功能,支持自定義Java類…

直接來(lái)看下如何使用的吧


集群信息

RocketMQ : V4.3.2

集群模式: 互為主備

節(jié)點(diǎn)信息: 192.168.18.130 192.168.18.131 雙機(jī)互為主備

broker-m.conf 和 broker-s.conf 均已配置了 filterServerNums=1 ,已重啟。


項(xiàng)目結(jié)構(gòu)

生產(chǎn)者

package com.artisan.rocketmq.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper;/*** @author 小工匠* @version v1.0* @create 2019-11-11 23:30* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/ public class FilterProducer {/**** TAG-FILTER-1000 ---> 布隆過(guò)濾器* 過(guò)濾掉的那些消息。直接就跳過(guò)了么。下次就不會(huì)繼續(xù)過(guò)濾這些了。是么。* @param args* @throws Exception*/public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("filter_sample_group");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();for (int i = 0; i < 3; i++) {Message msg = new Message("TopicFilter","TAG-FILTER",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// Set some properties. 生產(chǎn)者設(shè)置屬性,消費(fèi)者端通過(guò)Tag+該屬性定制消息msg.putUserProperty("a", String.valueOf(i));if (i % 2 == 0) {msg.putUserProperty("b", "artisan");} else {msg.putUserProperty("b", "smart artisan");}producer.send(msg);}producer.shutdown();}}

自定義類

/*** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.artisan.rocketmq.filter;import org.apache.rocketmq.common.filter.FilterContext; import org.apache.rocketmq.common.filter.MessageFilter; import org.apache.rocketmq.common.message.MessageExt;public class MessageFilterImpl implements MessageFilter {@Overridepublic boolean match(MessageExt msg, FilterContext context) {String property = msg.getProperty("SequenceId");if (property != null) {int id = Integer.parseInt(property);if (((id % 10) == 0) && (id > 100)) {return true;}}return false;} }

消費(fèi)者

package com.artisan.rocketmq.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException; import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-11 23:45* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/ public class FilterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_sample_group");/*** 注冊(cè)中心*/consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");/*** 訂閱主題* 一種資源去換取另外一種資源*/consumer.subscribe("TopicFilter", MessageSelector.bySql("a between 0 and 3 and b = 'artisan'"));/*** 注冊(cè)監(jiān)聽(tīng)器,監(jiān)聽(tīng)主題消息*/consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs){try {System.out.println("consumeThread=" + Thread.currentThread().getName()+ ", queueId=" + msg.getQueueId() + ", content:"+ new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Filter Consumer Started.%n");} }

測(cè)試結(jié)果

目前 沒(méi)測(cè)試成功,先記錄下。

總結(jié)

以上是生活随笔為你收集整理的RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。