

ZooKeeper Source Code Analysis, Part 4: How the Server (Standalone) Processes Requests

Published: 2025/4/5

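Previous article:

ZooKeeper Source Code Analysis, Part 1: Server Startup

That article covered how the ZooKeeper server starts: a standalone server starts through ZooKeeperServer, and a cluster starts through QuorumPeer. This article analyzes how each mode processes messages.

1. As the previous article showed, in standalone mode NettyServerCnxnFactory starts ZooKeeperServer to process messages: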

public synchronized void startup() {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    setupRequestProcessors();
    registerJMX();
    state = State.RUNNING;
    notifyAll();
}

The request processors are set up as follows:

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    ((SyncRequestProcessor)syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}

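Two processor threads are started here, and together with the final processor they form a three-stage chain: first the PrepRequestProcessor, which pre-processes each request; then the SyncRequestProcessor, which runs in its own thread; and last the FinalRequestProcessor, which runs synchronously in the thread that calls it.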
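The wiring in setupRequestProcessors() is a chain built back to front: each processor receives a reference to the one that follows it. A minimal sketch of that idea, assuming a hypothetical Processor interface and string requests (these names are illustrative and are not ZooKeeper's classes):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a three-stage processor chain; not ZooKeeper's actual classes.
class ProcessorChainSketch {
    interface Processor {
        void process(String request);
    }

    // Each stage records its name, then hands the request to the next stage, if any.
    static Processor stage(String name, Processor next, List<String> trace) {
        return request -> {
            trace.add(name + ":" + request);
            if (next != null) {
                next.process(request);
            }
        };
    }

    // Builds the chain last stage first, mirroring the order in setupRequestProcessors().
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        Processor finalStage = stage("final", null, trace);
        Processor syncStage = stage("sync", finalStage, trace);
        Processor firstStage = stage("prep", syncStage, trace);
        firstStage.process(request);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("create /a"));
    }
}
```

Unlike this single-threaded sketch, the real PrepRequestProcessor and SyncRequestProcessor each queue requests and handle them on their own thread.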

  1.1 The first processor: PrepRequestProcessor

  

@Override
public void run() {
    try {
        while (true) {
            Request request = submittedRequests.take();
            long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
            if (request.type == OpCode.ping) {
                traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
            }
            if (Request.requestOfDeath == request) {
                break;
            }
            pRequest(request);
        }
    } catch (RequestProcessorException e) {
        if (e.getCause() instanceof XidRolloverException) {
            LOG.info(e.getCause().getMessage());
        }
        handleException(this.getName(), e);
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("PrepRequestProcessor exited loop!");
}

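As you can see, while (true) loops indefinitely, taking one request at a time from the queue; the call to pRequest(request) (highlighted in red in the original post) does the actual work.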
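The loop above is a queue-draining worker that stops when it sees a sentinel object (Request.requestOfDeath). A minimal sketch of the same pattern, assuming string requests and a hypothetical REQUEST_OF_DEATH sentinel compared by identity:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of a worker loop terminated by a sentinel ("poison pill").
class PoisonPillLoopSketch {
    // Distinct String instance, so the identity check below matches only this object.
    static final String REQUEST_OF_DEATH = new String("requestOfDeath");

    static List<String> demo() {
        BlockingQueue<String> submitted = new LinkedBlockingQueue<>();
        List<String> handled = new ArrayList<>();

        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String request = submitted.take(); // blocks until a request arrives
                    if (request == REQUEST_OF_DEATH) {
                        break;                         // sentinel: leave the loop
                    }
                    handled.add(request);              // stands in for pRequest(request)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        try {
            submitted.put("create");
            submitted.put("getData");
            submitted.put(REQUEST_OF_DEATH);
            worker.join(); // join() makes the worker's writes to 'handled' visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```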

/**
 * This method will be called inside the ProcessRequestThread, which is a
 * singleton, so there will be a single thread calling this code.
 *
 * @param request
 */
protected void pRequest(Request request) throws RequestProcessorException {
    // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
    // request.type + " id = 0x" + Long.toHexString(request.sessionId));
    request.setHdr(null);
    request.setTxn(null);

    try {
        switch (request.type) {
        case OpCode.createContainer:
        case OpCode.create:
        case OpCode.create2:
            CreateRequest create2Request = new CreateRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
            break;
        case OpCode.deleteContainer:
        case OpCode.delete:
            DeleteRequest deleteRequest = new DeleteRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
            break;
        case OpCode.setData:
            SetDataRequest setDataRequest = new SetDataRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
            break;
        case OpCode.reconfig:
            ReconfigRequest reconfigRequest = new ReconfigRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
            pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
            break;
        case OpCode.setACL:
            SetACLRequest setAclRequest = new SetACLRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
            break;
        case OpCode.check:
            CheckVersionRequest checkRequest = new CheckVersionRequest();
            pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
            break;
        case OpCode.multi:
            MultiTransactionRecord multiRequest = new MultiTransactionRecord();
            try {
                ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
            } catch(IOException e) {
                request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
                        Time.currentWallTime(), OpCode.multi));
                throw e;
            }
            List<Txn> txns = new ArrayList<Txn>();
            //Each op in a multi-op must have the same zxid!
            long zxid = zks.getNextZxid();
            KeeperException ke = null;

            //Store off current pending change records in case we need to rollback
            Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);

            for(Op op: multiRequest) {
                Record subrequest = op.toRequestRecord();
                int type;
                Record txn;

                /* If we've already failed one of the ops, don't bother
                 * trying the rest as we know it's going to fail and it
                 * would be confusing in the logfiles.
                 */
                if (ke != null) {
                    type = OpCode.error;
                    txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                }

                /* Prep the request and convert to a Txn */
                else {
                    try {
                        pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                        type = request.getHdr().getType();
                        txn = request.getTxn();
                    } catch (KeeperException e) {
                        ke = e;
                        type = OpCode.error;
                        txn = new ErrorTxn(e.code().intValue());

                        LOG.info("Got user-level KeeperException when processing "
                                + request.toString() + " aborting remaining multi ops."
                                + " Error Path:" + e.getPath()
                                + " Error:" + e.getMessage());

                        request.setException(e);

                        /* Rollback change records from failed multi-op */
                        rollbackPendingChanges(zxid, pendingChanges);
                    }
                }

                //FIXME: I don't want to have to serialize it here and then
                // immediately deserialize in next processor. But I'm
                // not sure how else to get the txn stored into our list.
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                txn.serialize(boa, "request") ;
                ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

                txns.add(new Txn(type, bb.array()));
            }

            request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                    Time.currentWallTime(), request.type));
            request.setTxn(new MultiTxn(txns));

            break;

        //create/close session don't require request record
        case OpCode.createSession:
        case OpCode.closeSession:
            if (!request.isLocalSession()) {
                pRequest2Txn(request.type, zks.getNextZxid(), request,
                        null, true);
            }
            break;

        //All the rest don't need to create a Txn - just verify session
        case OpCode.sync:
        case OpCode.exists:
        case OpCode.getData:
        case OpCode.getACL:
        case OpCode.getChildren:
        case OpCode.getChildren2:
        case OpCode.ping:
        case OpCode.setWatches:
        case OpCode.checkWatches:
        case OpCode.removeWatches:
            zks.sessionTracker.checkSession(request.sessionId,
                    request.getOwner());
            break;
        default:
            LOG.warn("unknown type " + request.type);
            break;
        }
    } catch (KeeperException e) {
        if (request.getHdr() != null) {
            request.getHdr().setType(OpCode.error);
            request.setTxn(new ErrorTxn(e.code().intValue()));
        }
        LOG.info("Got user-level KeeperException when processing "
                + request.toString()
                + " Error Path:" + e.getPath()
                + " Error:" + e.getMessage());
        request.setException(e);
    } catch (Exception e) {
        // log at error level as we are returning a marshalling
        // error to the user
        LOG.error("Failed to process " + request, e);

        StringBuilder sb = new StringBuilder();
        ByteBuffer bb = request.request;
        if(bb != null){
            bb.rewind();
            while (bb.hasRemaining()) {
                sb.append(Integer.toHexString(bb.get() & 0xff));
            }
        } else {
            sb.append("request buffer is null");
        }

        LOG.error("Dumping request buffer: 0x" + sb.toString());
        if (request.getHdr() != null) {
            request.getHdr().setType(OpCode.error);
            request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
        }
    }
    request.zxid = zks.getZxid();
    nextProcessor.processRequest(request);
}

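Exception handling aside, this method handles the different request types, choosing a branch according to request.type. It is called inside ProcessRequestThread, which is a singleton, so only a single thread ever runs this code. Take the create request as an example (the branch highlighted in red in the original post); the corresponding case in pRequest2Txn shows how it works: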

CreateRequest createRequest = (CreateRequest)record;
if (deserialize) {
    ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
}
CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
validateCreateRequest(createMode, request);
String path = createRequest.getPath();
String parentPath = validatePathForCreate(path, request.sessionId);

List<ACL> listACL = fixupACL(path, request.authInfo, createRequest.getAcl());
ChangeRecord parentRecord = getRecordForPath(parentPath);

checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
int parentCVersion = parentRecord.stat.getCversion();
if (createMode.isSequential()) {
    path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
}
validatePath(path, request.sessionId);
try {
    if (getRecordForPath(path) != null) {
        throw new KeeperException.NodeExistsException(path);
    }
} catch (KeeperException.NoNodeException e) {
    // ignore this one
}
boolean ephemeralParent = (parentRecord.stat.getEphemeralOwner() != 0) &&
        (parentRecord.stat.getEphemeralOwner() != DataTree.CONTAINER_EPHEMERAL_OWNER);
if (ephemeralParent) {
    throw new KeeperException.NoChildrenForEphemeralsException(path);
}
int newCversion = parentRecord.stat.getCversion()+1;
if (type == OpCode.createContainer) {
    request.setTxn(new CreateContainerTxn(path, createRequest.getData(), listACL, newCversion));
} else {
    request.setTxn(new CreateTxn(path, createRequest.getData(), listACL, createMode.isEphemeral(),
            newCversion));
}
StatPersisted s = new StatPersisted();
if (createMode.isEphemeral()) {
    s.setEphemeralOwner(request.sessionId);
}
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
parentRecord.childCount++;
parentRecord.stat.setCversion(newCversion);
addChangeRecord(parentRecord);
addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
break;
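For sequential nodes, the branch above appends the parent's cversion to the requested path as a zero-padded 10-digit number. A small sketch of just that formatting step (the class name, method name, and sample path are illustrative):

```java
import java.util.Locale;

// Illustrative sketch of how a sequential node name is derived from the parent's cversion.
class SequentialNameSketch {
    static String sequentialPath(String requestedPath, int parentCVersion) {
        // Same format call as in the create branch: 10 digits, zero padded.
        return requestedPath + String.format(Locale.ENGLISH, "%010d", parentCVersion);
    }

    public static void main(String[] args) {
        System.out.println(sequentialPath("/locks/lock-", 7)); // prints /locks/lock-0000000007
    }
}
```

Because the parent's cversion is incremented on each create (newCversion in the code above), successive sequential children receive increasing suffixes.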

It then calls addChangeRecord to record the pending changes:

private void addChangeRecord(ChangeRecord c) {
    synchronized (zks.outstandingChanges) {
        zks.outstandingChanges.add(c);
        zks.outstandingChangesForPath.put(c.path, c);
    }
}


Here, outstandingChanges is a list of ChangeRecord objects, and outstandingChangesForPath is a map from path to ChangeRecord. They are defined as follows:

final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
// this data structure must be accessed under the outstandingChanges lock
final HashMap<String, ChangeRecord> outstandingChangesForPath =
new HashMap<String, ChangeRecord>();

ChangeRecord is a data structure that lets PrepRequestProcessor and FinalRequestProcessor share information.

ChangeRecord(long zxid, String path, StatPersisted stat, int childCount,
        List<ACL> acl) {
    this.zxid = zxid;
    this.path = path;
    this.stat = stat;
    this.childCount = childCount;
    this.acl = acl;
}
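The two collections work as a pair: the list keeps pending changes in zxid order, and the map gives the latest pending change for a path. A simplified sketch, assuming a hypothetical Change class that carries only zxid and path; the commitUpTo method mirrors the cleanup loop that FinalRequestProcessor runs once a transaction has been applied:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of outstandingChanges / outstandingChangesForPath.
class PendingChangesSketch {
    static final class Change {
        final long zxid;
        final String path;

        Change(long zxid, String path) {
            this.zxid = zxid;
            this.path = path;
        }
    }

    private final List<Change> outstanding = new ArrayList<>();
    private final Map<String, Change> byPath = new HashMap<>();

    // Producer side: append in zxid order and remember the newest change per path.
    synchronized void add(Change c) {
        outstanding.add(c);
        byPath.put(c.path, c);
    }

    synchronized Change latestFor(String path) {
        return byPath.get(path);
    }

    // Consumer side: drop every change with zxid <= the applied zxid.
    // A map entry is removed only if it still points at the record being dropped.
    synchronized int commitUpTo(long zxid) {
        int removed = 0;
        while (!outstanding.isEmpty() && outstanding.get(0).zxid <= zxid) {
            Change c = outstanding.remove(0);
            if (byPath.get(c.path) == c) {
                byPath.remove(c.path);
            }
            removed++;
        }
        return removed;
    }

    public static void main(String[] args) {
        PendingChangesSketch pending = new PendingChangesSketch();
        pending.add(new Change(1, "/a"));
        pending.add(new Change(2, "/a"));
        System.out.println(pending.commitUpTo(1));       // one record dropped
        System.out.println(pending.latestFor("/a").zxid); // newer change for /a is still pending
    }
}
```

The identity check in commitUpTo is what keeps a newer pending change for the same path visible after an older one is committed.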

  1.2 Next, the FinalRequestProcessor. This processor actually applies any transaction associated with a request and services any queries. It sits at the end of the processor chain (it has no next processor), hence the name. How does it process a request?

public void processRequest(Request request) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Processing request:: " + request);
    }
    // request.addRQRec(">final");
    long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
    if (request.type == OpCode.ping) {
        traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
    }
    if (LOG.isTraceEnabled()) {
        ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
    }
    ProcessTxnResult rc = null;
    synchronized (zks.outstandingChanges) {
        // Need to process local session requests
        rc = zks.processTxn(request);

        // request.hdr is set for write requests, which are the only ones
        // that add to outstandingChanges.
        if (request.getHdr() != null) {
            TxnHeader hdr = request.getHdr();
            Record txn = request.getTxn();
            long zxid = hdr.getZxid();
            while (!zks.outstandingChanges.isEmpty()
                    && zks.outstandingChanges.get(0).zxid <= zxid) {
                ChangeRecord cr = zks.outstandingChanges.remove(0);
                if (cr.zxid < zxid) {
                    LOG.warn("Zxid outstanding " + cr.zxid
                            + " is less than current " + zxid);
                }
                if (zks.outstandingChangesForPath.get(cr.path) == cr) {
                    zks.outstandingChangesForPath.remove(cr.path);
                }
            }
        }

        // do not add non quorum packets to the queue.
        if (request.isQuorum()) {
            zks.getZKDatabase().addCommittedProposal(request);
        }
    }

    // ZOOKEEPER-558:
    // In some cases the server does not close the connection (e.g., closeconn buffer
    // was not being queued, ZOOKEEPER-558) properly. This happens, for example,
    // when the client closes the connection. The server should still close the session, though.
    // Calling closeSession() after losing the cnxn, results in the client close session response being dropped.
    if (request.type == OpCode.closeSession && connClosedByClient(request)) {
        // We need to check if we can close the session id.
        // Sometimes the corresponding ServerCnxnFactory could be null because
        // we are just playing diffs from the leader.
        if (closeSession(zks.serverCnxnFactory, request.sessionId) ||
                closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
            return;
        }
    }

    if (request.cnxn == null) {
        return;
    }
    ServerCnxn cnxn = request.cnxn;

    String lastOp = "NA";
    zks.decInProcess();
    Code err = Code.OK;
    Record rsp = null;
    try {
        if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
            /*
             * When local session upgrading is disabled, leader will
             * reject the ephemeral node creation due to session expire.
             * However, if this is the follower that issue the request,
             * it will have the correct error code, so we should use that
             * and report to user
             */
            if (request.getException() != null) {
                throw request.getException();
            } else {
                throw KeeperException.create(KeeperException.Code
                        .get(((ErrorTxn) request.getTxn()).getErr()));
            }
        }

        KeeperException ke = request.getException();
        if (ke != null && request.type != OpCode.multi) {
            throw ke;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{}",request);
        }
        switch (request.type) {
        case OpCode.ping: {
            zks.serverStats().updateLatency(request.createTime);

            lastOp = "PING";
            cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                    request.createTime, Time.currentElapsedTime());

            cnxn.sendResponse(new ReplyHeader(-2,
                    zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");
            return;
        }
        case OpCode.createSession: {
            zks.serverStats().updateLatency(request.createTime);

            lastOp = "SESS";
            cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                    request.createTime, Time.currentElapsedTime());

            zks.finishSessionInit(request.cnxn, true);
            return;
        }
        case OpCode.multi: {
            lastOp = "MULT";
            rsp = new MultiResponse() ;

            for (ProcessTxnResult subTxnResult : rc.multiResult) {

                OpResult subResult ;

                switch (subTxnResult.type) {
                case OpCode.check:
                    subResult = new CheckResult();
                    break;
                case OpCode.create:
                    subResult = new CreateResult(subTxnResult.path);
                    break;
                case OpCode.create2:
                case OpCode.createContainer:
                    subResult = new CreateResult(subTxnResult.path, subTxnResult.stat);
                    break;
                case OpCode.delete:
                case OpCode.deleteContainer:
                    subResult = new DeleteResult();
                    break;
                case OpCode.setData:
                    subResult = new SetDataResult(subTxnResult.stat);
                    break;
                case OpCode.error:
                    subResult = new ErrorResult(subTxnResult.err) ;
                    break;
                default:
                    throw new IOException("Invalid type of op");
                }

                ((MultiResponse)rsp).add(subResult);
            }

            break;
        }
        case OpCode.create: {
            lastOp = "CREA";
            rsp = new CreateResponse(rc.path);
            err = Code.get(rc.err);
            break;
        }
        case OpCode.create2:
        case OpCode.createContainer: {
            lastOp = "CREA";
            rsp = new Create2Response(rc.path, rc.stat);
            err = Code.get(rc.err);
            break;
        }
        case OpCode.delete:
        case OpCode.deleteContainer: {
            lastOp = "DELE";
            err = Code.get(rc.err);
            break;
        }
        case OpCode.setData: {
            lastOp = "SETD";
            rsp = new SetDataResponse(rc.stat);
            err = Code.get(rc.err);
            break;
        }
        case OpCode.reconfig: {
            lastOp = "RECO";
            rsp = new GetDataResponse(((QuorumZooKeeperServer)zks).self.getQuorumVerifier().toString().getBytes(), rc.stat);
            err = Code.get(rc.err);
            break;
        }
        case OpCode.setACL: {
            lastOp = "SETA";
            rsp = new SetACLResponse(rc.stat);
            err = Code.get(rc.err);
            break;
        }
        case OpCode.closeSession: {
            lastOp = "CLOS";
            err = Code.get(rc.err);
            break;
        }
        case OpCode.sync: {
            lastOp = "SYNC";
            SyncRequest syncRequest = new SyncRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request,
                    syncRequest);
            rsp = new SyncResponse(syncRequest.getPath());
            break;
        }
        case OpCode.check: {
            lastOp = "CHEC";
            rsp = new SetDataResponse(rc.stat);
            err = Code.get(rc.err);
            break;
        }
        case OpCode.exists: {
            lastOp = "EXIS";
            // TODO we need to figure out the security requirement for this!
            ExistsRequest existsRequest = new ExistsRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request,
                    existsRequest);
            String path = existsRequest.getPath();
            if (path.indexOf('\0') != -1) {
                throw new KeeperException.BadArgumentsException();
            }
            Stat stat = zks.getZKDatabase().statNode(path, existsRequest
                    .getWatch() ? cnxn : null);
            rsp = new ExistsResponse(stat);
            break;
        }
        case OpCode.getData: {
            lastOp = "GETD";
            GetDataRequest getDataRequest = new GetDataRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request,
                    getDataRequest);
            DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
            if (n == null) {
                throw new KeeperException.NoNodeException();
            }
            Long aclL;
            synchronized(n) {
                aclL = n.acl;
            }
            PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
                    ZooDefs.Perms.READ,
                    request.authInfo);
            Stat stat = new Stat();
            byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                    getDataRequest.getWatch() ? cnxn : null);
            rsp = new GetDataResponse(b, stat);
            break;
        }
        case OpCode.setWatches: {
            lastOp = "SETW";
            SetWatches setWatches = new SetWatches();
            // XXX We really should NOT need this!!!!
            request.request.rewind();
            ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
            long relativeZxid = setWatches.getRelativeZxid();
            zks.getZKDatabase().setWatches(relativeZxid,
                    setWatches.getDataWatches(),
                    setWatches.getExistWatches(),
                    setWatches.getChildWatches(), cnxn);
            break;
        }
        case OpCode.getACL: {
            lastOp = "GETA";
            GetACLRequest getACLRequest = new GetACLRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request,
                    getACLRequest);
            Stat stat = new Stat();
            List<ACL> acl =
                    zks.getZKDatabase().getACL(getACLRequest.getPath(), stat);
            rsp = new GetACLResponse(acl, stat);
            break;
        }
        case OpCode.getChildren: {
            lastOp = "GETC";
            GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request,
                    getChildrenRequest);
            DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
            if (n == null) {
                throw new KeeperException.NoNodeException();
            }
            Long aclG;
            synchronized(n) {
                aclG = n.acl;
            }
            PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
                    ZooDefs.Perms.READ,
                    request.authInfo);
            List<String> children = zks.getZKDatabase().getChildren(
                    getChildrenRequest.getPath(), null, getChildrenRequest
                            .getWatch() ? cnxn : null);
            rsp = new GetChildrenResponse(children);
            break;
        }
        case OpCode.getChildren2: {
            lastOp = "GETC";
            GetChildren2Request getChildren2Request = new GetChildren2Request();
            ByteBufferInputStream.byteBuffer2Record(request.request,
                    getChildren2Request);
            Stat stat = new Stat();
            DataNode n = zks.getZKDatabase().getNode(getChildren2Request.getPath());
            if (n == null) {
                throw new KeeperException.NoNodeException();
            }
            Long aclG;
            synchronized(n) {
                aclG = n.acl;
            }
            PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),
                    ZooDefs.Perms.READ,
                    request.authInfo);
            List<String> children = zks.getZKDatabase().getChildren(
                    getChildren2Request.getPath(), stat, getChildren2Request
                            .getWatch() ? cnxn : null);
            rsp = new GetChildren2Response(children, stat);
            break;
        }
        case OpCode.checkWatches: {
            lastOp = "CHKW";
            CheckWatchesRequest checkWatches = new CheckWatchesRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request,
                    checkWatches);
            WatcherType type = WatcherType.fromInt(checkWatches.getType());
            boolean containsWatcher = zks.getZKDatabase().containsWatcher(
                    checkWatches.getPath(), type, cnxn);
            if (!containsWatcher) {
                String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
                        new Object[] { checkWatches.getPath(), type });
                throw new KeeperException.NoWatcherException(msg);
            }
            break;
        }
        case OpCode.removeWatches: {
            lastOp = "REMW";
            RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request,
                    removeWatches);
            WatcherType type = WatcherType.fromInt(removeWatches.getType());
            boolean removed = zks.getZKDatabase().removeWatch(
                    removeWatches.getPath(), type, cnxn);
            if (!removed) {
                String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
                        new Object[] { removeWatches.getPath(), type });
                throw new KeeperException.NoWatcherException(msg);
            }
            break;
        }
        }
    } catch (SessionMovedException e) {
        // session moved is a connection level error, we need to tear
        // down the connection otw ZOOKEEPER-710 might happen
        // ie client on slow follower starts to renew session, fails
        // before this completes, then tries the fast follower (leader)
        // and is successful, however the initial renew is then
        // successfully fwd/processed by the leader and as a result
        // the client and leader disagree on where the client is most
        // recently attached (and therefore invalid SESSION MOVED generated)
        cnxn.sendCloseSession();
        return;
    } catch (KeeperException e) {
        err = e.code();
    } catch (Exception e) {
        // log at error level as we are returning a marshalling
        // error to the user
        LOG.error("Failed to process " + request, e);
        StringBuilder sb = new StringBuilder();
        ByteBuffer bb = request.request;
        bb.rewind();
        while (bb.hasRemaining()) {
            sb.append(Integer.toHexString(bb.get() & 0xff));
        }
        LOG.error("Dumping request buffer: 0x" + sb.toString());
        err = Code.MARSHALLINGERROR;
    }

    long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
    ReplyHeader hdr =
            new ReplyHeader(request.cxid, lastZxid, err.intValue());

    zks.serverStats().updateLatency(request.createTime);
    cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
            request.createTime, Time.currentElapsedTime());

    try {
        cnxn.sendResponse(hdr, rsp, "response");
        if (request.type == OpCode.closeSession) {
            cnxn.sendCloseSession();
        }
    } catch (IOException e) {
        LOG.error("FIXMSG",e);
    }
}

In the first step, while holding the lock on the shared outstandingChanges, it processes the transaction first and then the session:

private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
        Record txn) {
    ProcessTxnResult rc;
    int opCode = request != null ? request.type : hdr.getType();
    long sessionId = request != null ? request.sessionId : hdr.getClientId();
    if (hdr != null) {
        rc = getZKDatabase().processTxn(hdr, txn);
    } else {
        rc = new ProcessTxnResult();
    }
    if (opCode == OpCode.createSession) {
        if (hdr != null && txn instanceof CreateSessionTxn) {
            CreateSessionTxn cst = (CreateSessionTxn) txn;
            sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());
        } else if (request != null && request.isLocalSession()) {
            request.request.rewind();
            int timeout = request.request.getInt();
            request.request.rewind();
            sessionTracker.addSession(request.sessionId, timeout);
        } else {
            LOG.warn("*****>>>>> Got "
                    + txn.getClass() + " "
                    + txn.toString());
        }
    } else if (opCode == OpCode.closeSession) {
        sessionTracker.removeSession(sessionId);
    }
    return rc;
}

Transaction processing takes different branches for local requests and for the database. In the database branch, DataTree creates the node:

CreateTxn createTxn = (CreateTxn) txn;
rc.path = createTxn.getPath();
createNode(
        createTxn.getPath(),
        createTxn.getData(),
        createTxn.getAcl(),
        createTxn.getEphemeral() ? header.getClientId() : 0,
        createTxn.getParentCVersion(),
        header.getZxid(), header.getTime(), null);
break;

The logic for adding a new node is:

/**
 * Add a new node to the DataTree.
 * @param path
 *            Path for the new node.
 * @param data
 *            Data to store in the node.
 * @param acl
 *            Node acls
 * @param ephemeralOwner
 *            the session id that owns this node. -1 indicates this is not
 *            an ephemeral node.
 * @param zxid
 *            Transaction ID
 * @param time
 * @param outputStat
 *            A Stat object to store Stat output results into.
 * @throws NodeExistsException
 * @throws NoNodeException
 * @throws KeeperException
 */
public void createNode(final String path, byte data[], List<ACL> acl,
        long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat)
        throws KeeperException.NoNodeException,
        KeeperException.NodeExistsException {
    int lastSlash = path.lastIndexOf('/');
    String parentName = path.substring(0, lastSlash);
    String childName = path.substring(lastSlash + 1);
    StatPersisted stat = new StatPersisted();
    stat.setCtime(time);
    stat.setMtime(time);
    stat.setCzxid(zxid);
    stat.setMzxid(zxid);
    stat.setPzxid(zxid);
    stat.setVersion(0);
    stat.setAversion(0);
    stat.setEphemeralOwner(ephemeralOwner);
    DataNode parent = nodes.get(parentName);
    if (parent == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (parent) {
        Set<String> children = parent.getChildren();
        if (children != null && children.contains(childName)) {
            throw new KeeperException.NodeExistsException();
        }

        if (parentCVersion == -1) {
            parentCVersion = parent.stat.getCversion();
            parentCVersion++;
        }
        parent.stat.setCversion(parentCVersion);
        parent.stat.setPzxid(zxid);
        Long longval = convertAcls(acl);
        DataNode child = new DataNode(data, longval, stat);
        parent.addChild(childName);
        nodes.put(path, child);
        if (ephemeralOwner == CONTAINER_EPHEMERAL_OWNER) {
            containers.add(path);
        } else if (ephemeralOwner != 0) {
            HashSet<String> list = ephemerals.get(ephemeralOwner);
            if (list == null) {
                list = new HashSet<String>();
                ephemerals.put(ephemeralOwner, list);
            }
            synchronized (list) {
                list.add(path);
            }
        }
        if (outputStat != null) {
            child.copyStat(outputStat);
        }
    }
    // now check if its one of the zookeeper node child
    if (parentName.startsWith(quotaZookeeper)) {
        // now check if its the limit node
        if (Quotas.limitNode.equals(childName)) {
            // this is the limit node
            // get the parent and add it to the trie
            pTrie.addPath(parentName.substring(quotaZookeeper.length()));
        }
        if (Quotas.statNode.equals(childName)) {
            updateQuotaForPath(parentName
                    .substring(quotaZookeeper.length()));
        }
    }
    // also check to update the quotas for this node
    String lastPrefix = getMaxPrefixWithQuota(path);
    if(lastPrefix != null) {
        // ok we have some match and need to update
        updateCount(lastPrefix, 1);
        updateBytes(lastPrefix, data == null ? 0 : data.length);
    }
    dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
    childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
            Event.EventType.NodeChildrenChanged);
}
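createNode begins by splitting the path at its last slash, and ends by mapping an empty parent name back to "/" before triggering the child watch. A small sketch of those two string steps, with illustrative helper names that do not exist in DataTree:

```java
// Illustrative helpers for the path handling at the start and end of createNode.
class PathSplitSketch {
    // Everything before the last '/'; a top-level node such as "/app" yields "".
    static String parentOf(String path) {
        return path.substring(0, path.lastIndexOf('/'));
    }

    // Everything after the last '/'.
    static String childOf(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }

    // Same expression as in the final triggerWatch call: the root is reported as "/".
    static String watchPathForParent(String parentName) {
        return parentName.equals("") ? "/" : parentName;
    }

    public static void main(String[] args) {
        System.out.println(parentOf("/app/config"));             // /app
        System.out.println(childOf("/app/config"));              // config
        System.out.println(watchPathForParent(parentOf("/app"))); // /
    }
}
```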

The final step triggers the NodeCreated event on the new node and the NodeChildrenChanged event on its parent.

Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type,
            KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,
                        ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null;
        }
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);
    }
    return watchers;
}

WatchManager calls the registered watchers to handle the event.
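Note that triggerWatch removes the watchers for the path from watchTable before notifying them, which is why a watch fires only once. A stripped-down sketch of that behavior, assuming a hypothetical Watcher interface with string events instead of ZooKeeper's WatchedEvent:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of one-shot watches: triggering removes the registration.
class OneShotWatchSketch {
    interface Watcher {
        void process(String eventType, String path);
    }

    private final Map<String, Set<Watcher>> watchTable = new HashMap<>();

    synchronized void addWatch(String path, Watcher w) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>()).add(w);
    }

    // Returns how many watchers were notified.
    int triggerWatch(String path, String eventType) {
        Set<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path); // registration is consumed here
        }
        if (watchers == null) {
            return 0;
        }
        // Callbacks run outside the lock, as in the method above.
        for (Watcher w : watchers) {
            w.process(eventType, path);
        }
        return watchers.size();
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        OneShotWatchSketch manager = new OneShotWatchSketch();
        manager.addWatch("/a", (type, path) -> events.add(type + " " + path));
        manager.triggerWatch("/a", "NodeCreated");
        manager.triggerWatch("/a", "NodeDataChanged"); // no watcher left, nothing fires
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [NodeCreated /a]
    }
}
```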

  1.3. Now the SyncRequestProcessor, which runs asynchronously in its own thread

@Override
public void run() {
    try {
        int logCount = 0;

        // we do this in an attempt to ensure that not all of the servers
        // in the ensemble take a snapshot at the same time
        int randRoll = r.nextInt(snapCount/2);
        while (true) {
            Request si = null;
            if (toFlush.isEmpty()) {
                si = queuedRequests.take();
            } else {
                si = queuedRequests.poll();
                if (si == null) {
                    flush(toFlush);
                    continue;
                }
            }
            if (si == requestOfDeath) {
                break;
            }
            if (si != null) {
                // track the number of records written to the log
                if (zks.getZKDatabase().append(si)) {
                    logCount++;
                    if (logCount > (snapCount / 2 + randRoll)) {
                        randRoll = r.nextInt(snapCount/2);
                        // roll the log
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (snapInProcess != null && snapInProcess.isAlive()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        zks.takeSnapshot();
                                    } catch(Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    }
                                }
                            };
                            snapInProcess.start();
                        }
                        logCount = 0;
                    }
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read, and there are no pending
                    // flushes (writes), then just pass this to the next
                    // processor
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable)nextProcessor).flush();
                        }
                    }
                    continue;
                }
                toFlush.add(si);
                if (toFlush.size() > 1000) {
                    flush(toFlush);
                }
            }
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
    } finally{
        running = false;
    }
    LOG.info("SyncRequestProcessor exited!");
}
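The log roll and snapshot in the loop above fire once logCount exceeds snapCount / 2 + randRoll, where randRoll is drawn from [0, snapCount / 2). A short sketch of that threshold; the helper names and the sample snapCount of 100000 are illustrative assumptions:

```java
import java.util.Random;

// Illustrative sketch of the randomized snapshot threshold used in the run loop.
class SnapshotRollSketch {
    // Same comparison as in the loop: roll once the count passes the jittered midpoint.
    static boolean shouldRoll(int logCount, int snapCount, int randRoll) {
        return logCount > (snapCount / 2 + randRoll);
    }

    // Counts how many appended transactions it takes to reach the first roll.
    static int txnsUntilRoll(int snapCount, Random r) {
        int randRoll = r.nextInt(snapCount / 2);
        int logCount = 0;
        do {
            logCount++;
        } while (!shouldRoll(logCount, snapCount, randRoll));
        return logCount;
    }

    public static void main(String[] args) {
        int n = txnsUntilRoll(100000, new Random());
        // n always lies between snapCount / 2 + 1 and snapCount, inclusive.
        System.out.println("rolled after " + n + " transactions");
    }
}
```

The random offset means that servers configured with the same snapCount are unlikely to snapshot at the same moment, which is the intent stated in the code comment.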

Transaction logging and snapshots are handled asynchronously; a ZooKeeperThread is started to take the snapshot.

public void takeSnapshot(){
    try {
        txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());
    } catch (IOException e) {
        LOG.error("Severe unrecoverable error, exiting", e);
        // This is a severe error that we cannot recover from,
        // so we need to exit
        System.exit(10);
    }
}

FileTxnSnapLog is a helper class for working with the transaction log (txnlog) and snapshots.

/**
 * save the datatree and the sessions into a snapshot
 * @param dataTree the datatree to be serialized onto disk
 * @param sessionsWithTimeouts the sesssion timeouts to be
 * serialized onto disk
 * @throws IOException
 */
public void save(DataTree dataTree,
        ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
        throws IOException {
    long lastZxid = dataTree.lastProcessedZxid;
    File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
    LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
            snapshotFile);
    snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile);
}

The snapshot is then persisted to a file:

/**
 * serialize the datatree and session into the file snapshot
 * @param dt the datatree to be serialized
 * @param sessions the sessions to be serialized
 * @param snapShot the file to store snapshot into
 */
public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
        throws IOException {
    if (!close) {
        OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
        CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32());
        //CheckedOutputStream cout = new CheckedOutputStream()
        OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
        FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
        serialize(dt,sessions,oa, header);
        long val = crcOut.getChecksum().getValue();
        oa.writeLong(val, "val");
        oa.writeString("/", "path");
        sessOS.flush();
        crcOut.close();
        sessOS.close();
    }
}
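The serialize method wraps the output in a CheckedOutputStream with Adler32, writes the data, and then appends the checksum value. A self-contained sketch of the same write-then-verify idea using only java.util.zip and in-memory streams; the length-prefixed layout here is a made-up format for illustration and is not ZooKeeper's snapshot format:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.Adler32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;

// Illustrative sketch: payload followed by an Adler32 checksum of everything before it.
class ChecksumSnapshotSketch {
    static byte[] write(byte[] payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            CheckedOutputStream crcOut = new CheckedOutputStream(bytes, new Adler32());
            DataOutputStream out = new DataOutputStream(crcOut);
            out.writeInt(payload.length);
            out.write(payload);
            // Capture the checksum of the data written so far, then append it.
            long checksum = crcOut.getChecksum().getValue();
            out.writeLong(checksum);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static boolean verify(byte[] file) {
        try {
            CheckedInputStream crcIn =
                    new CheckedInputStream(new ByteArrayInputStream(file), new Adler32());
            DataInputStream in = new DataInputStream(crcIn);
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            // Read the running checksum before consuming the stored one.
            long computed = crcIn.getChecksum().getValue();
            long stored = in.readLong();
            return computed == stored;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] file = write("snapshot-data".getBytes(StandardCharsets.UTF_8));
        System.out.println(verify(file)); // true
        file[6] ^= 1;                     // flip one bit inside the payload
        System.out.println(verify(file)); // false
    }
}
```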

At this point, the whole standalone flow is complete.

  2. Cluster mode

Cluster mode differs slightly from standalone mode: in a cluster, QuorumPeer starts the ServerCnxnFactory, which binds to the local address:

@Override
public void start() {
    LOG.info("binding to port " + localAddress);
    parentChannel = bootstrap.bind(localAddress);
}

For reasons of space, the rest of the cluster logic will be covered in detail in the next article.

 

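Summary

The code flow above shows that the server receives requests either through NIO or through the Netty framework. A request first goes to PrepRequestProcessor, which wraps it and, depending on the request type, sets up the shared change records. SyncRequestProcessor then writes the transaction log and triggers snapshots; a request that was written to the log is not passed to the next processor until the log has been flushed. Finally, FinalRequestProcessor, the last processor in the chain, applies the transaction to the in-memory database according to the operation type, sends the response, and triggers the watchers.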

Reposted from: https://www.cnblogs.com/davidwang456/p/5001244.html
