日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

zookeeper源码分析之四服务端(单机)处理请求流程

發布時間:2025/4/5 编程问答 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 zookeeper源码分析之四服务端(单机)处理请求流程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

上文:

zookeeper源碼分析之一服務端啟動過程

中,我們介紹了zookeeper服務器的啟動過程,其中單機是ZookeeperServer啟動,集群使用QuorumPeer啟動,那么這次我們分析各自一下消息處理過程:

? 前文可以看到在

1.在單機情況下NettyServerCnxnFactory中啟動ZookeeperServer來處理消息:

public synchronized void startup() {if (sessionTracker == null) {createSessionTracker();}startSessionTracker(); setupRequestProcessors();registerJMX();state = State.RUNNING;notifyAll();}

消息處理器的調用如下:

protected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);RequestProcessor syncProcessor = new SyncRequestProcessor(this,finalProcessor);((SyncRequestProcessor)syncProcessor).start();firstProcessor = new PrepRequestProcessor(this, syncProcessor);((PrepRequestProcessor)firstProcessor).start();}

我們看到啟動兩個消息處理器來處理請求:第一個同步消息處理器預消息服務器,最后一個同步請求處理器和異步請求處理器。

  1.1 ?第一個消息服務器處理器預消息服務器PrepRequestProcessor

  

@Overridepublic void run() {try {while (true) {Request request = submittedRequests.take();long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;if (request.type == OpCode.ping) {traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;}if (LOG.isTraceEnabled()) {ZooTrace.logRequest(LOG, traceMask, 'P', request, "");}if (Request.requestOfDeath == request) {break;} pRequest(request);}} catch (RequestProcessorException e) {if (e.getCause() instanceof XidRolloverException) {LOG.info(e.getCause().getMessage());}handleException(this.getName(), e);} catch (Exception e) {handleException(this.getName(), e);}LOG.info("PrepRequestProcessor exited loop!");}

?可以看到,while(true)是一個一直循環處理的過程,其中紅色的部分為處理的主體。

