【Zookeeper】源码分析之持久化(三)之FileTxnSnapLog
一、前言
前面分析了FileSnap,接著繼續分析FileTxnSnapLog源碼,其封裝了TxnLog和SnapShot,其在持久化過程中是一個幫助類。
二、FileTxnSnapLog源碼分析
2.1 類的屬性
public class FileTxnSnapLog {//the direcotry containing the //the transaction logs// 日志文件目錄private final File dataDir;//the directory containing the//the snapshot directory// 快照文件目錄private final File snapDir;// 事務日志private TxnLog txnLog;// 快照private SnapShot snapLog;// 版本號public final static int VERSION = 2;// 版本public final static String version = "version-";// Loggerprivate static final Logger LOG = LoggerFactory.getLogger(FileTxnSnapLog.class); }說明:類的屬性中包含了TxnLog和SnapShot接口,即對FileTxnSnapLog的很多操作都會轉發給TxnLog和SnapLog進行操作,這是一種典型的組合方法。
2.2 內部類
FileTxnSnapLog包含了PlayBackListener內部類,用來接收事務應用過程中的回調,在Zookeeper數據恢復后期,會有事務修正過程,此過程會回調PlayBackListener來進行對應的數據修正。其源碼如下
public interface PlayBackListener {void onTxnLoaded(TxnHeader hdr, Record rec); }說明:在完成事務操作后,會調用到onTxnLoaded方法進行相應的處理。
2.3 構造函數
FileTxnSnapLog的構造函數如下
public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir);// 在datadir和snapdir下生成version-2目錄this.dataDir = new File(dataDir, version + VERSION);this.snapDir = new File(snapDir, version + VERSION);if (!this.dataDir.exists()) { // datadir存在但無法創建目錄,則拋出異常if (!this.dataDir.mkdirs()) {throw new IOException("Unable to create data directory "+ this.dataDir);}}if (!this.snapDir.exists()) { // snapdir存在但無法創建目錄,則拋出異常if (!this.snapDir.mkdirs()) {throw new IOException("Unable to create snap directory "+ this.snapDir);}}// 給屬性賦值txnLog = new FileTxnLog(this.dataDir);snapLog = new FileSnap(this.snapDir);}說明:對于構造函數而言,其會在傳入的datadir和snapdir目錄下新生成version-2的目錄,并且會判斷目錄是否創建成功,之后會創建txnLog和snapLog。
2.4 核心函數分析
1.?restore函數
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {// 根據snap文件反序列化dt和sessions snapLog.deserialize(dt, sessions);// FileTxnLog txnLog = new FileTxnLog(dataDir);// 獲取比最后處理的zxid+1大的log文件的迭代器TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);// 最大的zxidlong highestZxid = dt.lastProcessedZxid;TxnHeader hdr;try {while (true) {// iterator points to // the first valid txn when initialized// itr在read函數調用后就已經指向第一個合法的事務// 獲取事務頭hdr = itr.getHeader();if (hdr == null) { // 事務頭為空//empty logs // 表示日志文件為空return dt.lastProcessedZxid;}if (hdr.getZxid() < highestZxid && highestZxid != 0) { // 事務頭的zxid小于snapshot中的最大zxid并且其不為0,則會報錯LOG.error("{}(higestZxid) > {}(next log) for type {}",new Object[] { highestZxid, hdr.getZxid(),hdr.getType() });} else { // 重新賦值highestZxidhighestZxid = hdr.getZxid();}try {// 在datatree上處理事務 processTransaction(hdr,dt,sessions, itr.getTxn());} catch(KeeperException.NoNodeException e) {throw new IOException("Failed to process transaction type: " +hdr.getType() + " error: " + e.getMessage(), e);}// 每處理完一個事務都會進行回調 listener.onTxnLoaded(hdr, itr.getTxn());if (!itr.next()) // 已無事務,跳出循環break;}} finally {if (itr != null) { // 迭代器不為空,則關閉 itr.close();}}// 返回最高的zxidreturn highestZxid;}說明:restore用于恢復datatree和sessions,其步驟大致如下
① 根據snapshot文件反序列化datatree和sessions,進入②
② 獲取比snapshot文件中的zxid+1大的log文件的迭代器,以對log文件中的事務進行迭代,進入③
③ 迭代log文件的每個事務,并且將該事務應用在datatree中,同時會調用onTxnLoaded函數進行后續處理,進入④
④ 關閉迭代器,返回log文件中最后一個事務的zxid(作為最高的zxid)
其中會調用到FileTxnLog的read函數,read函數在FileTxnLog中已經進行過分析,會調用processTransaction函數,其源碼如下
public void processTransaction(TxnHeader hdr,DataTree dt,Map<Long, Integer> sessions, Record txn)throws KeeperException.NoNodeException {// 事務處理結果 ProcessTxnResult rc;switch (hdr.getType()) { // 確定事務類型case OpCode.createSession: // 創建會話// 添加進會話 sessions.put(hdr.getClientId(),((CreateSessionTxn) txn).getTimeOut());if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,"playLog --- create session in log: 0x"+ Long.toHexString(hdr.getClientId())+ " with timeout: "+ ((CreateSessionTxn) txn).getTimeOut());}// give dataTree a chance to sync its lastProcessedZxid// 處理事務rc = dt.processTxn(hdr, txn);break;case OpCode.closeSession: // 關閉會話// 會話中移除 sessions.remove(hdr.getClientId());if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,"playLog --- close session in log: 0x"+ Long.toHexString(hdr.getClientId()));}// 處理事務rc = dt.processTxn(hdr, txn);break;default:// 處理事務rc = dt.processTxn(hdr, txn);}/*** Snapshots are lazily created. So when a snapshot is in progress,* there is a chance for later transactions to make into the* snapshot. Then when the snapshot is restored, NONODE/NODEEXISTS* errors could occur. It should be safe to ignore these.*/if (rc.err != Code.OK.intValue()) { // 忽略處理結果中可能出現的錯誤LOG.debug("Ignoring processTxn failure hdr:" + hdr.getType()+ ", error: " + rc.err + ", path: " + rc.path);}}說明:processTransaction會根據事務頭中記錄的事務類型(createSession、closeSession、其他類型)來進行相應的操作,對于createSession類型而言,其會將會話和超時時間添加至會話map中,對于closeSession而言,會話map會根據客戶端的id號刪除其會話,同時,所有的操作都會調用到dt.processTxn函數,其源碼如下
public ProcessTxnResult processTxn(TxnHeader header, Record txn){// 事務處理結果ProcessTxnResult rc = new ProcessTxnResult();try {// 從事務頭中解析出相應屬性并保存至rc中rc.clientId = header.getClientId();rc.cxid = header.getCxid();rc.zxid = header.getZxid();rc.type = header.getType();rc.err = 0;rc.multiResult = null;switch (header.getType()) { // 確定事務類型case OpCode.create: // 創建結點// 顯示轉化CreateTxn createTxn = (CreateTxn) txn;// 獲取創建結點路徑rc.path = createTxn.getPath();// 創建結點 createNode(createTxn.getPath(),createTxn.getData(),createTxn.getAcl(),createTxn.getEphemeral() ? header.getClientId() : 0,createTxn.getParentCVersion(),header.getZxid(), header.getTime());break;case OpCode.delete: // 刪除結點// 顯示轉化DeleteTxn deleteTxn = (DeleteTxn) txn;// 獲取刪除結點路徑rc.path = deleteTxn.getPath();// 刪除結點 deleteNode(deleteTxn.getPath(), header.getZxid());break;case OpCode.setData: // 寫入數據// 顯示轉化SetDataTxn setDataTxn = (SetDataTxn) txn;// 獲取寫入數據結點路徑rc.path = setDataTxn.getPath();// 寫入數據rc.stat = setData(setDataTxn.getPath(), setDataTxn.getData(), setDataTxn.getVersion(), header.getZxid(), header.getTime());break;case OpCode.setACL: // 設置ACL// 顯示轉化SetACLTxn setACLTxn = (SetACLTxn) txn;// 獲取路徑rc.path = setACLTxn.getPath();// 設置ACLrc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(),setACLTxn.getVersion());break;case OpCode.closeSession: // 關閉會話// 關閉會話 killSession(header.getClientId(), header.getZxid());break;case OpCode.error: // 錯誤// 顯示轉化ErrorTxn errTxn = (ErrorTxn) txn;// 記錄錯誤rc.err = errTxn.getErr();break;case OpCode.check: // 檢查// 顯示轉化CheckVersionTxn checkTxn = (CheckVersionTxn) txn;// 獲取路徑rc.path = checkTxn.getPath();break;case OpCode.multi: // 多個事務// 顯示轉化MultiTxn multiTxn = (MultiTxn) txn ;// 獲取事務列表List<Txn> txns = multiTxn.getTxns();rc.multiResult = new ArrayList<ProcessTxnResult>();boolean failed = false;for (Txn subtxn : txns) { // 遍歷事務列表if (subtxn.getType() == OpCode.error) {failed = true;break;}}boolean post_failed = false;for (Txn subtxn : txns) { // 遍歷事務列表,確定每個事務類型并進行相應操作// 處理事務的數據ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());Record record = null;switch (subtxn.getType()) {case OpCode.create:record = new CreateTxn();break;case OpCode.delete:record = new DeleteTxn();break;case OpCode.setData:record = new SetDataTxn();break;case OpCode.error:record = new ErrorTxn();post_failed = true;break;case OpCode.check:record = new CheckVersionTxn();break;default:throw new IOException("Invalid type of op: " + subtxn.getType());}assert(record != null);// 將bytebuffer轉化為record(初始化record的相關屬性) ByteBufferInputStream.byteBuffer2Record(bb, record);if (failed && subtxn.getType() != OpCode.error){ // 失敗并且不為error類型int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue() : Code.OK.intValue();subtxn.setType(OpCode.error);record = new ErrorTxn(ec);}if (failed) { // 失敗assert(subtxn.getType() == OpCode.error) ;}// 生成事務頭TxnHeader subHdr = new TxnHeader(header.getClientId(), header.getCxid(),header.getZxid(), header.getTime(), subtxn.getType());// 遞歸調用處理事務ProcessTxnResult subRc = processTxn(subHdr, record);// 保存處理結果 rc.multiResult.add(subRc);if (subRc.err != 0 && rc.err == 0) {rc.err = subRc.err ;}}break;}} catch (KeeperException e) {if (LOG.isDebugEnabled()) {LOG.debug("Failed: " + header + ":" + txn, e);}rc.err = e.code().intValue();} catch (IOException e) {if (LOG.isDebugEnabled()) {LOG.debug("Failed: " + header + ":" + txn, e);}}/** A snapshot might be in progress while we are modifying the data* tree. If we set lastProcessedZxid prior to making corresponding* change to the tree, then the zxid associated with the snapshot* file will be ahead of its contents. Thus, while restoring from* the snapshot, the restore method will not apply the transaction* for zxid associated with the snapshot file, since the restore* method assumes that transaction to be present in the snapshot.** To avoid this, we first apply the transaction and then modify* lastProcessedZxid. During restore, we correctly handle the* case where the snapshot contains data ahead of the zxid associated* with the file.*/// 事務處理結果中保存的zxid大于已經被處理的最大的zxid,則重新賦值if (rc.zxid > lastProcessedZxid) {lastProcessedZxid = rc.zxid;}/** Snapshots are taken lazily. It can happen that the child* znodes of a parent are created after the parent* is serialized. Therefore, while replaying logs during restore, a* create might fail because the node was already* created.** After seeing this failure, we should increment* the cversion of the parent znode since the parent was serialized* before its children.** Note, such failures on DT should be seen only during* restore.*/if (header.getType() == OpCode.create &&rc.err == Code.NODEEXISTS.intValue()) { // 處理在恢復數據過程中的結點創建操作LOG.debug("Adjusting parent cversion for Txn: " + header.getType() +" path:" + rc.path + " err: " + rc.err);int lastSlash = rc.path.lastIndexOf('/');String parentName = rc.path.substring(0, lastSlash);CreateTxn cTxn = (CreateTxn)txn;try {setCversionPzxid(parentName, cTxn.getParentCVersion(),header.getZxid());} catch (KeeperException.NoNodeException e) {LOG.error("Failed to set parent cversion for: " +parentName, e);rc.err = e.code().intValue();}} else if (rc.err != Code.OK.intValue()) {LOG.debug("Ignoring processTxn failure hdr: " + header.getType() +" : error: " + rc.err);}return rc;} View Code說明:processTxn用于處理事務,即將事務操作應用到DataTree內存數據庫中,以恢復成最新的數據。
2.?save函數
public void save(DataTree dataTree,ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)throws IOException {// 獲取最后處理的zxidlong lastZxid = dataTree.lastProcessedZxid;// 生成snapshot文件File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),snapshotFile);// 序列化datatree、sessionsWithTimeouts至snapshot文件 snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile);}說明:save函數用于將sessions和datatree保存至snapshot文件中,其大致步驟如下
① 獲取內存數據庫中已經處理的最新的zxid,進入②
② 根據zxid和快照目錄生成snapshot文件,進入③
③ 將datatree(內存數據庫)、sessionsWithTimeouts序列化至快照文件。
其他的函數或多或少都是調用TxnLog和SnapLog中的相應函數,之前已經進行過分析,這里不再累贅。
三、總結
本篇博文分析了FileTxnSnapLog的源碼,其主要封裝了TxnLog和SnapLog來進行相應的處理,其提供了從snapshot文件和log文件中恢復內存數據庫的接口,源碼相對而言較為簡單,也謝謝各位園友的觀看~
轉載于:https://www.cnblogs.com/leesf456/p/6285703.html
總結
以上是生活随笔為你收集整理的【Zookeeper】源码分析之持久化(三)之FileTxnSnapLog的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: BZOJ 4066: 简单题
- 下一篇: PAT1021