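RocketMQ Source Code Analysis: The Producer
Concepts
A producer generates messages. In RocketMQ it corresponds to the MQProducer interface.
Components
Producer
The message producer. In RocketMQ, a producer is represented by the MQProducer interface: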

    public interface MQProducer extends MQAdmin {
        // Start the producer
        void start() throws MQClientException;

        // Shut the producer down
        void shutdown();

        List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;

        // Send a message synchronously
        SendResult send(final Message msg) throws MQClientException, RemotingException,
            MQBrokerException, InterruptedException;

        // Send a message asynchronously
        void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
            RemotingException, InterruptedException;

        // Send a message one-way; compared with an asynchronous send, it simply registers no callback
        void sendOneway(final Message msg) throws MQClientException, RemotingException,
            InterruptedException;

        // Send a transactional message
        TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg)
            throws MQClientException;

        // Send a batch of messages
        SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException,
            MQBrokerException, InterruptedException;

        // Many more overloaded send methods exist; they are omitted here

        // For RPC
        Message request(final Message msg, final long timeout) throws RequestTimeoutException,
            MQClientException, RemotingException, MQBrokerException, InterruptedException;

        void request(final Message msg, final RequestCallback requestCallback, final long timeout)
            throws MQClientException, RemotingException, InterruptedException, MQBrokerException;

        Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
            final long timeout) throws RequestTimeoutException, MQClientException, RemotingException,
            MQBrokerException, InterruptedException;

        void request(final Message msg, final MessageQueueSelector selector, final Object arg,
            final RequestCallback requestCallback, final long timeout) throws MQClientException,
            RemotingException, InterruptedException, MQBrokerException;

        Message request(final Message msg, final MessageQueue mq, final long timeout)
            throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,
            InterruptedException;

        void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback,
            long timeout) throws MQClientException, RemotingException, MQBrokerException,
            InterruptedException;
    }

It has two implementations:
- DefaultMQProducer: the default implementation. It does not implement sending transactional messages.
- TransactionMQProducer: extends DefaultMQProducer and implements sending transactional messages.
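To make the difference between the synchronous, asynchronous and one-way send methods of the interface more concrete, here is a toy model. It is not RocketMQ code: ToyBroker, SendModesSketch and the injected Executor are hypothetical names invented for this sketch, and the "broker" is just an in-memory list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/** Toy model of the three send modes; every name here is hypothetical. */
class SendModesSketch {
    /** In-memory stand-in for a broker. */
    static final class ToyBroker {
        private final List<String> stored = new ArrayList<>();

        synchronized String store(String body) {
            stored.add(body);
            return "offset-" + (stored.size() - 1);
        }

        synchronized int size() {
            return stored.size();
        }
    }

    private final ToyBroker broker;
    private final Executor executor;

    SendModesSketch(ToyBroker broker, Executor executor) {
        this.broker = broker;
        this.executor = executor;
    }

    // Synchronous: the caller waits for the broker's answer and gets it as the return value
    String send(String body) {
        return broker.store(body);
    }

    // Asynchronous: the work is handed to the executor and the answer arrives through the callback
    void send(String body, Consumer<String> callback) {
        executor.execute(() -> callback.accept(broker.store(body)));
    }

    // One-way: like the asynchronous case, but nobody is told about the outcome
    void sendOneway(String body) {
        executor.execute(() -> broker.store(body));
    }
}
```

Passing `Runnable::run` as the executor runs everything on the calling thread, which keeps the sketch deterministic; a thread pool would give the non-blocking behaviour the asynchronous and one-way modes are meant for.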
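Message
In RocketMQ, the Message class represents a message produced by a producer. Its fields, in turn:
- topic: the topic
- flag: message flags, of type int. The system flag bits that the producer sets when sending (the sysFlag variable in sendKernelImpl later in this article) are defined in MessageSysFlag:

    public final static int COMPRESSED_FLAG = 0x1;
    public final static int MULTI_TAGS_FLAG = 0x1 << 1;
    public final static int TRANSACTION_NOT_TYPE = 0;
    public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
    public final static int BORNHOST_V6_FLAG = 0x1 << 4;
    public final static int STOREHOSTADDRESS_V6_FLAG = 0x1 << 5;

- properties: additional attributes, of type Map. The extended properties already in use are:
  - tags: used for message filtering
  - keys: index keys; there may be several
  - waitStoreMsgOK
  - delayTimeLevel: the message delay level, used for scheduled messages and message retry
- body: the message content
- transactionId: used by transactional messages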
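The flag constants listed above are bit fields: bit 0 marks compression, and bits 2 and 3 together hold the transaction type. The following sketch copies five of those constants and shows how such bits can be combined and read back. The class name and the three helper methods are hypothetical, written only for illustration; they are not presented as the methods of MessageSysFlag.

```java
/** Sketch of combining and reading the flag bits; helper names are hypothetical. */
class SysFlagSketch {
    static final int COMPRESSED_FLAG = 0x1;
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    // 0x3 << 2 has both transaction bits set, so it also serves as the mask for them
    static int transactionType(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    static boolean isCompressed(int sysFlag) {
        return (sysFlag & COMPRESSED_FLAG) == COMPRESSED_FLAG;
    }

    // Replace the transaction-type bits and leave every other bit untouched
    static int withTransactionType(int sysFlag, int type) {
        return (sysFlag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }
}
```

For example, a compressed half (prepared) message carries `COMPRESSED_FLAG | TRANSACTION_PREPARED_TYPE`, and switching it to the commit type only changes bits 2 and 3.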
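MQClientInstance
See: MQClientInstance
MQFaultStrategy
SendMessageHook
A hook invoked when a message is sent; it lets you run business logic before and after the send. The interface is defined as follows:

    public interface SendMessageHook {
        // Name of the hook
        String hookName();

        // Called before the message is sent
        void sendMessageBefore(final SendMessageContext context);

        // Called after the message is sent
        void sendMessageAfter(final SendMessageContext context);
    }

Usage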
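As a small illustration of how a hook with the three SendMessageHook methods is driven, the sketch below runs registered hooks before and after a placeholder send. It is a simplified model under stated assumptions: Context and Hook are hypothetical stand-ins for SendMessageContext and SendMessageHook, and the "send" is a string operation rather than a network call.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy producer that calls hooks around a send; all types here are stand-ins. */
class SendHookSketch {
    /** Simplified stand-in for SendMessageContext. */
    static final class Context {
        final String topic;
        String sendResult; // stays null until the send has finished

        Context(String topic) {
            this.topic = topic;
        }
    }

    /** Same three methods as SendMessageHook, over the simplified context. */
    interface Hook {
        String hookName();

        void sendMessageBefore(Context context);

        void sendMessageAfter(Context context);
    }

    private final List<Hook> hooks = new ArrayList<>();

    void registerHook(Hook hook) {
        hooks.add(hook);
    }

    // Before-hooks run first, then the send, then after-hooks with the result filled in
    String send(String topic, String body) {
        Context context = new Context(topic);
        for (Hook hook : hooks) {
            hook.sendMessageBefore(context);
        }
        String result = "SEND_OK:" + body.length(); // placeholder for the real send
        context.sendResult = result;
        for (Hook hook : hooks) {
            hook.sendMessageAfter(context);
        }
        return result;
    }
}
```

A hook registered this way sees a context without a result in sendMessageBefore and with one in sendMessageAfter, which is the ordering sendKernelImpl follows later in this article.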
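Implementation
In this article we analyze the following processes one by one:
- Producer startup
- Sending a message
(The code has been abridged: try/catch blocks, logging and other less important parts are removed.)
Producer startup
After creating a producer and before using it to send messages, you must start it by calling its start() method. The code is as follows: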

    public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup));
        // Start DefaultMQProducerImpl
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            // TraceDispatcher is covered separately
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        }
    }

This article focuses on the startup of DefaultMQProducerImpl. The code is as follows:

    public void start(final boolean startFactory) throws MQClientException {
        // Validate the configuration
        this.checkConfig();

        // Unless this is the client's internal producer group, switch the instanceName to the process id
        if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
            this.defaultMQProducer.changeInstanceNameToPID();
        }

        // Get or create the MQClientInstance
        this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

        // Register this producer with the MQClientInstance
        boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

        this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

        if (startFactory) {
            // Start the MQClientInstance, see [MQClientInstance](https://blog.csdn.net/yuxiuzhiai/article/details/103828284)
            mQClientFactory.start();
        }

        // Send a heartbeat to all brokers
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

        // Schedule a timer task that scans for expired requests: first run after 3 s, then every 1 s
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                RequestFutureTable.scanExpiredRequest();
            }
        }, 1000 * 3, 1000);
    }

Sending a message
Take the most common case, a synchronous send, as the example. The main logic lives in DefaultMQProducerImpl.sendDefaultImpl():

    private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode,
            SendCallback sendCallback, long timeout) {
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // 1. Look up the route info of the topic
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // Maximum number of attempts: retry count + 1 for a synchronous send,
            // exactly one for an asynchronous or one-way send
            int timesTotal = communicationMode == CommunicationMode.SYNC
                ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            // Retry loop
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // 2. Select a message queue
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        // Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // 3. Send the message
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                }
                // 4. Exception handling
                catch (Exception e) { // abridged: stands for the various exception types caught in the original
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    exception = e;
                    continue;
                }
            }
            if (sendResult != null) {
                return sendResult;
            }
            // A null sendResult means an exception occurred; the exception is handled here
        }
    }

1. Looking up the topic's route info
On the first lookup, the topic metadata is fetched from namesrv and cached; later lookups are served from the cache.

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        // Try the local cache first
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        // If the cache has no entry, or the entry is not usable, fetch from namesrv
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        // Route info found: return it
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            // Fall back to the default topic: TBW102
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

2. Selecting a message queue
In RocketMQ there are broadly two strategies for selecting a message queue. Which one is used depends on the sendLatencyFaultEnable property (the fault-latency mechanism; false by default). When sendLatencyFaultEnable = true:
2.1. MessageQueue selection with the fault-latency mechanism enabled

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            // This is the branch discussed in this section.
            // Roughly: round-robin load balancing over the MessageQueue list
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            // Normally this point is not reached. Getting here means the fault-latency
            // mechanism found no available broker, so one is picked forcibly
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
            return tpInfo.selectOneMessageQueue();
        }
        // See the analysis in section 2.2
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

For the fault-latency mechanism itself, see the separate article on it.
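The idea behind the branch above (round-robin, but skip brokers that are currently considered faulty) can be sketched in isolation. This is a minimal model under stated assumptions, not the RocketMQ implementation: the class name, the single SLOW_THRESHOLD_MS / PENALTY_MS rule and the explicit nowMs parameter are all hypothetical choices made to keep the example small and deterministic, and the fallback is plain round-robin rather than pickOneAtLeast().

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy latency-aware broker selection; thresholds and names are hypothetical. */
class LatencyAwareSelectorSketch {
    static final long SLOW_THRESHOLD_MS = 500; // sends at least this slow count as a fault
    static final long PENALTY_MS = 30_000;     // how long a faulty broker is avoided

    private final Map<String, Long> unavailableUntil = new HashMap<>();
    private int counter = 0;

    // Record the outcome of a send; a failed or slow send benches the broker for a while
    void update(String broker, long latencyMs, boolean failed, long nowMs) {
        if (failed || latencyMs >= SLOW_THRESHOLD_MS) {
            unavailableUntil.put(broker, nowMs + PENALTY_MS);
        }
    }

    boolean isAvailable(String broker, long nowMs) {
        return nowMs >= unavailableUntil.getOrDefault(broker, 0L);
    }

    // Round-robin over the brokers, skipping benched ones;
    // if every broker is benched, fall back to plain round-robin
    String select(List<String> brokers, long nowMs) {
        int start = counter++;
        for (int i = 0; i < brokers.size(); i++) {
            String candidate = brokers.get(Math.floorMod(start + i, brokers.size()));
            if (isAvailable(candidate, nowMs)) {
                return candidate;
            }
        }
        return brokers.get(Math.floorMod(start, brokers.size()));
    }
}
```

Math.floorMod is used instead of the Math.abs plus `pos < 0` guard in the original so that the index stays non-negative even after the counter overflows.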
2.2. The default mechanism (fault-latency mechanism disabled)

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        // Is this the first attempt for this message? If not, lastBrokerName is
        // the name of the broker on which the previous attempt failed
        if (lastBrokerName == null) {
            // First attempt: pick a MessageQueue by plain round-robin
            return selectOneMessageQueue();
        } else {
            // Retry: try to avoid the broker that failed last time
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            // With a single broker there is nothing to avoid, so pick one by round-robin anyway
            return selectOneMessageQueue();
        }
    }

3. Sending the message
The entry point for sending a message is DefaultMQProducerImpl.sendKernelImpl():

    private SendResult sendKernelImpl(Message msg, MessageQueue mq, CommunicationMode communicationMode,
            SendCallback sendCallback, TopicPublishInfo topicPublishInfo, long timeout) {
        long beginStartTime = System.currentTimeMillis();
        // Resolve the broker address from the topic's route info
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }
        SendMessageContext context = null;
        // If the VIP-channel option is enabled, the message is sent to a different port of the broker
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
        byte[] prevBody = msg.getBody();
        // Assign a unique id
        if (!(msg instanceof MessageBatch)) {
            // Adds the UNIQ_KEY property to the message
            MessageClientIDSetter.setUniqID(msg);
        }
        // Namespace handling (I have not fully figured this part out yet)
        boolean topicWithNamespace = false;
        if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
            msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
            topicWithNamespace = true;
        }
        int sysFlag = 0;
        boolean msgBodyCompressed = false;
        // Compress the body if the message is long enough
        if (this.tryToCompressMessage(msg)) {
            sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
            msgBodyCompressed = true;
        }
        // Is this a transactional message?
        final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
            sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
        }
        if (hasCheckForbiddenHook()) {
            CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
            // (setters omitted: NameSrvAddr, Group, CommunicationMode, BrokerAddr, Message, Mq, UnitMode ..)
            this.executeCheckForbiddenHook(checkForbiddenContext);
        }
        // SendMessageHook is the interface for custom processing before and after a send
        if (this.hasSendMessageHook()) {
            context = new SendMessageContext();
            // (setters omitted: Producer, ProducerGroup, CommunicationMode ..)
            String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (isTrans != null && isTrans.equals("true")) {
                context.setMsgType(MessageType.Trans_Msg_Half);
            }
            // Is this a delayed message?
            if (msg.getProperty("__STARTDELIVERTIME") != null
                || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                context.setMsgType(MessageType.Delay_Msg);
            }
            this.executeSendMessageHookBefore(context);
        }
        // Build the SendMessageRequestHeader
        SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
        // (code that sets the header fields omitted)
        if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
            if (reconsumeTimes != null) {
                requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
            }
            String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
            if (maxReconsumeTimes != null) {
                requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
            }
        }
        SendResult sendResult = null;
        switch (communicationMode) {
            case ASYNC:
                // Asynchronous send
                Message tmpMessage = msg;
                boolean messageCloned = false;
                if (msgBodyCompressed) {
                    tmpMessage = MessageAccessor.cloneMessage(msg);
                    messageCloned = true;
                    msg.setBody(prevBody);
                }
                if (topicWithNamespace) {
                    if (!messageCloned) {
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                    }
                    msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                }
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeout < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                }
                // (arguments omitted)
                sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(..);
                break;
            case ONEWAY:
            case SYNC:
                // Synchronous or one-way send
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeout < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                }
                // (arguments omitted)
                sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(..);
                break;
            default:
                assert false;
                break;
        }
        // SendMessageHook callback after the send
        if (this.hasSendMessageHook()) {
            context.setSendResult(sendResult);
            this.executeSendMessageHookAfter(context);
        }
        return sendResult;
    }

4. Exception handling
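As the catch block in sendDefaultImpl shows, whenever sending a message throws an exception this call is made: this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true). The last argument is true on the exception path and false after a successful send. For a detailed analysis, see the article on the fault-latency mechanism.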
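The interplay between the retry loop and the fault report can be sketched on its own. The following is a simplified model, not the RocketMQ code: RetrySendSketch, the Attempt interface and the faultReports list are hypothetical, brokers are tried in list order instead of going through queue selection, and the timeout budget is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy retry loop that reports each attempt's outcome; all names are hypothetical. */
class RetrySendSketch {
    /** Stand-in for one send attempt against a broker. */
    interface Attempt {
        String send(String broker) throws Exception;
    }

    // One "broker:isolation" entry per attempt, mirroring the last argument of updateFaultItem
    final List<String> faultReports = new ArrayList<>();

    // Try up to 1 + retries times, moving to the next broker after each failure
    String sendWithRetry(List<String> brokers, int retries, Attempt attempt) {
        int timesTotal = 1 + retries;
        Exception last = null;
        for (int times = 0; times < timesTotal; times++) {
            String broker = brokers.get(times % brokers.size());
            try {
                String result = attempt.send(broker);
                faultReports.add(broker + ":false"); // success: no isolation
                return result;
            } catch (Exception e) {
                faultReports.add(broker + ":true");  // failure: ask for isolation
                last = e;
            }
        }
        throw new IllegalStateException("send failed after " + timesTotal + " attempts", last);
    }
}
```

With two brokers and the first one down, the first attempt is reported with isolation true and the second succeeds; with every broker down, the loop stops after 1 + retries attempts and rethrows with the last exception as the cause.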
Sending messages in batches
The DefaultMQProducer.send(Collection<Message> msgs) method is defined as follows:

    public SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException,
            MQBrokerException, InterruptedException {
        // The main logic is in batch()
        return this.defaultMQProducerImpl.send(batch(msgs));
    }

The implementation of batch():

    private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
        // Convert the collection of messages into a single MessageBatch object
        MessageBatch msgBatch = MessageBatch.generateFromList(msgs);
        for (Message message : msgBatch) {
            // Validate each individual message, then set its unique key and topic
            Validators.checkMessage(message, this);
            MessageClientIDSetter.setUniqID(message);
            message.setTopic(withNamespace(message.getTopic()));
        }
        // A batch is sent just like an ordinary message;
        // only the serialization into a byte array differs slightly
        msgBatch.setBody(msgBatch.encode());
        msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
        return msgBatch;
    }

How a MessageBatch is serialized:

    public static byte[] encodeMessages(List<Message> messages) {
        //TO DO refactor, accumulate in one buffer, avoid copies
        List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
        int allSize = 0;
        for (Message message : messages) {
            // Serialize each message in a fixed format
            byte[] tmp = encodeMessage(message);
            encodedMessages.add(tmp);
            allSize += tmp.length;
        }
        byte[] allBytes = new byte[allSize];
        int pos = 0;
        for (byte[] bytes : encodedMessages) {
            System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
            pos += bytes.length;
        }
        return allBytes;
    }

How a single message is serialized:

    public static byte[] encodeMessage(Message message) {
        byte[] body = message.getBody();
        int bodyLen = body.length;
        String properties = messageProperties2String(message.getProperties());
        byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
        //note properties length must not more than Short.MAX
        short propertiesLength = (short) propertiesBytes.length;
        int sysFlag = message.getFlag();
        int storeSize = 4 // 1 TOTALSIZE
            + 4 // 2 MAGICCODE
            + 4 // 3 BODYCRC
            + 4 // 4 FLAG
            + 4 + bodyLen // 5 BODY
            + 2 + propertiesLength; // 6 PROPERTIES
        ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
        // 1 TOTALSIZE
        byteBuffer.putInt(storeSize);
        // 2 MAGICCODE
        byteBuffer.putInt(0);
        // 3 BODYCRC
        byteBuffer.putInt(0);
        // 4 FLAG
        int flag = message.getFlag();
        byteBuffer.putInt(flag);
        // 5 BODY
        byteBuffer.putInt(bodyLen);
        byteBuffer.put(body);
        // 6 PROPERTIES
        byteBuffer.putShort(propertiesLength);
        byteBuffer.put(propertiesBytes);
        return byteBuffer.array();
    }

Conclusion
(Reference: Ding Wei and Zhou Jifeng, "RocketMQ技術內幕". My knowledge is limited; I have been reading the RocketMQ source recently and am recording what I learn along the way, in the hope that it is of some small help to others. If you find mistakes, please point them out.)