/*** This method will be called inside the ProcessRequestThread, which is a* singleton, so there will be a single thread calling this code.** @param request*/protected void pRequest(Request request) throws RequestProcessorException {// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +// request.type + " id = 0x" + Long.toHexString(request.sessionId));request.setHdr(null);request.setTxn(null);try {switch (request.type) {case OpCode.createContainer:case OpCode.create:case OpCode.create2: CreateRequest create2Request = new CreateRequest();pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);break;case OpCode.deleteContainer:case OpCode.delete:DeleteRequest deleteRequest = new DeleteRequest();pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);break;case OpCode.setData:SetDataRequest setDataRequest = new SetDataRequest(); pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);break;case OpCode.reconfig:ReconfigRequest reconfigRequest = new ReconfigRequest();ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);break;case OpCode.setACL:SetACLRequest setAclRequest = new SetACLRequest(); pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);break;case OpCode.check:CheckVersionRequest checkRequest = new CheckVersionRequest(); pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);break;case OpCode.multi:MultiTransactionRecord multiRequest = new MultiTransactionRecord();try {ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);} catch(IOException e) {request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),Time.currentWallTime(), OpCode.multi));throw e;}List<Txn> txns = new ArrayList<Txn>();//Each op in a multi-op must have the same zxid!long zxid = zks.getNextZxid();KeeperException ke = null;//Store off current pending change records in case we need to rollbackMap<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);for(Op op: multiRequest) {Record subrequest = op.toRequestRecord();int type;Record txn;/* If we've already failed one of the ops, don't bother* trying the rest as we know it's going to fail and it* would be confusing in the logfiles.*/if (ke != null) {type = OpCode.error;txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());}/* Prep the request and convert to a Txn */else {try {pRequest2Txn(op.getType(), zxid, request, subrequest, false);type = request.getHdr().getType();txn = request.getTxn();} catch (KeeperException e) {ke = e;type = OpCode.error;txn = new ErrorTxn(e.code().intValue());LOG.info("Got user-level KeeperException when processing "+ request.toString() + " aborting remaining multi ops."+ " Error Path:" + e.getPath()+ " Error:" + e.getMessage());request.setException(e);/* Rollback change records from failed multi-op */rollbackPendingChanges(zxid, pendingChanges);}}//FIXME: I don't want to have to serialize it here and then// immediately deserialize in next processor. But I'm// not sure how else to get the txn stored into our list.ByteArrayOutputStream baos = new ByteArrayOutputStream();BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);txn.serialize(boa, "request") ;ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());txns.add(new Txn(type, bb.array()));}request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,Time.currentWallTime(), request.type));request.setTxn(new MultiTxn(txns));break;//create/close session don't require request recordcase OpCode.createSession:case OpCode.closeSession:if (!request.isLocalSession()) {pRequest2Txn(request.type, zks.getNextZxid(), request,null, true);}break;//All the rest don't need to create a Txn - just verify sessioncase OpCode.sync:case OpCode.exists:case OpCode.getData:case OpCode.getACL:case OpCode.getChildren:case OpCode.getChildren2:case OpCode.ping:case OpCode.setWatches:case OpCode.checkWatches:case OpCode.removeWatches:zks.sessionTracker.checkSession(request.sessionId,request.getOwner());break;default:LOG.warn("unknown type " + request.type);break;}} catch (KeeperException e) {if (request.getHdr() != null) {request.getHdr().setType(OpCode.error);request.setTxn(new ErrorTxn(e.code().intValue()));}LOG.info("Got user-level KeeperException when processing "+ request.toString()+ " Error Path:" + e.getPath()+ " Error:" + e.getMessage());request.setException(e);} catch (Exception e) {// log at error level as we are returning a marshalling// error to the userLOG.error("Failed to process " + request, e);StringBuilder sb = new StringBuilder();ByteBuffer bb = request.request;if(bb != null){bb.rewind();while (bb.hasRemaining()) {sb.append(Integer.toHexString(bb.get() & 0xff));}} else {sb.append("request buffer is null");}LOG.error("Dumping request buffer: 0x" + sb.toString());if (request.getHdr() != null) {request.getHdr().setType(OpCode.error);request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));}}request.zxid = zks.getZxid();nextProcessor.processRequest(request);}

排除異常的邏輯,該方法是處理不同類型的request,根據type選擇一個處理分支,ProcessRequestThread內部調用該方法,它是單例的,因此只有一個單線程調用此代碼。以create請求為例(紅色部分),了解工作機制:

CreateRequest createRequest = (CreateRequest)record;if (deserialize) {ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);}CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());validateCreateRequest(createMode, request);String path = createRequest.getPath();String parentPath = validatePathForCreate(path, request.sessionId);List<ACL> listACL = fixupACL(path, request.authInfo, createRequest.getAcl());ChangeRecord parentRecord = getRecordForPath(parentPath);checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);int parentCVersion = parentRecord.stat.getCversion();if (createMode.isSequential()) {path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);}validatePath(path, request.sessionId);try {if (getRecordForPath(path) != null) {throw new KeeperException.NodeExistsException(path);}} catch (KeeperException.NoNodeException e) {// ignore this one }boolean ephemeralParent = (parentRecord.stat.getEphemeralOwner() != 0) &&(parentRecord.stat.getEphemeralOwner() != DataTree.CONTAINER_EPHEMERAL_OWNER);if (ephemeralParent) {throw new KeeperException.NoChildrenForEphemeralsException(path);}int newCversion = parentRecord.stat.getCversion()+1;if (type == OpCode.createContainer) {request.setTxn(new CreateContainerTxn(path, createRequest.getData(), listACL, newCversion));} else {request.setTxn(new CreateTxn(path, createRequest.getData(), listACL, createMode.isEphemeral(),newCversion));}StatPersisted s = new StatPersisted();if (createMode.isEphemeral()) {s.setEphemeralOwner(request.sessionId);}parentRecord = parentRecord.duplicate(request.getHdr().getZxid());parentRecord.childCount++;parentRecord.stat.setCversion(newCversion); addChangeRecord(parentRecord);addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));break;

調用方法,處理變化:

private void addChangeRecord(ChangeRecord c) {synchronized (zks.outstandingChanges) {zks.outstandingChanges.add(c);zks.outstandingChangesForPath.put(c.path, c);}}

繼續向下

private void addChangeRecord(ChangeRecord c) {synchronized (zks.outstandingChanges) {zks.outstandingChanges.add(c);zks.outstandingChangesForPath.put(c.path, c);}}

其中:outstandingChanges 是一組ChangeRecord,outstandingChangesForPath是map的ChangeRecord,如下定義:

final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
// this data structure must be accessed under the outstandingChanges lock
final HashMap<String, ChangeRecord> outstandingChangesForPath =
new HashMap<String, ChangeRecord>();

ChangeRecord是一個數據結構,方便PrepRP和FinalRp共享信息。

ChangeRecord(long zxid, String path, StatPersisted stat, int childCount,List<ACL> acl) {this.zxid = zxid;this.path = path;this.stat = stat;this.childCount = childCount;this.acl = acl;}

?

  1.2?先看一下同步請求處理器FinalRequestProcessor,這個請求處理器實際上應用到一個請求的所有事務,針對任何查詢提供服務。它通常處于請求處理的最后(不會有下一個消息處理器),故此得名。 它是如何處理請求呢?

public void processRequest(Request request) {if (LOG.isDebugEnabled()) {LOG.debug("Processing request:: " + request);}// request.addRQRec(">final");long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;if (request.type == OpCode.ping) {traceMask = ZooTrace.SERVER_PING_TRACE_MASK;}if (LOG.isTraceEnabled()) {ZooTrace.logRequest(LOG, traceMask, 'E', request, "");}ProcessTxnResult rc = null;synchronized (zks.outstandingChanges) {// Need to process local session requestsrc = zks.processTxn(request);// request.hdr is set for write requests, which are the only ones// that add to outstandingChanges.if (request.getHdr() != null) {TxnHeader hdr = request.getHdr();Record txn = request.getTxn();long zxid = hdr.getZxid();while (!zks.outstandingChanges.isEmpty()&& zks.outstandingChanges.get(0).zxid <= zxid) {ChangeRecord cr = zks.outstandingChanges.remove(0);if (cr.zxid < zxid) {LOG.warn("Zxid outstanding " + cr.zxid+ " is less than current " + zxid);}if (zks.outstandingChangesForPath.get(cr.path) == cr) {zks.outstandingChangesForPath.remove(cr.path);}}}// do not add non quorum packets to the queue. if (request.isQuorum()) {zks.getZKDatabase().addCommittedProposal(request);}}// ZOOKEEPER-558:// In some cases the server does not close the connection (e.g., closeconn buffer// was not being queued — ZOOKEEPER-558) properly. This happens, for example,// when the client closes the connection. The server should still close the session, though.// Calling closeSession() after losing the cnxn, results in the client close session response being dropped.if (request.type == OpCode.closeSession && connClosedByClient(request)) {// We need to check if we can close the session id.// Sometimes the corresponding ServerCnxnFactory could be null because// we are just playing diffs from the leader.if (closeSession(zks.serverCnxnFactory, request.sessionId) ||closeSession(zks.secureServerCnxnFactory, request.sessionId)) {return;}}if (request.cnxn == null) {return;}ServerCnxn cnxn = request.cnxn;String lastOp = "NA";zks.decInProcess();Code err = Code.OK;Record rsp = null;try {if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {/** When local session upgrading is disabled, leader will* reject the ephemeral node creation due to session expire.* However, if this is the follower that issue the request,* it will have the correct error code, so we should use that* and report to user*/if (request.getException() != null) {throw request.getException();} else {throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr()));}}KeeperException ke = request.getException();if (ke != null && request.type != OpCode.multi) {throw ke;}if (LOG.isDebugEnabled()) {LOG.debug("{}",request);}switch (request.type) {case OpCode.ping: {zks.serverStats().updateLatency(request.createTime);lastOp = "PING";cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,request.createTime, Time.currentElapsedTime());cnxn.sendResponse(new ReplyHeader(-2,zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");return;}case OpCode.createSession: {zks.serverStats().updateLatency(request.createTime);lastOp = "SESS";cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,request.createTime, Time.currentElapsedTime());zks.finishSessionInit(request.cnxn, true);return;}case OpCode.multi: {lastOp = "MULT";rsp = new MultiResponse() ;for (ProcessTxnResult subTxnResult : rc.multiResult) {OpResult subResult ;switch (subTxnResult.type) {case OpCode.check:subResult = new CheckResult();break;case OpCode.create:subResult = new CreateResult(subTxnResult.path);break;case OpCode.create2:case OpCode.createContainer:subResult = new CreateResult(subTxnResult.path, subTxnResult.stat);break;case OpCode.delete:case OpCode.deleteContainer:subResult = new DeleteResult();break;case OpCode.setData:subResult = new SetDataResult(subTxnResult.stat);break;case OpCode.error:subResult = new ErrorResult(subTxnResult.err) ;break;default:throw new IOException("Invalid type of op");}((MultiResponse)rsp).add(subResult);}break;}case OpCode.create: {lastOp = "CREA";rsp = new CreateResponse(rc.path);err = Code.get(rc.err);break;}case OpCode.create2:case OpCode.createContainer: {lastOp = "CREA";rsp = new Create2Response(rc.path, rc.stat);err = Code.get(rc.err);break;}case OpCode.delete:case OpCode.deleteContainer: {lastOp = "DELE";err = Code.get(rc.err);break;}case OpCode.setData: {lastOp = "SETD";rsp = new SetDataResponse(rc.stat);err = Code.get(rc.err);break;} case OpCode.reconfig: {lastOp = "RECO"; rsp = new GetDataResponse(((QuorumZooKeeperServer)zks).self.getQuorumVerifier().toString().getBytes(), rc.stat);err = Code.get(rc.err);break;}case OpCode.setACL: {lastOp = "SETA";rsp = new SetACLResponse(rc.stat);err = Code.get(rc.err);break;}case OpCode.closeSession: {lastOp = "CLOS";err = Code.get(rc.err);break;}case OpCode.sync: {lastOp = "SYNC";SyncRequest syncRequest = new SyncRequest();ByteBufferInputStream.byteBuffer2Record(request.request,syncRequest);rsp = new SyncResponse(syncRequest.getPath());break;}case OpCode.check: {lastOp = "CHEC";rsp = new SetDataResponse(rc.stat);err = Code.get(rc.err);break;}case OpCode.exists: {lastOp = "EXIS";// TODO we need to figure out the security requirement for this!ExistsRequest existsRequest = new ExistsRequest();ByteBufferInputStream.byteBuffer2Record(request.request,existsRequest);String path = existsRequest.getPath();if (path.indexOf('\0') != -1) {throw new KeeperException.BadArgumentsException();}Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);rsp = new ExistsResponse(stat);break;}case OpCode.getData: {lastOp = "GETD";GetDataRequest getDataRequest = new GetDataRequest();ByteBufferInputStream.byteBuffer2Record(request.request,getDataRequest);DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());if (n == null) {throw new KeeperException.NoNodeException();}Long aclL;synchronized(n) {aclL = n.acl;}PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),ZooDefs.Perms.READ,request.authInfo);Stat stat = new Stat();byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,getDataRequest.getWatch() ? cnxn : null);rsp = new GetDataResponse(b, stat);break;}case OpCode.setWatches: {lastOp = "SETW";SetWatches setWatches = new SetWatches();// XXX We really should NOT need this!!!! request.request.rewind();ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);long relativeZxid = setWatches.getRelativeZxid();zks.getZKDatabase().setWatches(relativeZxid,setWatches.getDataWatches(),setWatches.getExistWatches(),setWatches.getChildWatches(), cnxn);break;}case OpCode.getACL: {lastOp = "GETA";GetACLRequest getACLRequest = new GetACLRequest();ByteBufferInputStream.byteBuffer2Record(request.request,getACLRequest);Stat stat = new Stat();List<ACL> acl =zks.getZKDatabase().getACL(getACLRequest.getPath(), stat);rsp = new GetACLResponse(acl, stat);break;}case OpCode.getChildren: {lastOp = "GETC";GetChildrenRequest getChildrenRequest = new GetChildrenRequest();ByteBufferInputStream.byteBuffer2Record(request.request,getChildrenRequest);DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());if (n == null) {throw new KeeperException.NoNodeException();}Long aclG;synchronized(n) {aclG = n.acl;}PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),ZooDefs.Perms.READ,request.authInfo);List<String> children = zks.getZKDatabase().getChildren(getChildrenRequest.getPath(), null, getChildrenRequest.getWatch() ? cnxn : null);rsp = new GetChildrenResponse(children);break;}case OpCode.getChildren2: {lastOp = "GETC";GetChildren2Request getChildren2Request = new GetChildren2Request();ByteBufferInputStream.byteBuffer2Record(request.request,getChildren2Request);Stat stat = new Stat();DataNode n = zks.getZKDatabase().getNode(getChildren2Request.getPath());if (n == null) {throw new KeeperException.NoNodeException();}Long aclG;synchronized(n) {aclG = n.acl;}PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),ZooDefs.Perms.READ,request.authInfo);List<String> children = zks.getZKDatabase().getChildren(getChildren2Request.getPath(), stat, getChildren2Request.getWatch() ? cnxn : null);rsp = new GetChildren2Response(children, stat);break;}case OpCode.checkWatches: {lastOp = "CHKW";CheckWatchesRequest checkWatches = new CheckWatchesRequest();ByteBufferInputStream.byteBuffer2Record(request.request,checkWatches);WatcherType type = WatcherType.fromInt(checkWatches.getType());boolean containsWatcher = zks.getZKDatabase().containsWatcher(checkWatches.getPath(), type, cnxn);if (!containsWatcher) {String msg = String.format(Locale.ENGLISH, "%s (type: %s)",new Object[] { checkWatches.getPath(), type });throw new KeeperException.NoWatcherException(msg);}break;}case OpCode.removeWatches: {lastOp = "REMW";RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();ByteBufferInputStream.byteBuffer2Record(request.request,removeWatches);WatcherType type = WatcherType.fromInt(removeWatches.getType());boolean removed = zks.getZKDatabase().removeWatch(removeWatches.getPath(), type, cnxn);if (!removed) {String msg = String.format(Locale.ENGLISH, "%s (type: %s)",new Object[] { removeWatches.getPath(), type });throw new KeeperException.NoWatcherException(msg);}break;}}} catch (SessionMovedException e) {// session moved is a connection level error, we need to tear// down the connection otw ZOOKEEPER-710 might happen// ie client on slow follower starts to renew session, fails// before this completes, then tries the fast follower (leader)// and is successful, however the initial renew is then// successfully fwd/processed by the leader and as a result// the client and leader disagree on where the client is most// recently attached (and therefore invalid SESSION MOVED generated) cnxn.sendCloseSession();return;} catch (KeeperException e) {err = e.code();} catch (Exception e) {// log at error level as we are returning a marshalling// error to the userLOG.error("Failed to process " + request, e);StringBuilder sb = new StringBuilder();ByteBuffer bb = request.request;bb.rewind();while (bb.hasRemaining()) {sb.append(Integer.toHexString(bb.get() & 0xff));}LOG.error("Dumping request buffer: 0x" + sb.toString());err = Code.MARSHALLINGERROR;}long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();ReplyHeader hdr =new ReplyHeader(request.cxid, lastZxid, err.intValue());zks.serverStats().updateLatency(request.createTime);cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,request.createTime, Time.currentElapsedTime());try {cnxn.sendResponse(hdr, rsp, "response");if (request.type == OpCode.closeSession) {cnxn.sendCloseSession();}} catch (IOException e) {LOG.error("FIXMSG",e);}}

?  第一步,根據共享的outstandingChanges,

先處理事務后處理session:

private ProcessTxnResult processTxn(Request request, TxnHeader hdr,Record txn) {ProcessTxnResult rc;int opCode = request != null ? request.type : hdr.getType();long sessionId = request != null ? request.sessionId : hdr.getClientId();if (hdr != null) {rc = getZKDatabase().processTxn(hdr, txn);} else {rc = new ProcessTxnResult();}if (opCode == OpCode.createSession) {if (hdr != null && txn instanceof CreateSessionTxn) {CreateSessionTxn cst = (CreateSessionTxn) txn;sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());} else if (request != null && request.isLocalSession()) {request.request.rewind();int timeout = request.request.getInt();request.request.rewind();sessionTracker.addSession(request.sessionId, timeout);} else {LOG.warn("*****>>>>> Got "+ txn.getClass() + " "+ txn.toString());}} else if (opCode == OpCode.closeSession) {sessionTracker.removeSession(sessionId);}return rc;}

處理事務,本地和數據庫的不同分支, DataTree創建節點

CreateTxn createTxn = (CreateTxn) txn;rc.path = createTxn.getPath();createNode(createTxn.getPath(),createTxn.getData(),createTxn.getAcl(),createTxn.getEphemeral() ? header.getClientId() : 0,createTxn.getParentCVersion(),header.getZxid(), header.getTime(), null);break;

新增一個節點的邏輯是:

/*** Add a new node to the DataTree.* @param path* Path for the new node.* @param data* Data to store in the node.* @param acl* Node acls* @param ephemeralOwner* the session id that owns this node. -1 indicates this is not* an ephemeral node.* @param zxid* Transaction ID* @param time* @param outputStat* A Stat object to store Stat output results into.* @throws NodeExistsException * @throws NoNodeException * @throws KeeperException*/public void createNode(final String path, byte data[], List<ACL> acl,long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat)throws KeeperException.NoNodeException,KeeperException.NodeExistsException {int lastSlash = path.lastIndexOf('/');String parentName = path.substring(0, lastSlash);String childName = path.substring(lastSlash + 1);StatPersisted stat = new StatPersisted();stat.setCtime(time);stat.setMtime(time);stat.setCzxid(zxid);stat.setMzxid(zxid);stat.setPzxid(zxid);stat.setVersion(0);stat.setAversion(0);stat.setEphemeralOwner(ephemeralOwner);DataNode parent = nodes.get(parentName);if (parent == null) {throw new KeeperException.NoNodeException();}synchronized (parent) {Set<String> children = parent.getChildren();if (children != null && children.contains(childName)) {throw new KeeperException.NodeExistsException();}if (parentCVersion == -1) {parentCVersion = parent.stat.getCversion();parentCVersion++;}parent.stat.setCversion(parentCVersion);parent.stat.setPzxid(zxid);Long longval = convertAcls(acl);DataNode child = new DataNode(data, longval, stat);parent.addChild(childName);nodes.put(path, child);if (ephemeralOwner == CONTAINER_EPHEMERAL_OWNER) {containers.add(path);} else if (ephemeralOwner != 0) {HashSet<String> list = ephemerals.get(ephemeralOwner);if (list == null) {list = new HashSet<String>();ephemerals.put(ephemeralOwner, list);}synchronized (list) {list.add(path);}}if (outputStat != null) {child.copyStat(outputStat);}}// now check if its one of the zookeeper node childif (parentName.startsWith(quotaZookeeper)) {// now check if its the limit nodeif (Quotas.limitNode.equals(childName)) {// this is the limit node// get the parent and add it to the trie pTrie.addPath(parentName.substring(quotaZookeeper.length()));}if (Quotas.statNode.equals(childName)) {updateQuotaForPath(parentName.substring(quotaZookeeper.length()));}}// also check to update the quotas for this nodeString lastPrefix = getMaxPrefixWithQuota(path);if(lastPrefix != null) {// ok we have some match and need to updateupdateCount(lastPrefix, 1);updateBytes(lastPrefix, data == null ? 0 : data.length);} dataWatches.triggerWatch(path, Event.EventType.NodeCreated);childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,Event.EventType.NodeChildrenChanged);}

最后的邏輯是觸發創建節點和子節點改變事件。

Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {WatchedEvent e = new WatchedEvent(type,KeeperState.SyncConnected, path);HashSet<Watcher> watchers;synchronized (this) {watchers = watchTable.remove(path);if (watchers == null || watchers.isEmpty()) {if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG,ZooTrace.EVENT_DELIVERY_TRACE_MASK,"No watchers for " + path);}return null;}for (Watcher w : watchers) {HashSet<String> paths = watch2Paths.get(w);if (paths != null) {paths.remove(path);}}}for (Watcher w : watchers) {if (supress != null && supress.contains(w)) {continue;} w.process(e);}return watchers;}

