rocketmq 初探(四)
大家好,我是烤鴨:
上一篇簡單介紹broker的初始化,這一篇介紹 NettyRequestProcessor 的實現(主要是broker里用到的)。
AdminBrokerProcessor、ClientManageProcessor、ConsumerManageProcessor、EndTransactionProcessor
NettyRequestProcessor
/**
 * Common remoting command processor.
 *
 * <p>Declares the two operations every processor in this article implements:
 * handling one request and reporting whether requests are being rejected.
 */
public interface NettyRequestProcessor {
    /**
     * Handles a single remoting command.
     *
     * @param ctx the {@code ChannelHandlerContext} passed in with the request
     * @param request the command to handle
     * @return the response command; the implementations shown in this article
     *         return {@code null} for request codes they do not handle
     * @throws Exception if handling fails
     */
    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws Exception;

    /**
     * Reports whether this processor rejects requests.
     * NOTE(review): the exact rejection semantics are not shown in this excerpt.
     *
     * @return a boolean rejection flag
     */
    boolean rejectRequest();
}
先看下哪些是broker里用到的,從包名就能看出來(remoting包下的后邊看),接下來一個一個分析。
AdminBrokerProcessor (后臺發起的 CRUD)
/**
 * Routes an admin request to the handler that matches its request code.
 *
 * <p>Only the {@code UPDATE_AND_CREATE_TOPIC} branch is shown; the article
 * elides the remaining cases. Any code without a matching case falls through
 * to {@code default} and the method returns {@code null}.
 *
 * @param ctx the channel handler context passed through to the handler
 * @param request the incoming command whose {@code getCode()} selects the branch
 * @return the handler's response, or {@code null} when no case matches
 * @throws RemotingCommandException declared by the method; raised by handlers
 */
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.UPDATE_AND_CREATE_TOPIC:
            return this.updateAndCreateTopic(ctx, request);
        // ... (the remaining request codes are elided in this excerpt)
        default:
            break;
    }
    return null;
}
ClientManageProcessor
/**
 * Routes a client-management request to the handler for its request code.
 *
 * <p>Handles {@code HEART_BEAT}, {@code UNREGISTER_CLIENT} and
 * {@code CHECK_CLIENT_CONFIG}; any other code yields {@code null}.
 *
 * @param ctx the channel handler context passed through to the handler
 * @param request the incoming command whose {@code getCode()} selects the branch
 * @return the handler's response, or {@code null} when no case matches
 * @throws RemotingCommandException declared by the method; raised by handlers
 */
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.HEART_BEAT:
            // Heartbeat. For consumers: if the heartbeat data (group and
            // subscription) has changed, the stored info is updated.
            return this.heartBeat(ctx, request);
        case RequestCode.UNREGISTER_CLIENT:
            // Going offline; applies to both producers and consumers.
            return this.unregisterClient(ctx, request);
        case RequestCode.CHECK_CLIENT_CONFIG:
            // Validates the client's config using the registered filter.
            return this.checkClientConfig(ctx, request);
        default:
            break;
    }
    return null;
}
ConsumerManageProcessor
/**
 * Routes a consumer-management request to the handler for its request code.
 *
 * <p>Handles {@code GET_CONSUMER_LIST_BY_GROUP}, {@code UPDATE_CONSUMER_OFFSET}
 * and {@code QUERY_CONSUMER_OFFSET}; any other code yields {@code null}.
 * The per-branch notes below are the article author's summaries of the
 * handlers, whose bodies are not part of this excerpt.
 *
 * @param ctx the channel handler context passed through to the handler
 * @param request the incoming command whose {@code getCode()} selects the branch
 * @return the handler's response, or {@code null} when no case matches
 * @throws RemotingCommandException declared by the method; raised by handlers
 */
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
            // Fetch the list of consumers belonging to the group.
            return this.getConsumerListByGroup(ctx, request);
        case RequestCode.UPDATE_CONSUMER_OFFSET:
            // Update the queueId/offset stored under topic@group: put directly
            // when absent; when present and the new offset is smaller than the
            // local one, a log entry is written.
            return this.updateConsumerOffset(ctx, request);
        case RequestCode.QUERY_CONSUMER_OFFSET:
            // Look up the offset by topic@group and queueId: if the offset is
            // negative and the data on disk has not been loaded into memory,
            // return 0; otherwise return QUERY_NOT_FOUND.
            return this.queryConsumerOffset(ctx, request);
        default:
            break;
    }
    return null;
}
EndTransactionProcessor
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throwsRemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final EndTransactionRequestHeader requestHeader =(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);LOGGER.debug("Transaction request:{}", requestHeader);// salve 不支持事務if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");return response;}// 來源于事務檢查(ClientRemotingProcessor.processRequest)if (requestHeader.getFromTransactionCheck()) {switch (requestHeader.getCommitOrRollback()) {// 非事務類型,直接返回case MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn("Check producer[{}] transaction state, but it's pending status."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}// commit, break 執行執行case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}} else {switch (requestHeader.getCommitOrRollback()) {case MessageSysFlag.TRANSACTION_NOT_TYPE: {LOGGER.warn("The producer[{}] end transaction in sending message, and it's pending status."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());return null;}case 
MessageSysFlag.TRANSACTION_COMMIT_TYPE: {break;}case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."+ "RequestHeader: {} Remark: {}",RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.toString(),request.getRemark());break;}default:return null;}}OperationResult result = new OperationResult();// 提交if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {// 根據offset從commitlog中獲取當前的 halfmessage(半消息)result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {// 校驗半消息的合法性,group、事務隊列的offset、commitlog的offset是否一致RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {// 根據半消息構建一條事務消息MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);// 發送最終消息,消息刷到commitlogRemotingCommand sendResult = sendFinalMessage(msgInner);if (sendResult.getCode() == ResponseCode.SUCCESS) {// 最終消息發送成功,在commitlog里原msg打一個'd'的標記this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return sendResult;}return res;}// 回滾} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {// 流程同提交差不多,半小時獲取成功之后,在commitlog里原msg打一個'd'的標記result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);if (result.getResponseCode() == ResponseCode.SUCCESS) {RemotingCommand res = 
checkPrepareMessage(result.getPrepareMessage(), requestHeader);if (res.getCode() == ResponseCode.SUCCESS) {this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());}return res;}}response.setCode(result.getResponseCode());response.setRemark(result.getResponseRemark());return response; }ForwardRequestProcessor
/**
 * No-op handler: ignores both arguments and always returns {@code null}.
 *
 * @param ctx unused
 * @param request unused
 * @return always {@code null}
 */
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
    return null;
}
小結
介紹了4個 processor:
AdminBrokerProcessor:給console臺提供的 CRUD 接口
ClientManageProcessor:針對client端的請求,包含 心跳(上線)、下線、配置檢查
ConsumerManageProcessor:針對consumer的,包含 獲取consumer列表、更新queueId和offset、獲取offset
EndTransactionProcessor:提交模式,從commitlog根據offset獲取半消息,寫入commitlog,原半消息打一個’d’的標記。
回滾模式,從commitlog根據offset獲取半消息,原半消息打一個’d’的標記。
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的rocketmq 初探(四)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2015/10/9 Python核编初级
- 下一篇: 孝敬老人用 盐城 淮剧视频 淮剧 王樵楼