RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息
生活随笔
收集整理的這篇文章主要介紹了
RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
文章目錄
- 概述
- 集群信息
- 項(xiàng)目結(jié)構(gòu)
- 生產(chǎn)者
- 自定義類
- 消費(fèi)者
- 測(cè)試結(jié)果
概述
RocketMQ-初體驗(yàn)RocketMQ(10)-過(guò)濾消息_SQL92表達(dá)式篩選消息 通過(guò)SQL92的方式,消費(fèi)者可以過(guò)濾到自己想要的消息,其實(shí)RocketMQ還提供了一個(gè)更為搶到的功能,支持自定義Java類…
直接來(lái)看下如何使用的吧
集群信息
RocketMQ : V4.3.2
集群模式: 互為主備
節(jié)點(diǎn)信息: 192.168.18.130 192.168.18.131 雙機(jī)互為主備
broker-m.conf 和 broker-s.conf 均已配置了 filterServerNums=1 ,已重啟。
項(xiàng)目結(jié)構(gòu)
生產(chǎn)者
package com.artisan.rocketmq.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper;/*** @author 小工匠* @version v1.0* @create 2019-11-11 23:30* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/ public class FilterProducer {/**** TAG-FILTER-1000 ---> 布隆過(guò)濾器* 過(guò)濾掉的那些消息。直接就跳過(guò)了么。下次就不會(huì)繼續(xù)過(guò)濾這些了。是么。* @param args* @throws Exception*/public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("filter_sample_group");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();for (int i = 0; i < 3; i++) {Message msg = new Message("TopicFilter","TAG-FILTER",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// Set some properties. 生產(chǎn)者設(shè)置屬性,消費(fèi)者端通過(guò)Tag+該屬性定制消息msg.putUserProperty("a", String.valueOf(i));if (i % 2 == 0) {msg.putUserProperty("b", "artisan");} else {msg.putUserProperty("b", "smart artisan");}producer.send(msg);}producer.shutdown();}}自定義類
/*** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.artisan.rocketmq.filter;import org.apache.rocketmq.common.filter.FilterContext; import org.apache.rocketmq.common.filter.MessageFilter; import org.apache.rocketmq.common.message.MessageExt;public class MessageFilterImpl implements MessageFilter {@Overridepublic boolean match(MessageExt msg, FilterContext context) {String property = msg.getProperty("SequenceId");if (property != null) {int id = Integer.parseInt(property);if (((id % 10) == 0) && (id > 100)) {return true;}}return false;} }消費(fèi)者
package com.artisan.rocketmq.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException; import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-11 23:45* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/ public class FilterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_sample_group");/*** 注冊(cè)中心*/consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");/*** 訂閱主題* 一種資源去換取另外一種資源*/consumer.subscribe("TopicFilter", MessageSelector.bySql("a between 0 and 3 and b = 'artisan'"));/*** 注冊(cè)監(jiān)聽(tīng)器,監(jiān)聽(tīng)主題消息*/consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs){try {System.out.println("consumeThread=" + Thread.currentThread().getName()+ ", queueId=" + msg.getQueueId() + ", content:"+ new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Filter Consumer Started.%n");} }測(cè)試結(jié)果
目前 沒(méi)測(cè)試成功,先記錄下。
總結(jié)
以上是生活随笔為你收集整理的RocketMQ-初体验RocketMQ(11)-过滤消息_自定义Java类筛选消息的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: RocketMQ-初体验RocketMQ
- 下一篇: Apache Kafka-初体验Kafk