WatcherManager調用定義的watcher進行事件處理。

  1.3. 再看異步消息處理器SyncRequestProcessor

@Overridepublic void run() {try {int logCount = 0;// we do this in an attempt to ensure that not all of the servers// in the ensemble take a snapshot at the same timeint randRoll = r.nextInt(snapCount/2);while (true) {Request si = null;if (toFlush.isEmpty()) {si = queuedRequests.take();} else {si = queuedRequests.poll();if (si == null) {flush(toFlush);continue;}}if (si == requestOfDeath) {break;}if (si != null) {// track the number of records written to the logif (zks.getZKDatabase().append(si)) {logCount++;if (logCount > (snapCount / 2 + randRoll)) {randRoll = r.nextInt(snapCount/2);// roll the log zks.getZKDatabase().rollLog();// take a snapshotif (snapInProcess != null && snapInProcess.isAlive()) {LOG.warn("Too busy to snap, skipping");} else { snapInProcess = new ZooKeeperThread("Snapshot Thread") {public void run() {try {zks.takeSnapshot();} catch(Exception e) {LOG.warn("Unexpected exception", e);}}};snapInProcess.start();}logCount = 0;}} else if (toFlush.isEmpty()) {// optimization for read heavy workloads// iff this is a read, and there are no pending// flushes (writes), then just pass this to the next// processorif (nextProcessor != null) {nextProcessor.processRequest(si);if (nextProcessor instanceof Flushable) {((Flushable)nextProcessor).flush();}}continue;}toFlush.add(si);if (toFlush.size() > 1000) {flush(toFlush);}}}} catch (Throwable t) {handleException(this.getName(), t);} finally{running = false;}LOG.info("SyncRequestProcessor exited!");}

?  異步處理日志和快照,啟動ZooKeeperThread線程來生成快照。

public void takeSnapshot(){try {txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());} catch (IOException e) {LOG.error("Severe unrecoverable error, exiting", e);// This is a severe error that we cannot recover from,// so we need to exitSystem.exit(10);}}

FileTxnSnapLog是個工具類,幫助處理txtlog和snapshot。

/*** save the datatree and the sessions into a snapshot* @param dataTree the datatree to be serialized onto disk* @param sessionsWithTimeouts the sesssion timeouts to be* serialized onto disk* @throws IOException*/public void save(DataTree dataTree,ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)throws IOException {long lastZxid = dataTree.lastProcessedZxid;File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),snapshotFile); snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile); }

持久化為文件

/*** serialize the datatree and session into the file snapshot* @param dt the datatree to be serialized* @param sessions the sessions to be serialized* @param snapShot the file to store snapshot into*/public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)throws IOException {if (!close) {OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32());//CheckedOutputStream cout = new CheckedOutputStream()OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);serialize(dt,sessions,oa, header);long val = crcOut.getChecksum().getValue();oa.writeLong(val, "val");oa.writeString("/", "path");sessOS.flush();crcOut.close();sessOS.close();}}

至此,整個流程已經走完。

  2. 集群情況下

 集群情況和單機略有不同,集群中使用QuorumPeer來啟動ServerCnxnFactory,綁定本地地址

@Overridepublic void start() {LOG.info("binding to port " + localAddress);parentChannel = bootstrap.bind(localAddress);}

限于篇幅,后面的邏輯將在下篇中詳細描述。

 

小結

  從上面的代碼流程中,我們可以看出服務器處理請求要么通過Noi要不通過框架Netty來處理請求,請求通過先通過PrepRequestProcessor接收請求,并進行包裝,然后請求類型的不同,設置同享數據;然后通過SyncRequestProcessor來序列化快照和事務日志,并根據命令類型改變db的內容,在日志和快照沒有寫入前不會進行下一個消息處理器;最后調用FinalRequestProcessor來作為消息處理器的終結者,發送響應消息,并觸發watcher的處理程序?。

?

轉載于:https://www.cnblogs.com/davidwang456/p/5001244.html

總結

以上是生活随笔為你收集整理的zookeeper源码分析之四服务端(单机)处理请求流程的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

在线观看v片 | 国产在线看 | av在线电影播放 | 国产中文自拍 | 人人爽人人搞 | 免费一级片在线 | 一级片黄色片网站 | 国产午夜精品久久久久久久久久 | 久人人 | 久久免费看a级毛毛片 | 色老板在线 | 国产黄色精品在线观看 | 亚洲毛片一区二区三区 | 国产精品永久久久久久久www | 国产精品网在线观看 | 久久999精品 | 国产91影视| 国产视频精品久久 | 亚洲精品一区二区三区在线观看 | 激情开心色 | 成人在线免费小视频 | a v在线观看 | 欧美日韩免费一区二区三区 | 色婷婷福利视频 | 日本久久久精品视频 | 亚洲精品乱码久久 | 亚洲精品免费播放 | 日韩色综合网 | 久久特级毛片 | 色综合久久悠悠 | 日韩av免费在线电影 | 久久精精品视频 | 黄色在线看网站 | 久爱精品在线 | 91精品在线视频观看 | 国产一级视屏 | 天天弄天天干 | 亚洲精品视频在线 | 精品国精品自拍自在线 | 精品国产日本 | 视频一区二区免费 | 国产精品久久99精品毛片三a | 欧美日韩国产精品爽爽 | 久久久久久福利 | 国产精品 中文字幕 亚洲 欧美 | 91视频亚洲 | 91人人网 | 亚洲国产精品传媒在线观看 | 天堂在线一区二区 | 在线观看电影av | 日日夜夜天天干 | 午夜精品久久久久久久99婷婷 | 日韩成人黄色av | 五月天色综合 | 你操综合 | 中文字幕在线免费看线人 | www成人av | 婷婷丁香在线 | 免费观看www视频 | 成人a级网站 | 丰满少妇在线观看 | 亚洲精品videossex少妇 | 国产资源中文字幕 | 在线观看完整版免费 | 色七七亚洲影院 | 亚洲国产网站 | 亚洲高清在线 | 91人人爽久久涩噜噜噜 | www.色午夜.com | 91成人在线观看高潮 | 久久久国产网站 | 国产成人在线精品 | 日日夜夜精品免费观看 | 日韩精品免费在线视频 | 国产精品丝袜久久久久久久不卡 | 天天爱天天操天天爽 | 97在线观看免费 | 97色婷婷| 国产成人精品一区二区三区在线观看 | 精品综合久久 | 久久视频精品在线观看 | 欧美日韩国产成人 | 9在线观看免费高清完整版在线观看明 | 五月在线 | 午夜精品视频在线 | 国产自产高清不卡 | 久久九九免费视频 | 国产成人黄色网址 | 日韩免费在线视频 | 超碰97免费观看 | 激情欧美丁香 | 免费观看性生活大片3 | 91av手机在线 | 天天插伊人| 成人一区二区在线 | 青草视频在线 | 日本精品一区二区在线观看 | 免费高清男女打扑克视频 | 欧美a√大片 | 中文字幕在线免费看 | 一级黄色片在线免费观看 | 亚洲区色| 激情在线网站 | 成年人免费观看国产 | 你操综合 | 91成年人在线观看 | 国产剧情在线一区 | 91视频这里只有精品 | 精品国产免费一区二区三区五区 | 成人黄色小说在线观看 | 国产高清视频网 | 91成人天堂久久成人 | 欧美日在线观看 | 日韩免费高清 | 又黄又爽又刺激 | 欧美精品久久久久久久久久 | 亚洲日本va午夜在线电影 | 伊色综合久久之综合久久 | 免费看黄20分钟 | www.伊人网 | 日本久久久久久久久久久 | 人人干97| 午夜三级影院 | 色插综合 | 国产精品国产三级国产 | 国产精品11| 日韩三级久久 | www.亚洲| 精品一区二区在线观看 | 久草免费在线观看视频 | 欧美日韩精品影院 | 天天插夜夜操 | 夜夜骑日日操 | 久久久久女人精品毛片九一 | 久久精品欧美日韩精品 | 欧美精品乱码久久久久久按摩 | 日本h视频在线观看 | 国产成人精品午夜在线播放 | 精品国产欧美 | 国产在线专区 | 99精品成人 | 黄网站污 | 亚洲综合色婷婷 | 97电院网手机版 | 午夜视频色 | av看片在线 | 久艹在线免费观看 | 婷婷 综合 色 | 国产区第一页 | 成年人在线电影 | 9在线观看免费 | 欧美一级视频在线观看 | 在线观看国产亚洲 | 毛片网站在线看 | 久久精品国产第一区二区三区 | 美女在线免费观看视频 | 成人中文字幕在线观看 | 国产精品久久久久一区二区国产 | 久久综合五月天婷婷伊人 | 午夜成人免费影院 | 午夜久久久久 | 日韩精品久久久 | 亚洲精品视频一 | 日韩黄色一区 | 国产第页 | 亚洲精品乱码久久久久v最新版 | 欧美做受高潮电影o | 伊人宗合| 18久久久久久 | 久久久免费在线观看 | 中文字幕在线精品 | 国产精品99久久久久久人免费 | 青青河边草免费直播 | 特级西西www44高清大胆图片 | 久久久鲁| 欧洲性视频 | 日韩视频中文字幕在线观看 | 黄色官网在线观看 | 一级黄色av | 免费久久99精品国产 | 五月婷婷操 | 免费观看不卡av | 欧美激情视频免费看 | 国产亚洲高清视频 | 成人黄色在线播放 | 日本99热 | 日韩在线视频国产 | 亚洲免费视频在线观看 | 亚洲视频 中文字幕 | 日韩在线欧美在线 | 91欧美日韩国产 | 午夜久操 | 国产玖玖视频 | 亚洲午夜久久久久久久久久久 | 国产精品白丝jk白祙 | 亚洲成人家庭影院 | 亚洲成人精品影院 | 日本三级人妇 | 久久久久麻豆v国产 | 天天爱天天射天天干天天 | 久久久久久久久福利 | 久艹视频免费观看 | 黄色av成人在线观看 | 国产午夜精品一区二区三区在线观看 | 亚洲九九九在线观看 | 91麻豆精品国产91久久久无需广告 | 亚洲区另类春色综合小说校园片 | 天天操天天射天天 | 欧美午夜性生活 | 黄色片毛片 | 亚洲日本国产 | 日韩久久久 | 91精品视频免费在线观看 | 91香蕉视频好色先生 | 日日夜夜精品免费 | 欧美午夜久久 | 久久99欧美 | 国内毛片毛片 | 中文字幕在线观看第三页 | 在线观看蜜桃视频 | 免费一级片视频 | 欧美亚洲专区 | 亚洲日本一区二区在线 | 伊人导航 | 日韩理论片在线观看 | 丁香在线观看完整电影视频 | 国产福利不卡视频 | 国产二区视频在线 | 日韩伦理片hd | 亚洲精品福利在线 | 91精品国产三级a在线观看 | 国产人免费人成免费视频 | 欧美xxxxx在线视频 | 国产一区不卡在线 | 97色狠狠 | 久久久久国产精品视频 | 久久99操 | 伊人激情综合 | 亚洲人成免费网站 | 日韩视频一区二区三区在线播放免费观看 | 国产免费又爽又刺激在线观看 | 国产精品不卡在线播放 | 亚洲电影一区二区 | 人人干人人添 | 人人插人人玩 | 亚洲高清在线观看视频 | 91在线视频网址 | 激情五月婷婷综合网 | 在线观看一级片 | 成人久久久精品国产乱码一区二区 | 在线观看视频免费大全 | 国模视频一区二区 | 欧美日本在线视频 | 欧美午夜久久 | 亚洲作爱 | a久久久久久 | 久章草在线观看 | 久久免费黄色网址 | 天天操,夜夜操 | 91c网站色版视频 | 亚洲精品综合久久 | 免费观看久久久 | 一本大道久久精品懂色aⅴ 五月婷社区 | 国产精品一区二区果冻传媒 | 激情欧美日韩一区二区 | 久久97超碰 | 欧美午夜精品久久久久 | 日日夜夜天天久久 | 欧美精品xx | 天天伊人狠狠 | 中文字幕在线国产精品 | 久久久久在线观看 | 国产自在线观看 | 欧美a级成人淫片免费看 | 亚洲影视资源 | 久久99精品久久久久久 | 91视频国产高清 | av电影一区 | 久草视频在线免费 | 奇米777777 | 久草久热 | 国产又粗又猛又黄视频 | 亚洲视频 一区 | 国精产品满18岁在线 | 欧美色操| 国产偷v国产偷∨精品视频 在线草 | 五月天婷亚洲天综合网鲁鲁鲁 | 蜜臀久久99精品久久久久久网站 | 日日夜日日干 | 国产亚洲视频系列 | 国产黄免费 | 久久97久久97精品免视看 | 91三级在线观看 | 在线成人欧美 | 欧美少妇的秘密 | 美女视频久久久 | 深爱激情综合 | 黄色录像av | 日韩网站免费观看 | 欧洲精品码一区二区三区免费看 | 青春草国产视频 | 欧美成年人在线观看 | 亚洲欧洲中文日韩久久av乱码 | 黄色网免费 | 国产在线国偷精品产拍 | 天天操夜夜操国产精品 | 992tv又爽又黄的免费视频 | 久操操| 黄色三级免费 | 久久av电影| 岛国av在线 | 亚洲一区欧美精品 | 99综合久久| 国产精品不卡av | 久久婷婷色综合 | 久久99九九99精品 | 91免费看黄色 | 日韩大片在线播放 | 亚洲一本视频 | 日韩区在线观看 | 亚洲一区精品人人爽人人躁 | 久久免费视频2 | 久草在线在线视频 | 色一级片| 色综合久久久 | 亚洲精品视频在 | 五月天婷婷丁香花 | 中文字幕乱偷在线 | 视频在线观看一区 | 97免费公开视频 | 91麻豆精品91久久久久同性 | 天天躁天天躁天天躁婷 | www.狠狠色 | 精品 一区 在线 | 日本一区二区免费在线观看 | 日韩黄色一级电影 | 一区二区三区四区免费视频 | 日本女人在线观看 | 超碰日韩在线 | 国产无遮挡又黄又爽在线观看 | 天天天色综合 | 99久久婷婷国产综合亚洲 | 日韩一级黄色片 | 国产在线色视频 | 日本二区三区在线 | 亚洲精品视频免费观看 | 在线看国产精品 | 天天操综 | 免费在线激情视频 | 视频二区在线 | 91影视成人 | 91视频免费国产 | 国产精品不卡一区 | 亚洲精品中文字幕在线 | 美女网站在线观看 | 国产在线精品观看 | 欧美日韩亚洲在线观看 | 日韩美在线 | 国产精品2区 | 国产91影院| 婷婷成人综合 | 狠狠干激情 | 激情五月婷婷丁香 | 色婷婷六月天 | 午夜久久久久久久久久影院 | 免费av福利 | 国产精品成人一区二区 | av黄色在线 | 超碰人人干人人 | 成人一区在线观看 | 日韩亚洲在线视频 | 婷婷.com| 久久久久综合精品福利啪啪 | 成人免费在线视频 | 久久av电影 | 亚洲精品欧洲精品 | 蜜臀av一区二区 | 国产精品免费在线观看视频 | 开心激情五月婷婷 | 日韩精品视频免费在线观看 | 亚洲国产黄色片 | 不卡电影免费在线播放一区 | 婷婷激情欧美 | 特级免费毛片 | 天天想夜夜操 | av国产在线观看 | 精品视频久久久久久 | av在线播放国产 | 亚洲自拍av在线 | 国产高清视频在线播放 | 国产精品视频观看 | 91精品在线免费观看视频 | 在线99热| 综合激情伊人 | 久久久高清 | 99九九99九九九视频精品 | 亚洲精品国产日韩 | 日日爽夜夜操 | 九九色在线观看 | 欧美精品网站 | 欧美日韩中文字幕在线视频 | 色婷婷电影网 | 欧美午夜久久 | 日韩欧美视频免费在线观看 | 99久久国产免费看 | 高清免费av在线 | 99国内精品久久久久久久 | 激情视频亚洲 | 久久久精品网站 | 亚洲人天堂| 麻豆91在线 | 国产精品区一区 | a级国产乱理伦片在线播放 久久久久国产精品一区 | 欧美午夜精品久久久久久浪潮 | av免费在线看网站 | 99色精品视频 | 久久久999 | 日日躁你夜夜躁你av蜜 | 国产精品麻豆99久久久久久 | 欧美日韩中文另类 | 久久伊人操 | 免费高清在线观看成人 | 国产区第一页 | 久久av高清| 国产区精品 | 婷婷视频在线 | 国产精品字幕 | 在线电影中文字幕 | 97精品在线观看 | 久久高清精品 | 欧美日韩免费在线视频 | 欧美激情va永久在线播放 | 亚洲精品中文字幕在线 | 欧美性猛片 | 国产精品久久电影网 | 国产精品毛片一区视频播不卡 | 手机av资源 | 精品久久久久久久久久久久久久久久 | 亚洲天堂网站视频 | 日韩免费网址 | 国产精品初高中精品久久 | 国产免费国产 | 一区二区三区高清 | 99re视频在线观看 | av片子在线观看 | 夜色.com| 欧美乱码精品一区二区 | 91九色在线视频 | 亚洲免费av一区二区 | 午夜性福利| 黄色在线视频网址 | 亚洲精品在线观看视频 | 免费日韩一区二区三区 | 欧美日韩视频精品 | 欧美一区二区伦理片 | 久久av不卡 | 日韩av在线免费播放 | 国产欧美久久久精品影院 | 欧美午夜寂寞影院 | 国产精品一区二区三区99 | 色婷婷av一区 | 性色av一区二区 | 日韩欧美高清一区二区 | 91av视频免费在线观看 | 日韩欧美在线视频一区二区 | 成人av手机在线 | 久久成人在线视频 | 又黄又刺激视频 | 国产亚洲精品久久久久5区 成人h电影在线观看 | 91丨九色丨首页 | 亚洲经典视频在线观看 | 中文一区在线观看 | 久久久这里有精品 | 五月综合网站 | 国产又粗又猛又色又黄网站 | 欧美亚洲精品一区 | 久久男人中文字幕资源站 | 色婷在线 | 顶级欧美色妇4khd | 成人午夜电影久久影院 | 欧美日本国产在线观看 | 亚洲欧美视频 | 九九九电影免费看 | 96久久欧美麻豆网站 | 五月天六月婷 | 91高清完整版在线观看 | 国产精品99久久久久久有的能看 | 911国产在线观看 | 91香蕉视频色版 | 曰本免费av | 99视频久久 | 黄色片网站免费 | 色噜噜噜噜 | 麻豆91小视频 | 久久九九网站 | 亚洲热久久 | 国产拍揄自揄精品视频麻豆 | 91一区啪爱嗯打偷拍欧美 | 免费在线国产视频 | 亚洲精品动漫久久久久 | 国产 精品 资源 | 免费情趣视频 | 国内精品国产三级国产aⅴ久 | 狠狠操狠狠干2017 | 亚洲三级影院 | 在线草 | 蜜桃视频在线视频 | 国产精品久久久免费看 | 色婷婷综合成人av | 国产小视频免费在线网址 | 91爱爱网址 | 麻豆影视在线播放 | 91久久人澡人人添人人爽欧美 | 欧美色插 | 在线不卡视频 | 五月在线 | 久久视频在线视频 | 国产精品美女久久久久久久久 | 一区二区理论片 | 日韩欧美国产免费播放 | 国内视频在线 | 色五月情| 日本最大色倩网站www | 天天综合网久久综合网 | 在线中文字幕av观看 | 午夜精品一区二区三区在线视频 | 天天天操天天天干 | 免费观看性生交 | 久热免费在线观看 | 久久99精品久久久久婷婷 | 色综合天天综合 | 欧美成人亚洲成人 | 日韩影视在线 | 在线观看国产区 | 成人午夜电影免费在线观看 | 狠狠天天| 久久综合色婷婷 | av日韩在线网站 | 久久亚洲福利视频 | 黄色成人影视 | 亚洲乱码精品久久久久 | 国产美女精品在线 | 91久久国产综合精品女同国语 | 成年人在线免费看视频 | 亚洲精品中文字幕视频 | 国产一区播放 | 中文字幕亚洲精品在线观看 | 亚洲国产精品久久久久 | 国内精品免费久久影院 | 国产美女黄网站免费 | 亚洲一区二区视频在线播放 | 国产一级精品视频 | 黄色av电影在线 | 五月开心婷婷网 | 超碰在线日韩 | 狠狠色丁香婷婷综合久小说久 | 婷婷亚洲综合 | 久草精品网 | 91九色自拍 | 日韩中文字幕亚洲一区二区va在线 | 久久超| 久久8| 波多野结衣亚洲一区二区 | 国内精品久久久久久 | 激情综合网五月激情 | 伊人色综合久久天天 | 欧美一区二区在线免费观看 | 久久久久久久久久久久影院 | 在线免费91 | 99热精品在线 | 亚洲国产精品va在线 | 91精品在线视频 | 久久天天躁狠狠躁夜夜不卡公司 | 91麻豆精品国产91久久久更新时间 | 99精品国产在热久久下载 | 香蕉影院在线播放 | 日韩av看片| 久久精品99精品国产香蕉 | 亚洲黄色激情小说 | 中文字幕91 | 国产糖心vlog在线观看 | 天天色棕合合合合合合 | 国产精品美女久久久久久 | 精品国产一区二区三区在线 | 国内精品久久久久久久久久久 | 午夜在线观看一区 | 又黄又爽又刺激 | 天天爱天天射 | 亚洲成人动漫在线观看 | 精品1区二区 | 免费成人av | 少妇bbw搡bbbb搡bbb | 国产精品黑丝在线观看 | 婷婷综合| 久久伊人爱 | 中文字幕在线视频第一页 | 午夜在线观看影院 | 精品免费一区二区三区 | 国产精品自产拍在线观看中文 | 中文字幕有码在线 | 五月婷婷深开心 | 久久嗨| 精品视频999 | 国产很黄很色的视频 | 久久免视频| 日韩精品视频免费看 | 蜜臀久久99精品久久久无需会员 | 一区中文字幕电影 | 国产精品久久久久999 | 成人av av在线 | 欧美午夜久久 | 91精品国产自产在线观看永久 | 亚洲精品视频网址 | 99日韩精品 | 国产成人精品一区在线 | 久久国产视屏 | 免费看黄色大全 | 99热最新在线 | 亚洲一区黄色 | 又爽又黄又刺激的视频 | 久久永久免费 | 国产三级精品在线 | 激情中文在线 | 欧美精品一区在线发布 | 国产视频在线观看一区 | 99精品视频免费观看 | 欧美日韩国产综合一区二区 | 色狠狠干 | 久久综合久色欧美综合狠狠 | 黄色片视频免费 | 久久久久亚洲精品 | 久青草国产在线 | 亚洲成a人片在线观看中文 中文字幕在线视频第一页 狠狠色丁香婷婷综合 | 激情导航 | 狠狠躁夜夜躁人人爽超碰97香蕉 | 中文字幕高清视频 | 精品成人a区在线观看 | 777视频在线观看 | 日韩一区二区三区在线观看 | 高清精品在线 | 日本午夜免费福利视频 | 国产精品毛片一区视频播不卡 | 欧美另类老妇 | 国产美女在线精品免费观看 | 国产精品毛片完整版 | 国内精品久久天天躁人人爽 | 成人教育av | 99精彩视频在线观看免费 | 色干干| 91在线麻豆 | 亚洲男男gⅴgay双龙 | 日韩夜夜爽 | 国产一区二区三区免费视频 | 国产美女精彩久久 | 国产精品白丝jk白祙 | 久热精品国产 | 天天操天天干天天爽 | 欧美视频www| 国产一级黄色片免费看 | 精品自拍网 | 四虎国产 | 精品国产一区二区三区久久影院 | 国产精品丝袜久久久久久久不卡 | 国产日韩精品欧美 | 午夜视频在线观看一区 | 久 久久影院 | 国产美女视频网站 | 婷婷综合在线 | 亚洲在线激情 | 久草在线视频在线 | 精品美女视频 | 日本夜夜草视频网站 | 国产精品一区二区三区电影 | 久久九九免费视频 | 在线观看成人av | 国产91在线免费视频 | 国产免费又黄又爽 | 狠狠色香婷婷久久亚洲精品 | 日韩免费观看高清 | 爱爱av网站 | 天天操天天综合网 | 欧美日韩亚洲在线 | 天天操夜夜叫 | 欧美性生活免费看 | 2021国产视频 | 色综合天天视频在线观看 | 欧美另类性 | 五月情婷婷 | 欧美在线free | 久久国产精品免费一区 | 午夜电影 电影 | 日产中文字幕 | 久久av中文字幕片 | 成人影片在线免费观看 | 91精品91| 国产老太婆免费交性大片 | 午夜色影院 | 久久综合中文字幕 | 成人免费在线观看av | 91精品日韩 | 色橹橹欧美在线观看视频高清 | 五月丁色 | 一区二区三区视频在线 | 日韩中文字幕视频在线观看 | 亚洲va欧美va人人爽春色影视 | 99视频在线观看一区三区 | 久久91久久久久麻豆精品 | 中文字幕免费高清 | 成人av资源网 | 91成品人影院 | 久久天天躁夜夜躁狠狠躁2022 | 久久这里只有精品首页 | 中文字幕在线人 | 99免费精品| 欧美日韩有码 | 天天艹天天干天天 | 成人a视频在线观看 | 亚洲欧美日韩中文在线 | 国产精品二区在线观看 | 一区二区视频免费在线观看 | 在线看一区 | 久久99国产精品久久 | 一区二区欧美日韩 | 黄色一级大片在线免费看国产一 | 91一区一区三区 | 国产丝袜制服在线 | 亚州欧美精品 | 亚洲成免费 | 国产免费精彩视频 | 国产精品初高中精品久久 | 美女网站在线播放 | 中文字幕在线观看三区 | 婷婷射五月 | 亚洲一区视频在线播放 | 在线天堂视频 | 99免费在线观看视频 | 中文字幕中文字幕在线中文字幕三区 | 日韩欧美v| 夜色资源站国产www在线视频 | 久久黄色免费 | 日色在线视频 | 免费久久99精品国产婷婷六月 | 99精品欧美一区二区三区 | 在线高清av| 久久综合久久综合这里只有精品 | 日韩av专区 | 999久久a精品合区久久久 | 日韩欧美在线视频一区二区三区 | 色狠狠综合天天综合综合 | 亚洲黄色在线观看 | 99视频免费观看 | 免费大片av | 九九在线精品视频 | 韩国av免费在线观看 | 免费99精品国产自在在线 | 国产中文在线观看 | 国产黄色片免费在线观看 | aaa毛片视频 | 国产黄大片 | 国产午夜精品一区二区三区 | 日韩欧美国产视频 | 黄色一级在线视频 | 亚洲精品在线资源 | 欧美少妇的秘密 | 日韩在线视频网 | www.黄色网.com| 天天操天天操天天操天天操天天操 | 91人人澡人人爽人人精品 | 天天插综合网 | 国产色视频网站2 | 黄色性av| 亚洲高清在线 | 99久久精品电影 | 国产成人区 | 国产最新精品视频 | 免费69视频 | 蜜桃久久久 | 久草视频在线资源 | 韩国精品一区二区三区六区色诱 | 国产一区在线看 | 欧美成人基地 | 91黄色小视频| 日日操网站 | 国产免费观看高清完整版 | 最近免费中文字幕大全高清10 | 国产一区在线视频 | 最近免费中文视频 | 日韩区视频 | 国内一级片在线观看 | 色综合久久88 | 中文字幕av在线播放 | 五月天综合网 | 成人在线播放免费观看 | 中文字幕999 | 在线观看成人国产 | 日韩一区正在播放 | 亚州av成人 | 99视频久久| 日本mv大片欧洲mv大片 | 日韩精品一区二区三区丰满 | 蜜桃视频在线观看一区 | 中文字幕国内精品 | 日韩欧美高清在线 | 国产在线观看污片 | 99久久婷婷国产 | 国产va在线 | 免费麻豆 | 国产精品视频在线看 | 99久久久国产精品免费99 | 毛片精品免费在线观看 | 亚洲日日射 | 中文字幕精品视频 | 精品999在线观看 | 日韩午夜在线 | 国产大尺度视频 | 丝袜美腿在线 | 天天躁天天躁天天躁婷 | 久久五月激情 | 九九热只有这里有精品 | 久久综合婷婷国产二区高清 | 成人毛片一区二区三区 | 国产午夜精品免费一区二区三区视频 | 国产精品毛片 | 色婷婷激情四射 | 久久草网站 | 亚洲精品视频在线观看免费视频 | 久久久久久久福利 | 亚洲精品国产电影 | 激情网站五月天 | 中文字幕日本在线观看 | 国产在线黄色 | 国产高清不卡在线 | 婷婷播播网 | 久久99久久99精品 | 美女福利视频一区二区 | 天天综合入口 | 亚洲人成人99网站 | 色午夜影院 | 一区二区三区韩国免费中文网站 | 日本中文字幕观看 | 欧美日韩在线免费观看 | 91av视频免费在线观看 | 亚洲精品午夜久久久 | 日韩av影视在线观看 | 欧美精品久久久久久久亚洲调教 | 亚av在线 | 中文字幕免费看 | 日韩av片在线 | 国产激情免费 | 97超碰总站 | 亚洲精品99久久久久中文字幕 | 九色精品免费永久在线 | 97人人视频| 日本韩国精品一区二区在线观看 | 免费黄色av电影 | 国产在线欧美日韩 | 国产精品高清一区二区三区 | 在线免费高清视频 | 亚洲激情六月 | www黄免费 | 99精品国产99久久久久久97 | 久久久久99精品国产片 | 999成人网| 丁香花中文在线免费观看 | 人九九精品 | 精品免费视频 | www久久| 成人免费观看完整版电影 | 五月天堂网 | 国产中文字幕一区二区 | 91精品国产三级a在线观看 | 在线视频区 | a级片在线播放 | av最新资源 | 国产精品手机在线播放 | av官网 | 国产免费久久av | 国产精品99久久免费黑人 | 在线观看中文字幕一区 | 激情综合亚洲精品 | 成人观看视频 | 久久精品99久久 | 99热精品在线 | 日韩有码欧美 | 黄色在线免费观看网址 | 一区二区三区四区五区在线 | 视频一区二区三区视频 | 人人爽人人看 | 天天玩天天操天天射 | 国产精品久久久久一区二区国产 | 欧美a级在线 | 在线国产专区 | 91黄色在线看| 国产免费不卡av | 四虎在线视频免费观看 | 日日夜夜天天 | 亚洲免费高清视频 | 久久久久久片 | 国产在线日韩 | 精品一二三四视频 | 免费看成人片 | 日韩在线观看一区二区 | 在线观看视频一区二区 | 国产精品免费大片视频 | 国产99久久久国产精品成人免费 | 国产91精品一区二区 | 99久久精品久久久久久清纯 | 国产剧情一区二区在线观看 | 久草国产在线 | 开心激情综合网 | 日韩精品不卡在线观看 | 久久理论电影网 | 久久伊人八月婷婷综合激情 | 亚洲九九爱 | 国产精品久久久久久久午夜 | 又黄又刺激的视频 | 视频一区二区免费 | av免费在线观看1 | 久久日韩精品 | 伊人www22综合色 | 国产在线观看免费 | 丁香电影小说免费视频观看 | 欧美日韩不卡一区二区 | 亚洲伊人色 | 91私密视频 | 91久久精品一区二区三区 | 91视视频在线直接观看在线看网页在线看 | 99久久婷婷国产一区二区三区 | 中文视频一区二区 | 一区二区精品在线观看 | 国产在线a不卡 | 国产九九热 | 免费看黄色91| 黄色福利视频网站 | 精品主播网红福利资源观看 | 六月丁香伊人 | 久久久国产精品视频 | 久久久久久久久久久久久9999 | 三级av免费看 | 久久久精品 | 国产在线观看你懂得 | 色噜噜色噜噜 | 中文字幕在线播放一区二区 | 国产精品欧美一区二区 | 激情五月婷婷综合 | 99久久婷婷 | 黄色网免费 | 国产黄色片免费观看 | 亚洲va欧美va人人爽春色影视 | 亚洲国产成人精品在线观看 | 亚洲综合在线一区二区三区 | 97精品久久人人爽人人爽 | 中文字幕精品一区 | 综合婷婷 | 九九九热精品免费视频观看网站 | 一级黄色片在线免费看 | 久久综合日 | 99国产精品久久久久久久久久 | 日本性动态图 | 亚洲精品色视频 | 日韩精品极品视频 | 中文字幕第一页在线视频 | 日本高清xxxx | 婷婷中文字幕在线观看 | 国产理论免费 | 久久亚洲欧美日韩精品专区 | 色婷婷骚婷婷 | 亚洲乱码精品久久久 | 久久激情片 | 精品国产欧美一区二区三区不卡 | 狠狠的操你 | 亚洲精品美女久久 | 九九九九九国产 | 在线蜜桃视频 | av成人免费在线看 | 欧美日韩免费在线观看视频 | 在线观看亚洲精品 | 999久久久久久久久久久 | 久草在线免费资源 | 啪啪免费观看网站 | 欧美a级一区二区 | 日韩一级精品 | 三级毛片视频 | 国产成人精品福利 | 中文字幕 影院 | 天天插日日操 | 日日操操| 91看片淫黄大片一级在线观看 | 天堂在线视频免费观看 | 成人黄在线 | 国产99久久精品一区二区300 | 国产精品久久久久久久久久久久冷 | 精品国产伦一区二区三区观看说明 | 毛片1000部免费看 | 国产精品入口麻豆www | 99久久精品电影 | 五月天综合在线 | 欧美日韩高清一区二区 国产亚洲免费看 |