Lucene学习总结之四:Lucene索引过程分析
對(duì)于Lucene的索引過程,除了將詞(Term)寫入倒排表并最終寫入Lucene的索引文件外,還包括分詞(Analyzer)和合并段(merge segments)的過程,本次不包括這兩部分,將在以后的文章中進(jìn)行分析。
Lucene的索引過程,很多的博客,文章都有介紹,推薦大家上網(wǎng)搜一篇文章:《Annotated Lucene》,好像中文名稱叫《Lucene源碼剖析》是很不錯(cuò)的。
想要真正了解Lucene索引文件過程,最好的辦法是跟進(jìn)代碼調(diào)試,對(duì)著文章看代碼,這樣不但能夠最詳細(xì)準(zhǔn)確的掌握索引過程(描述都是有偏差的,而代碼是不會(huì)騙你的),而且還能夠?qū)W習(xí)Lucene的一些優(yōu)秀的實(shí)現(xiàn),能夠在以后的工作中為我所用,畢竟Lucene是比較優(yōu)秀的開源項(xiàng)目之一。
由于Lucene已經(jīng)升級(jí)到3.0.0了,本索引過程為Lucene 3.0.0的索引過程。
一、索引過程體系結(jié)構(gòu)
Lucene 3.0的搜索要經(jīng)歷一個(gè)十分復(fù)雜的過程,各種信息分散在不同的對(duì)象中分析,處理,寫入,為了支持多線程,每個(gè)線程都創(chuàng)建了一系列類似結(jié)構(gòu)的對(duì)象集,為了提高效率,要復(fù)用一些對(duì)象集,這使得索引過程更加復(fù)雜。
其實(shí)索引過程,就是經(jīng)歷下圖中所示的索引鏈的過程,索引鏈中的每個(gè)節(jié)點(diǎn),負(fù)責(zé)索引文檔的不同部分的信息 ,當(dāng)經(jīng)歷完所有的索引鏈的時(shí)候,文檔就處理完畢了。最初的索引鏈,我們稱之基本索引鏈?。
為了支持多線程,使得多個(gè)線程能夠并發(fā)處理文檔,因而每個(gè)線程都要建立自己的索引鏈體系,使得每個(gè)線程能夠獨(dú)立工作,在基本索引鏈基礎(chǔ)上建立起來的每個(gè)線程獨(dú)立的索引鏈體系,我們稱之線程索引鏈?。線程索引鏈的每個(gè)節(jié)點(diǎn)是由基本索引鏈中的相應(yīng)的節(jié)點(diǎn)調(diào)用函數(shù)addThreads創(chuàng)建的。
為了提高效率,考慮到對(duì)相同域的處理有相似的過程,應(yīng)用的緩存也大致相當(dāng),因而不必每個(gè)線程在處理每一篇文檔的時(shí)候都重新創(chuàng)建一系列對(duì)象,而是復(fù)用這些對(duì)象。所以對(duì)每個(gè)域也建立了自己的索引鏈體系,我們稱之域索引鏈?。域索引鏈的每個(gè)節(jié)點(diǎn)是由線程索引鏈中的相應(yīng)的節(jié)點(diǎn)調(diào)用addFields創(chuàng)建的。
當(dāng)完成對(duì)文檔的處理后,各部分信息都要寫到索引文件中,寫入索引文件的過程是同步的,不是多線程的,也是沿著基本索引鏈將各部分信息依次寫入索引文件的。
下面詳細(xì)分析這一過程。
?
二、詳細(xì)索引過程
1、創(chuàng)建IndexWriter對(duì)象
代碼:
| IndexWriter writer = new IndexWriter(FSDirectory.open(INDEX_DIR), new StandardAnalyzer(Version.LUCENE_CURRENT), true, IndexWriter.MaxFieldLength.LIMITED); |
IndexWriter對(duì)象主要包含以下幾方面的信息:
- 用于索引文檔
- Directory directory;? 指向索引文件夾
- Analyzer analyzer;??? 分詞器
- Similarity similarity = Similarity.getDefault(); 影響打分的標(biāo)準(zhǔn)化因子(normalization factor)部分,對(duì)文檔的打分分兩個(gè)部分,一部分是索引階段計(jì)算的,與查詢語句無關(guān),一部分是搜索階段計(jì)算的,與查詢語句相關(guān)。
- SegmentInfos segmentInfos = new SegmentInfos(); 保存段信息,大家會(huì)發(fā)現(xiàn),和segments_N中的信息幾乎一一對(duì)應(yīng)。
- IndexFileDeleter deleter; 此對(duì)象不是用來刪除文檔的,而是用來管理索引文件的。
- Lock writeLock; 每一個(gè)索引文件夾只能打開一個(gè)IndexWriter,所以需要鎖。
- Set segmentsToOptimize = new HashSet(); 保存正在最優(yōu)化(optimize)的段信息。當(dāng)調(diào)用optimize的時(shí)候,當(dāng)前所有的段信息加入此Set,此后新生成的段并不參與此次最優(yōu)化。
- 用于合并段,在合并段的文章中將詳細(xì)描述
- SegmentInfos localRollbackSegmentInfos;
- HashSet mergingSegments = new HashSet();
- MergePolicy mergePolicy = new LogByteSizeMergePolicy(this);
- MergeScheduler mergeScheduler = new ConcurrentMergeScheduler();
- LinkedList pendingMerges = new LinkedList();
- Set runningMerges = new HashSet();
- List mergeExceptions = new ArrayList();
- long mergeGen;
- 為保持索引完整性,一致性和事務(wù)性
- SegmentInfos rollbackSegmentInfos; 當(dāng)IndexWriter對(duì)索引進(jìn)行了添加,刪除文檔操作后,可以調(diào)用commit將修改提交到文件中去,也可以調(diào)用rollback取消從上次commit到此時(shí)的修改。
- SegmentInfos localRollbackSegmentInfos; 此段信息主要用于將其他的索引文件夾合并到此索引文件夾的時(shí)候,為防止合并到一半出錯(cuò)可回滾所保存的原來的段信息。?
- 一些配置
- long writeLockTimeout; 獲得鎖的時(shí)間超時(shí)。當(dāng)超時(shí)的時(shí)候,說明此索引文件夾已經(jīng)被另一個(gè)IndexWriter打開了。
- int termIndexInterval; 同tii和tis文件中的indexInterval。
?
有關(guān)SegmentInfos對(duì)象所保存的信息:
- 當(dāng)索引文件夾如下的時(shí)候,SegmentInfos對(duì)象如下表
| segmentInfos??? SegmentInfos? (id=37)???? |
有關(guān)IndexFileDeleter:
- 其不是用來刪除文檔的,而是用來管理索引文件的。
- 在對(duì)文檔的添加,刪除,對(duì)段的合并的處理過程中,會(huì)生成很多新的文件,并需要?jiǎng)h除老的文件,因而需要管理。
- 然而要被刪除的文件又可能在被用,因而要保存一個(gè)引用計(jì)數(shù),僅僅當(dāng)引用計(jì)數(shù)為零的時(shí)候,才執(zhí)行刪除。
- 下面這個(gè)例子能很好的說明IndexFileDeleter如何對(duì)文件引用計(jì)數(shù)并進(jìn)行添加和刪除的。
| (1) 創(chuàng)建IndexWriter時(shí)???? IndexWriter writer = new IndexWriter(FSDirectory.open(indexDir), new StandardAnalyzer(Version.LUCENE_CURRENT), true, IndexWriter.MaxFieldLength.LIMITED);? 索引文件夾如下:
引用計(jì)數(shù)如下: refCounts??? HashMap? (id=101)????? (2) 添加第一個(gè)段時(shí) indexDocs(writer, docDir);? 首先生成的不是compound文件
因而引用計(jì)數(shù)如下: refCounts??? HashMap? (id=101)????? 然后會(huì)合并成compound文件,并加入引用計(jì)數(shù)
refCounts??? HashMap? (id=101)????? 然后會(huì)用IndexFileDeleter.decRef()來刪除[_0.nrm, _0.tis, _0.fnm, _0.tii, _0.frq, _0.fdx, _0.prx, _0.fdt]文件
refCounts??? HashMap? (id=101)????? 然后為建立新的segments_2 ? ? refCounts??? HashMap? (id=77)????? 然后IndexFileDeleter.decRef() 刪除segments_1文件 ? refCounts??? HashMap? (id=77)????? (3) 添加第二個(gè)段 indexDocs(writer, docDir);? ?? (4) 添加第三個(gè)段,由于MergeFactor為3,則會(huì)進(jìn)行一次段合并。 indexDocs(writer, docDir);? 首先和其他的段一樣,生成_2.cfs以及segments_4 ? 同時(shí)創(chuàng)建了一個(gè)線程來進(jìn)行背后進(jìn)行段合并(ConcurrentMergeScheduler$MergeThread.run()) ? 這時(shí)候的引用計(jì)數(shù)如下 refCounts??? HashMap? (id=84)????? (5) 關(guān)閉writer writer.close(); 通過IndexFileDeleter.decRef()刪除被合并的段 ? |
有關(guān)SimpleFSLock進(jìn)行JVM之間的同步:
- 有時(shí)候,我們寫java程序的時(shí)候,也需要不同的JVM之間進(jìn)行同步,來保護(hù)一個(gè)整個(gè)系統(tǒng)中唯一的資源。
- 如果唯一的資源僅僅在一個(gè)進(jìn)程中,則可以使用線程同步的機(jī)制
- 然而如果唯一的資源要被多個(gè)進(jìn)程進(jìn)行訪問,則需要進(jìn)程間同步的機(jī)制,無論是Windows和Linux在操作系統(tǒng)層面都有很多的進(jìn)程間同步的機(jī)制。
- 但進(jìn)程間的同步卻不是Java的特長,Lucene的SimpleFSLock給我們提供了一種方式。
| Lock的抽象類? public abstract class Lock { ? public static long LOCK_POLL_INTERVAL = 1000; ? public static final long LOCK_OBTAIN_WAIT_FOREVER = -1; ? public abstract boolean obtain() throws IOException; ? public boolean obtain(long lockWaitTimeout) throws LockObtainFailedException, IOException { ??? boolean locked = obtain(); ??? if (lockWaitTimeout < 0 && lockWaitTimeout != LOCK_OBTAIN_WAIT_FOREVER)? ??? long maxSleepCount = lockWaitTimeout / LOCK_POLL_INTERVAL; ??? long sleepCount = 0; ??? while (!locked) { ????? if (lockWaitTimeout != LOCK_OBTAIN_WAIT_FOREVER && sleepCount++ >= maxSleepCount) {? ? public abstract void release() throws IOException; ? public abstract boolean isLocked() throws IOException; } LockFactory的抽象類 public abstract class LockFactory { ? public abstract Lock makeLock(String lockName); ? abstract public void clearLock(String lockName) throws IOException;? SimpleFSLock的實(shí)現(xiàn)類 class SimpleFSLock extends Lock { ? File lockFile;? ? public SimpleFSLock(File lockDir, String lockFileName) {? ? @Override? ??? if (!lockDir.exists()) { ????? if (!lockDir.mkdirs())? ??? } else if (!lockDir.isDirectory()) { ????? throw new IOException("Found regular file where directory expected: " + lockDir.getAbsolutePath());? ??? return lockFile.createNewFile(); ? } ? @Override? ??? if (lockFile.exists() && !lockFile.delete())? ? } ? @Override? ??? return lockFile.exists(); ? } } SimpleFSLockFactory的實(shí)現(xiàn)類 public class SimpleFSLockFactory extends FSLockFactory { ? public SimpleFSLockFactory(String lockDirName) throws IOException { ??? setLockDir(new File(lockDirName)); ? } ? @Override? ??? if (lockPrefix != null) { ????? lockName = lockPrefix + "-" + lockName; ??? } ??? return new SimpleFSLock(lockDir, lockName); ? } ? @Override? ??? if (lockDir.exists()) { ????? if (lockPrefix != null) { ??????? lockName = lockPrefix + "-" + lockName; ????? } ????? File lockFile = new File(lockDir, lockName); ????? if (lockFile.exists() && !lockFile.delete()) { ??????? throw new IOException("Cannot delete " + lockFile); ????? } ??? } ? } }; |
?
2、創(chuàng)建文檔Document對(duì)象,并加入域(Field)
代碼:
| Document doc = new Document(); doc.add(new Field("path", f.getPath(), Field.Store.YES, Field.Index.NOT_ANALYZED)); doc.add(new Field("modified",DateTools.timeToString(f.lastModified(), DateTools.Resolution.MINUTE), Field.Store.YES, Field.Index.NOT_ANALYZED)); doc.add(new Field("contents", new FileReader(f))); |
Document對(duì)象主要包括以下部分:
- 此文檔的boost,默認(rèn)為1,大于一說明比一般的文檔更加重要,小于一說明更不重要。
- 一個(gè)ArrayList保存此文檔所有的域
- 每一個(gè)域包括域名,域值,和一些標(biāo)志位,和fnm,fdx,fdt中的描述相對(duì)應(yīng)。
| doc??? Document? (id=42)???? |
3、將文檔加入IndexWriter
代碼:
| writer.addDocument(doc);? -->IndexWriter.addDocument(Document doc, Analyzer analyzer)? ???? -->doFlush = docWriter.addDocument(doc, analyzer);? ????????? --> DocumentsWriter.updateDocument(Document, Analyzer, Term)? 注:--> 代表一級(jí)函數(shù)調(diào)用 |
IndexWriter繼而調(diào)用DocumentsWriter.addDocument,其又調(diào)用DocumentsWriter.updateDocument。
4、將文檔加入DocumentsWriter
代碼:
| DocumentsWriter.updateDocument(Document doc, Analyzer analyzer, Term delTerm)? -->(1) DocumentsWriterThreadState state = getThreadState(doc, delTerm);? -->(2) DocWriter perDoc = state.consumer.processDocument();? -->(3) finishDocument(state, perDoc); |
DocumentsWriter對(duì)象主要包含以下幾部分:
- 用于寫索引文件
- IndexWriter writer;
- Directory directory;
- Similarity similarity:分詞器
- String segment:當(dāng)前的段名,每當(dāng)flush的時(shí)候,將索引寫入以此為名稱的段。
| IndexWriter.doFlushInternal()? --> String segment = docWriter.getSegment();//return segment? --> newSegment = new SegmentInfo(segment,……);? --> docWriter.createCompoundFile(segment);//根據(jù)segment創(chuàng)建cfs文件。 |
- ?
- String docStoreSegment:存儲(chǔ)域所要寫入的目標(biāo)段。(在索引文件格式一文中已經(jīng)詳細(xì)描述)
- int docStoreOffset:存儲(chǔ)域在目標(biāo)段中的偏移量。
- int nextDocID:下一篇添加到此索引的文檔ID號(hào),對(duì)于同一個(gè)索引文件夾,此變量唯一,且同步訪問。
- DocConsumer consumer; 這是整個(gè)索引過程的核心,是IndexChain整個(gè)索引鏈的源頭。
| 基本索引鏈: 對(duì)于一篇文檔的索引過程,不是由一個(gè)對(duì)象來完成的,而是用對(duì)象組合的方式形成的一個(gè)處理鏈,鏈上的每個(gè)對(duì)象僅僅處理索引過程的一部分,稱為索引鏈,由于后面還有其他的索引鏈,所以此處的索引鏈我稱為基本索引鏈。 DocConsumer consumer 類型為DocFieldProcessor,是整個(gè)索引鏈的源頭,包含如下部分:
|
- 刪除文檔
- BufferedDeletes deletesInRAM = new BufferedDeletes();
- BufferedDeletes deletesFlushed = new BufferedDeletes();
| 類BufferedDeletes包含了一下的成員變量:
由此可見,文檔的刪除主要有三種方式:
刪除文檔既可以用reader進(jìn)行刪除,也可以用writer進(jìn)行刪除,不同的是,reader進(jìn)行刪除后,此reader馬上能夠生效,而用writer刪除后,會(huì)被緩存在deletesInRAM及deletesFlushed中,只有寫入到索引文件中,當(dāng)reader再次打開的時(shí)候,才能夠看到。 那deletesInRAM和deletesFlushed各有什么用處呢? 此版本的Lucene對(duì)文檔的刪除是支持多線程的,當(dāng)用IndexWriter刪除文檔的時(shí)候,都是緩存在deletesInRAM中的,直到flush,才將刪除的文檔寫入到索引文件中去,我們知道flush是需要一段時(shí)間的,那么在flush的過程中,另一個(gè)線程又有文檔刪除怎么辦呢? 一般過程是這個(gè)樣子的,當(dāng)flush的時(shí)候,首先在同步(synchornized)的方法pushDeletes中,將deletesInRAM全部加到deletesFlushed中,然后將deletesInRAM清空,退出同步方法,于是flush的線程程就向索引文件寫deletesFlushed中的刪除文檔的過程,而與此同時(shí)其他線程新刪除的文檔則添加到新的deletesInRAM中去,直到下次flush才寫入索引文件。 |
- 緩存管理
- 為了提高索引的速度,Lucene對(duì)很多的數(shù)據(jù)進(jìn)行了緩存,使一起寫入磁盤,然而緩存需要進(jìn)行管理,何時(shí)分配,何時(shí)回收,何時(shí)寫入磁盤都需要考慮。
- ArrayList freeCharBlocks = new ArrayList();將用于緩存詞(Term)信息的空閑塊
- ArrayList freeByteBlocks = new ArrayList();將用于緩存文檔號(hào)(doc id)及詞頻(freq),位置(prox)信息的空閑塊。
- ArrayList freeIntBlocks = new ArrayList();將存儲(chǔ)某詞的詞頻(freq)和位置(prox)分別在byteBlocks中的偏移量
- boolean bufferIsFull;用來判斷緩存是否滿了,如果滿了,則應(yīng)該寫入磁盤
- long numBytesAlloc;分配的內(nèi)存數(shù)量
- long numBytesUsed;使用的內(nèi)存數(shù)量
- long freeTrigger;應(yīng)該開始回收內(nèi)存時(shí)的內(nèi)存用量。
- long freeLevel;回收內(nèi)存應(yīng)該回收到的內(nèi)存用量。
- long ramBufferSize;用戶設(shè)定的內(nèi)存用量。
| 緩存用量之間的關(guān)系如下:? DocumentsWriter.setRAMBufferSizeMB(double mb){? ??? ramBufferSize = (long) (mb*1024*1024);//用戶設(shè)定的內(nèi)存用量,當(dāng)使用內(nèi)存大于此時(shí),開始寫入磁盤? DocumentsWriter.balanceRAM(){? ??? if (numBytesAlloc+deletesRAMUsed > freeTrigger) {? ??? //當(dāng)分配的內(nèi)存加刪除文檔所占用的內(nèi)存大于105%的時(shí)候,開始釋放內(nèi)存? ??????? while(numBytesAlloc+deletesRAMUsed > freeLevel) {? ??????? //一直進(jìn)行釋放,直到95%? ??????????? //釋放free blocks ??????????? byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);? ??????????? freeCharBlocks.remove(freeCharBlocks.size()-1);? ??????????? freeIntBlocks.remove(freeIntBlocks.size()-1);? ??????? if (numBytesUsed+deletesRAMUsed > ramBufferSize){ ??????? //當(dāng)使用的內(nèi)存加刪除文檔占有的內(nèi)存大于用戶指定的內(nèi)存時(shí),可以寫入磁盤 ????????????? bufferIsFull = true; ??????? } ??? }? 當(dāng)判斷是否應(yīng)該寫入磁盤時(shí):
DocumentsWriter.timeToFlushDeletes(){ ??? return (bufferIsFull || deletesFull()) && setFlushPending(); } DocumentsWriter.deletesFull(){ ??? return (ramBufferSize != IndexWriter.DISABLE_AUTO_FLUSH &&? } |
- 多線程并發(fā)索引
- 為了支持多線程并發(fā)索引,對(duì)每一個(gè)線程都有一個(gè)DocumentsWriterThreadState,其為每一個(gè)線程根據(jù)DocConsumer consumer的索引鏈來創(chuàng)建每個(gè)線程的索引鏈(XXXPerThread),來進(jìn)行對(duì)文檔的并發(fā)處理。
- DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];
- HashMap threadBindings = new HashMap();
- 雖然對(duì)文檔的處理過程可以并行,但是將文檔寫入索引文件卻必須串行進(jìn)行,串行寫入的代碼在DocumentsWriter.finishDocument中
- WaitQueue waitQueue = new WaitQueue()
- long waitQueuePauseBytes
- long waitQueueResumeBytes
| 在Lucene中,文檔是按添加的順序編號(hào)的,DocumentsWriter中的nextDocID就是記錄下一個(gè)添加的文檔id。 當(dāng)Lucene支持多線程的時(shí)候,就必須要有一個(gè)synchornized方法來付給文檔id并且將nextDocID加一,這些是在DocumentsWriter.getThreadState這個(gè)函數(shù)里面做的。 雖然給文檔付ID沒有問題了。但是由Lucene索引文件格式我們知道,文檔是要按照ID的順序從小到大寫到索引文件中去的,然而不同的文檔處理速度不同,當(dāng)一個(gè)先來的線程一處理一篇需要很長時(shí)間的大文檔時(shí),另一個(gè)后來的線程二可能已經(jīng)處理了很多小的文檔了,但是這些后來小文檔的ID號(hào)都大于第一個(gè)線程所處理的大文檔,因而不能馬上寫到索引文件中去,而是放到waitQueue中,僅僅當(dāng)大文檔處理完了之后才寫入索引文件。 waitQueue中有一個(gè)變量nextWriteDocID表示下一個(gè)可以寫入文件的ID,當(dāng)付給大文檔ID=4時(shí),則nextWriteDocID也設(shè)為4,雖然后來的小文檔5,6,7,8等都已處理結(jié)束,但是如下代碼, WaitQueue.add(){ ??? if (doc.docID == nextWriteDocID){? ?? doPause() } 則把5, 6, 7, 8放入waiting隊(duì)列,并且記錄當(dāng)前等待的文檔所占用的內(nèi)存大小waitingBytes。 當(dāng)大文檔4處理完畢后,不但寫入文檔4,把原來等待的文檔5, 6, 7, 8也一起寫入。 WaitQueue.add(){ ??? if (doc.docID == nextWriteDocID) { ?????? writeDocument(doc); ?????? while(true) { ?????????? doc = waiting[nextWriteLoc]; ?????????? writeDocument(doc); ?????? } ?? } else { ????? ………… ?? } ?? doPause() } 但是這存在一個(gè)問題:當(dāng)大文檔很大很大,處理的很慢很慢的時(shí)候,后來的線程二可能已經(jīng)處理了很多的小文檔了,這些文檔都是在waitQueue中,則占有了越來越多的內(nèi)存,長此以往,有內(nèi)存不夠的危險(xiǎn)。 因而在finishDocuments里面,在WaitQueue.add最后調(diào)用了doPause()函數(shù) DocumentsWriter.finishDocument(){ ??? doPause = waitQueue.add(docWriter); ??? if (doPause)? ??? notifyAll(); } WaitQueue.doPause() {? 當(dāng)waitingBytes足夠大的時(shí)候(為用戶指定的內(nèi)存使用量的10%),doPause返回true,于是后來的線程二會(huì)進(jìn)入wait狀態(tài),不再處理另外的文檔,而是等待線程一處理大文檔結(jié)束。 當(dāng)線程一處理大文檔結(jié)束的時(shí)候,調(diào)用notifyAll喚醒等待他的線程。 DocumentsWriter.waitForWaitQueue() {? WaitQueue.doResume() {? 當(dāng)waitingBytes足夠小的時(shí)候,doResume返回true, 則線程二不用再wait了,可以繼續(xù)處理另外的文檔。 |
- 一些標(biāo)志位
- int maxFieldLength:一篇文檔中,一個(gè)域內(nèi)可索引的最大的詞(Term)數(shù)。
- int maxBufferedDeleteTerms:可緩存的最大的刪除詞(Term)數(shù)。當(dāng)大于這個(gè)數(shù)的時(shí)候,就要寫到文件中了。
此過程又包含如下三個(gè)子過程:
4.1、得到當(dāng)前線程對(duì)應(yīng)的文檔集處理對(duì)象(DocumentsWriterThreadState)
代碼為:
| DocumentsWriterThreadState state = getThreadState(doc, delTerm); |
在Lucene中,對(duì)于同一個(gè)索引文件夾,只能夠有一個(gè)IndexWriter打開它,在打開后,在文件夾中,生成文件write.lock,當(dāng)其他IndexWriter再試圖打開此索引文件夾的時(shí)候,則會(huì)報(bào)org.apache.lucene.store.LockObtainFailedException錯(cuò)誤。
這樣就出現(xiàn)了這樣一個(gè)問題,在同一個(gè)進(jìn)程中,對(duì)同一個(gè)索引文件夾,只能有一個(gè)IndexWriter打開它,因而如果想多線程向此索引文件夾中添加文檔,則必須共享一個(gè)IndexWriter,而且在以往的實(shí)現(xiàn)中,addDocument函數(shù)是同步的(synchronized),也即多線程的索引并不能起到提高性能的效果。
于是為了支持多線程索引,不使IndexWriter成為瓶頸,對(duì)于每一個(gè)線程都有一個(gè)相應(yīng)的文檔集處理對(duì)象(DocumentsWriterThreadState),這樣對(duì)文檔的索引過程可以多線程并行進(jìn)行,從而增加索引的速度。
getThreadState函數(shù)是同步的(synchronized),DocumentsWriter有一個(gè)成員變量threadBindings,它是一個(gè)HashMap,鍵為線程對(duì)象(Thread.currentThread()),值為此線程對(duì)應(yīng)的DocumentsWriterThreadState對(duì)象。
DocumentsWriterThreadState DocumentsWriter.getThreadState(Document doc, Term delTerm)包含如下幾個(gè)過程:
- 根據(jù)當(dāng)前線程對(duì)象,從HashMap中查找相應(yīng)的DocumentsWriterThreadState對(duì)象,如果沒找到,則生成一個(gè)新對(duì)象,并添加到HashMap中
| DocumentsWriterThreadState state = (DocumentsWriterThreadState) threadBindings.get(Thread.currentThread());? if (state == null) {? ??? ……? ??? state = new DocumentsWriterThreadState(this);? ??? ……? ??? threadBindings.put(Thread.currentThread(), state);? }? |
- 如果此線程對(duì)象正在用于處理上一篇文檔,則等待,直到此線程的上一篇文檔處理完。
| DocumentsWriter.getThreadState() {? ??? waitReady(state);? ??? state.isIdle = false;? }? waitReady(state) {? ??? while (!state.isIdle) {wait();}? }?? 顯然如果state.isIdle為false,則此線程等待。? 在一篇文檔處理之前,state.isIdle = false會(huì)被設(shè)定,而在一篇文檔處理完畢之后,DocumentsWriter.finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter)中,會(huì)首先設(shè)定perThread.isIdle = true; 然后notifyAll()來喚醒等待此文檔完成的線程,從而處理下一篇文檔。 |
- 如果IndexWriter剛剛commit過,則新添加的文檔要加入到新的段中(segment),則首先要生成新的段名。
| initSegmentName(false);? --> if (segment == null) segment = writer.newSegmentName(); |
- 將此線程的文檔處理對(duì)象設(shè)為忙碌:state.isIdle = false;
4.2、用得到的文檔集處理對(duì)象(DocumentsWriterThreadState)處理文檔
代碼為:
| DocWriter perDoc = state.consumer.processDocument(); |
每一個(gè)文檔集處理對(duì)象DocumentsWriterThreadState都有一個(gè)文檔及域處理對(duì)象DocFieldProcessorPerThread,它的成員函數(shù)processDocument()被調(diào)用來對(duì)文檔及域進(jìn)行處理。
| 線程索引鏈(XXXPerThread): 由于要多線程進(jìn)行索引,因而每個(gè)線程都要有自己的索引鏈,稱為線程索引鏈。 線程索引鏈同基本索引鏈有相似的樹形結(jié)構(gòu),由基本索引鏈中每個(gè)層次的對(duì)象調(diào)用addThreads進(jìn)行創(chuàng)建的,負(fù)責(zé)每個(gè)線程的對(duì)文檔的處理。 DocFieldProcessorPerThread是線程索引鏈的源頭,由DocFieldProcessor.addThreads(…)創(chuàng)建 DocFieldProcessorPerThread對(duì)象結(jié)構(gòu)如下:
|
DocumentsWriter.DocWriter DocFieldProcessorPerThread.processDocument()包含以下幾個(gè)過程:
4.2.1、開始處理當(dāng)前文檔
| consumer(DocInverterPerThread).startDocument();? |
在此版的Lucene中,幾乎所有的XXXPerThread的類,都有startDocument和finishDocument兩個(gè)函數(shù),因?yàn)閷?duì)同一個(gè)線程,這些對(duì)象都是復(fù)用的,而非對(duì)每一篇新來的文檔都創(chuàng)建一套,這樣也提高了效率,也牽扯到數(shù)據(jù)的清理問題。一般在startDocument函數(shù)中,清理處理上篇文檔遺留的數(shù)據(jù),在finishDocument中,收集本次處理的結(jié)果數(shù)據(jù),并返回,一直返回到DocumentsWriter.updateDocument(Document, Analyzer, Term) 然后根據(jù)條件判斷是否將數(shù)據(jù)刷新到硬盤上。
4.2.2、逐個(gè)處理文檔的每一個(gè)域
由于一個(gè)線程可以連續(xù)處理多個(gè)文檔,而在普通的應(yīng)用中,幾乎每篇文檔的域都是大致相同的,為每篇文檔的每個(gè)域都創(chuàng)建一個(gè)處理對(duì)象非常低效,因而考慮到復(fù)用域處理對(duì)象DocFieldProcessorPerField,對(duì)于每一個(gè)域都有一個(gè)此對(duì)象。
那當(dāng)來到一個(gè)新的域的時(shí)候,如何更快的找到此域的處理對(duì)象呢?Lucene創(chuàng)建了一個(gè)DocFieldProcessorPerField[] fieldHash哈希表來方便更快查找域?qū)?yīng)的處理對(duì)象。
當(dāng)處理各個(gè)域的時(shí)候,按什么順序呢?其實(shí)是按照域名的字典順序。因而Lucene創(chuàng)建了DocFieldProcessorPerField[] fields的數(shù)組來方便按順序處理域。
因而一個(gè)域的處理對(duì)象被放在了兩個(gè)地方。
對(duì)于域的處理過程如下:
4.2.2.1、首先:對(duì)于每一個(gè)域,按照域名,在fieldHash中查找域處理對(duì)象DocFieldProcessorPerField,代碼如下:
| final int hashPos = fieldName.hashCode() & hashMask;//計(jì)算哈希值? |
如果能夠找到,則更新DocFieldProcessorPerField中的域信息fp.fieldInfo.update(field.isIndexed()…)
如果沒有找到,則添加域到DocFieldProcessorPerThread.fieldInfos中,并創(chuàng)建新的DocFieldProcessorPerField,且將其加入哈希表。代碼如下:
| fp = new DocFieldProcessorPerField(this, fi);? |
如果是一個(gè)新的field,則將其加入fields數(shù)組fields[fieldCount++] = fp;
并且如果是存儲(chǔ)域的話,用StoredFieldsWriterPerThread將其寫到索引中:
| if (field.isStored()) {? |
4.2.2.1.1、處理存儲(chǔ)域的過程如下:
| StoredFieldsWriterPerThread.addField(Fieldable field, FieldInfo fieldInfo)? --> localFieldsWriter.writeField(fieldInfo, field); |
FieldsWriter.writeField(FieldInfo fi, Fieldable field)代碼如下:
| 請(qǐng)參照fdt文件的格式,則一目了然: fieldsStream.writeVInt(fi.number);//文檔號(hào)? fieldsStream.writeByte(bits); //域的屬性位 if (field.isCompressed()) {//對(duì)于壓縮域? ??????? fieldsStream.writeVInt(len);//寫長度? |
4.2.2.2、然后:對(duì)fields數(shù)組進(jìn)行排序,是域按照名稱排序。quickSort(fields, 0, fieldCount-1);
4.2.2.3、最后:按照排序號(hào)的順序,對(duì)域逐個(gè)處理,此處處理的僅僅是索引域,代碼如下:
| for(int i=0;i????? fields[i].consumer.processFields(fields[i].fields, fields[i].fieldCount); |
域處理對(duì)象(DocFieldProcessorPerField)結(jié)構(gòu)如下:
| 域索引鏈: 每個(gè)域也有自己的索引鏈,稱為域索引鏈,每個(gè)域的索引鏈也有同線程索引鏈有相似的樹形結(jié)構(gòu),由線程索引鏈中每個(gè)層次的每個(gè)層次的對(duì)象調(diào)用addField進(jìn)行創(chuàng)建,負(fù)責(zé)對(duì)此域的處理。 和基本索引鏈及線程索引鏈不同的是,域索引鏈僅僅負(fù)責(zé)處理索引域,而不負(fù)責(zé)存儲(chǔ)域的處理。 DocFieldProcessorPerField是域索引鏈的源頭,對(duì)象結(jié)構(gòu)如下:
|
4.2.2.3.1、處理索引域的過程如下:
DocInverterPerField.processFields(Fieldable[], int) 過程如下:
- 判斷是否要形成倒排表,代碼如下:
| boolean doInvert = consumer.start(fields, count);? --> TermsHashPerField.start(Fieldable[], int)?? ????? --> for(int i=0;i???????????? if (fields[i].isIndexed())? ???????????????? return true;? ??????????? return false; |
讀到這里,大家可能會(huì)發(fā)生困惑,既然XXXPerField是對(duì)于每一個(gè)域有一個(gè)處理對(duì)象的,那為什么參數(shù)傳進(jìn)來的是Fieldable[]數(shù)組, 并且還有域的數(shù)目count呢?
其實(shí)這不經(jīng)常用到,但必須得提一下,由上面的fieldHash的實(shí)現(xiàn)我們可以看到,是根據(jù)域名進(jìn)行哈希的,所以準(zhǔn)確的講,XXXPerField并非對(duì)于每一個(gè)域有一個(gè)處理對(duì)象,而是對(duì)每一組相同名字的域有相同的處理對(duì)象。
對(duì)于同一篇文檔,相同名稱的域可以添加多個(gè),代碼如下:
| doc.add(new Field("contents", "the content of the file.", Field.Store.NO, Field.Index.NOT_ANALYZED));? |
則傳進(jìn)來的名為"contents"的域如下:
| fields??? Fieldable[2]? (id=52)???? |
- 對(duì)傳進(jìn)來的同名域逐一處理,代碼如下
| for(int i=0;i ??? final Fieldable field = fields[i]; ??? if (field.isIndexed() && doInvert) { ??????? //僅僅對(duì)索引域進(jìn)行處理 ??????? if (!field.isTokenized()) { ??????????? //如果此域不分詞,見(1)對(duì)不分詞的域的處理 ??????? } else { ??????????? //如果此域分詞,見(2)對(duì)分詞的域的處理 ??????? } ??? } } |
(1) 對(duì)不分詞的域的處理
(1-1) 得到域的內(nèi)容,并構(gòu)建單個(gè)Token形成的SingleTokenAttributeSource。因?yàn)椴贿M(jìn)行分詞,因而整個(gè)域的內(nèi)容算做一個(gè)Token.
String stringValue = field.stringValue(); //stringValue??? "200910240957"??
final int valueLength = stringValue.length();?
perThread.singleToken.reinit(stringValue, 0, valueLength);
對(duì)于此域唯一的一個(gè)Token有以下的屬性:
- Term:文字信息。在處理過程中,此值將保存在TermAttribute的實(shí)現(xiàn)類實(shí)例化的對(duì)象TermAttributeImp里面。
- Offset:偏移量信息,是按字或字母的起始偏移量和終止偏移量,表明此Token在文章中的位置,多用于加亮。在處理過程中,此值將保存在OffsetAttribute的實(shí)現(xiàn)類實(shí)例化的對(duì)象OffsetAttributeImp里面。
在SingleTokenAttributeSource里面,有一個(gè)HashMap來保存可能用于保存屬性的類名(Key,準(zhǔn)確的講是接口)以及保存屬性信息的對(duì)象(Value):
| singleToken??? DocInverterPerThread$SingleTokenAttributeSource? (id=150)???? |
(1-2) 得到Token的各種屬性信息,為索引做準(zhǔn)備。
consumer.start(field)做的主要事情就是根據(jù)各種屬性的類型來構(gòu)造保存屬性的對(duì)象(HashMap中有則取出,無則構(gòu)造),為索引做準(zhǔn)備。
| consumer(TermsHashPerField).start(…) --> termAtt = fieldState.attributeSource.addAttribute(TermAttribute.class);得到的就是上述HashMap中的TermAttributeImpl??? --> consumer(FreqProxTermsWriterPerField).start(f); ????? --> if (fieldState.attributeSource.hasAttribute(PayloadAttribute.class)) { ??????????????? payloadAttribute = fieldState.attributeSource.getAttribute(PayloadAttribute.class);? --> nextPerField(TermsHashPerField).start(f); ????? --> termAtt = fieldState.attributeSource.addAttribute(TermAttribute.class);得到的還是上述HashMap中的TermAttributeImpl ????? --> consumer(TermVectorsTermsWriterPerField).start(f); ??????????? --> if (doVectorOffsets) { ????????????????????? offsetAttribute = fieldState.attributeSource.addAttribute(OffsetAttribute.class);? |
(1-3) 將Token加入倒排表
consumer(TermsHashPerField).add();
加入倒排表的過程,無論對(duì)于分詞的域和不分詞的域,過程是一樣的,因而放到對(duì)分詞的域的解析中一起說明。
(2) 對(duì)分詞的域的處理
(2-1) 構(gòu)建域的TokenStream
| final TokenStream streamValue = field.tokenStreamValue(); //用戶可以在添加域的時(shí)候,應(yīng)用構(gòu)造函數(shù)public Field(String name, TokenStream tokenStream) 直接傳進(jìn)一個(gè)TokenStream過來,這樣就不用另外構(gòu)建一個(gè)TokenStream了。 if (streamValue != null)? ? …… ? stream = docState.analyzer.reusableTokenStream(fieldInfo.name, reader); } |
此時(shí)TokenStream的各項(xiàng)屬性值還都是空的,等待一個(gè)一個(gè)被分詞后得到,此時(shí)的TokenStream對(duì)象如下:
| stream??? StopFilter? (id=112)???? |
(2-2) 得到第一個(gè)Token,并初始化此Token的各項(xiàng)屬性信息,并為索引做準(zhǔn)備(start)。
boolean hasMoreTokens = stream.incrementToken();//得到第一個(gè)Token
OffsetAttribute offsetAttribute = fieldState.attributeSource.addAttribute(OffsetAttribute.class);//得到偏移量屬性
| offsetAttribute??? OffsetAttributeImpl? (id=164)???? |
PositionIncrementAttribute posIncrAttribute = fieldState.attributeSource.addAttribute(PositionIncrementAttribute.class);//得到位置屬性
| posIncrAttribute??? PositionIncrementAttributeImpl? (id=129)???? |
consumer.start(field);//其中得到了TermAttribute屬性,如果存儲(chǔ)payload則得到PayloadAttribute屬性,如果存儲(chǔ)詞向量則得到OffsetAttribute屬性。
(2-3) 進(jìn)行循環(huán),不斷的取下一個(gè)Token,并添加到倒排表
| for(;;) { ??? if (!hasMoreTokens) break; ??? ……? ??? ……? |
(2-4) 添加Token到倒排表的過程consumer(TermsHashPerField).add()
TermsHashPerField對(duì)象主要包括以下部分:
- CharBlockPool charPool; 用于存儲(chǔ)Token的文本信息,如果不足時(shí),從DocumentsWriter中的freeCharBlocks分配
- ByteBlockPool bytePool;用于存儲(chǔ)freq, prox信息,如果不足時(shí),從DocumentsWriter中的freeByteBlocks分配
- IntBlockPool intPool; 用于存儲(chǔ)分別指向每個(gè)Token在bytePool中freq和prox信息的偏移量。如果不足時(shí),從DocumentsWriter的freeIntBlocks分配
- TermsHashConsumerPerField consumer類型為FreqProxTermsWriterPerField,用于寫freq, prox信息到緩存中。
- RawPostingList[] postingsHash = new RawPostingList[postingsHashSize];存儲(chǔ)倒排表,每一個(gè)Term都有一個(gè)RawPostingList (PostingList),其中包含了int textStart,也即文本在charPool中的偏移量,int byteStart,即此Term的freq和prox信息在bytePool中的起始偏移量,int intStart,即此term的在intPool中的起始偏移量。
形成倒排表的過程如下:
| //得到token的文本及文本長度 final char[] tokenText = termAtt.termBuffer();//[s, t, u, d, e, n, t, s] final int tokenTextLen = termAtt.termLength();//tokenTextLen 8 //按照token的文本計(jì)算哈希值,以便在postingsHash中找到此token對(duì)應(yīng)的倒排表 int downto = tokenTextLen;? int hashPos = code & postingsHashMask; //在倒排表哈希表中查找此Token,如果找到相應(yīng)的位置,但是不是此Token,說明此位置存在哈希沖突,采取重新哈希rehash的方法。 p = postingsHash[hashPos]; if (p != null && !postingEquals(tokenText, tokenTextLen)) {?? //如果此Token之前從未出現(xiàn)過 if (p == null) { ??? if (textLen1 + charPool.charUpto > DocumentsWriter.CHAR_BLOCK_SIZE) { ??????? //當(dāng)charPool不足的時(shí)候,在freeCharBlocks中分配新的buffer ??????? charPool.nextBuffer(); ??? } ??? //從空閑的倒排表中分配新的倒排表 ??? p = perThread.freePostings[--perThread.freePostingsCount]; ??? //將文本復(fù)制到charPool中 ??? final char[] text = charPool.buffer;? ??? //將倒排表放入哈希表中 ??? postingsHash[hashPos] = p;? ??? if (numPostingInt + intPool.intUpto > DocumentsWriter.INT_BLOCK_SIZE) intPool.nextBuffer(); ??? //當(dāng)intPool不足的時(shí)候,在freeIntBlocks中分配新的buffer。 ??? if (DocumentsWriter.BYTE_BLOCK_SIZE - bytePool.byteUpto < numPostingInt*ByteBlockPool.FIRST_LEVEL_SIZE) ??????? bytePool.nextBuffer(); ??? //當(dāng)bytePool不足的時(shí)候,在freeByteBlocks中分配新的buffer。 ??? //此處streamCount為2,表明在intPool中,每兩項(xiàng)表示一個(gè)詞,一個(gè)是指向bytePool中freq信息偏移量的,一個(gè)是指向bytePool中prox信息偏移量的。 ??? intUptos = intPool.buffer;? ??? p.intStart = intUptoStart + intPool.intOffset; ??? //在bytePool中分配兩個(gè)空間,一個(gè)放freq信息,一個(gè)放prox信息的。?? ??????? final int upto = bytePool.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE);? ??? //當(dāng)Term原來沒有出現(xiàn)過的時(shí)候,調(diào)用newTerm ??? consumer(FreqProxTermsWriterPerField).newTerm(p); } //如果此Token之前曾經(jīng)出現(xiàn)過,則調(diào)用addTerm。 else { ??? intUptos = intPool.buffers[p.intStart >> DocumentsWriter.INT_BLOCK_SHIFT];? } |
(2-5) 添加新Term的過程,consumer(FreqProxTermsWriterPerField).newTerm
| final void newTerm(RawPostingList p0) {? writeProx(FreqProxTermsWriter.PostingList p, int proxCode) { ? termsHashPerField.writeVInt(1, proxCode<<1);//第一個(gè)參數(shù)所謂1,也就是寫入此文檔在intPool中的第1項(xiàng)——prox信息。為什么左移一位呢?是因?yàn)楹竺婵赡芨鴓ayload信息,參照索引文件格式(1)中或然跟隨規(guī)則。? } |
(2-6) 添加已有Term的過程
| final void addTerm(RawPostingList p0) { ? FreqProxTermsWriter.PostingList p = (FreqProxTermsWriter.PostingList) p0; ? if (docState.docID != p.lastDocID) { ????? //當(dāng)文檔ID變了的時(shí)候,說明上一篇文檔已經(jīng)處理完畢,可以寫入freq信息了。 ????? //第一個(gè)參數(shù)所謂0,也就是寫入上一篇文檔在intPool中的第0項(xiàng)——freq信息。至于信息為何這樣寫,參照索引文件格式(1)中的或然跟隨規(guī)則,及tis文件格式。 ????? if (1 == p.docFreq)? ????? //當(dāng)文檔ID不變的時(shí)候,說明此文檔中這個(gè)詞又出現(xiàn)了一次,從而freq加一,寫入再次出現(xiàn)的位置信息,用差值。? |
(2-7) 結(jié)束處理當(dāng)前域
| consumer(TermsHashPerField).finish(); --> FreqProxTermsWriterPerField.finish() --> TermVectorsTermsWriterPerField.finish() endConsumer(NormsWriterPerField).finish(); --> norms[upto] = Similarity.encodeNorm(norm);//計(jì)算標(biāo)準(zhǔn)化因子的值。 --> docIDs[upto] = docState.docID; |
4.2.3、結(jié)束處理當(dāng)前文檔
final DocumentsWriter.DocWriter one = fieldsWriter(StoredFieldsWriterPerThread).finishDocument();
存儲(chǔ)域返回結(jié)果:一個(gè)寫成了二進(jìn)制的存儲(chǔ)域緩存。
| one??? StoredFieldsWriter$PerDoc? (id=322)???? |
final DocumentsWriter.DocWriter two = consumer(DocInverterPerThread).finishDocument();
--> NormsWriterPerThread.finishDocument()
--> TermsHashPerThread.finishDocument()
索引域的返回結(jié)果為null
4.3、用DocumentsWriter.finishDocument結(jié)束本次文檔添加
代碼:
| DocumentsWriter.updateDocument(Document, Analyzer, Term) --> DocumentsWriter.finishDocument(DocumentsWriterThreadState, DocumentsWriter$DocWriter) ????? --> doPause = waitQueue.add(docWriter);//有關(guān)waitQueue,在DocumentsWriter的緩存管理中已作解釋 ??????????? --> DocumentsWriter$WaitQueue.writeDocument(DocumentsWriter$DocWriter) ????????????????? --> StoredFieldsWriter$PerDoc.finish() ??????????????????????? --> fieldsWriter.flushDocument(perDoc.numStoredFields, perDoc.fdt);將存儲(chǔ)域信息真正寫入文件。 |
5、DocumentsWriter對(duì)CharBlockPool,ByteBlockPool,IntBlockPool的緩存管理
- 在索引的過程中,DocumentsWriter將詞信息(term)存儲(chǔ)在CharBlockPool中,將文檔號(hào)(doc ID),詞頻(freq)和位置(prox)信息存儲(chǔ)在ByteBlockPool中。
- 在ByteBlockPool中,緩存是分塊(slice)分配的,塊(slice)是分層次的,層次越高,此層的塊越大,每一層的塊大小事相同的。
- nextLevelArray表示的是當(dāng)前層的下一層是第幾層,可見第9層的下一層還是第9層,也就是說最高有9層。
- levelSizeArray表示每一層的塊大小,第一層是5個(gè)byte,第二層是14個(gè)byte以此類推。
| ByteBlockPool類中有以下靜態(tài)變量: final static int[] nextLevelArray = {1, 2, 3, 4, 5, 6, 7, 8, 9, 9};? |
- 在ByteBlockPool中分配一個(gè)塊的代碼如下:
| ? //此函數(shù)僅僅在upto已經(jīng)是當(dāng)前塊的結(jié)尾的時(shí)候方才調(diào)用來分配新塊。 public int allocSlice(final byte[] slice, final int upto) { ? //可根據(jù)塊的結(jié)束符來得到塊所在的層次。從而我們可以推斷,每個(gè)層次的塊都有不同的結(jié)束符,第1層為16,第2層位17,第3層18,依次類推。 ? final int level = slice[upto] & 15; ? //從數(shù)組總得到下一個(gè)層次及下一層塊的大小。 ? final int newLevel = nextLevelArray[level]; ? final int newSize = levelSizeArray[newLevel]; ? // 如果當(dāng)前緩存總量不夠大,則從DocumentsWriter的freeByteBlocks中分配。 ? if (byteUpto > DocumentsWriter.BYTE_BLOCK_SIZE-newSize) ??? nextBuffer(); ? final int newUpto = byteUpto; ? final int offset = newUpto + byteOffset; ? byteUpto += newSize; ? //當(dāng)分配了新的塊的時(shí)候,需要有一個(gè)指針從本塊指向下一個(gè)塊,使得讀取此信息的時(shí)候,能夠在此塊讀取結(jié)束后,到下一個(gè)塊繼續(xù)讀取。 ? //這個(gè)指針需要4個(gè)byte,在本塊中,除了結(jié)束符所占用的一個(gè)byte之外,之前的三個(gè)byte的數(shù)據(jù)都應(yīng)該移到新的塊中,從而四個(gè)byte連起來形成一個(gè)指針。 ? buffer[newUpto] = slice[upto-3]; ? buffer[newUpto+1] = slice[upto-2]; ? buffer[newUpto+2] = slice[upto-1]; ? // 將偏移量(也即指針)寫入到連同結(jié)束符在內(nèi)的四個(gè)byte ? slice[upto-3] = (byte) (offset >>> 24); ? slice[upto-2] = (byte) (offset >>> 16); ? slice[upto-1] = (byte) (offset >>> 8); ? slice[upto] = (byte) offset; ? // 在新的塊的結(jié)尾寫入新的結(jié)束符,結(jié)束符和層次的關(guān)系就是(endbyte = 16 | level) ? buffer[byteUpto-1] = (byte) (16|newLevel); ? return newUpto+3; } |
- 在ByteBlockPool中,文檔號(hào)和詞頻(freq)信息是應(yīng)用或然跟隨原則寫到一個(gè)塊中去的,而位置信息(prox)是寫入到另一個(gè)塊中去的,對(duì)于同一個(gè)詞,這兩塊的偏移量保存在IntBlockPool中。因而在IntBlockPool中,每一個(gè)詞都有兩個(gè)int,第0個(gè)表示docid + freq在ByteBlockPool中的偏移量,第1個(gè)表示prox在ByteBlockPool中的偏移量。
- 在寫入docid + freq信息的時(shí)候,調(diào)用termsHashPerField.writeVInt(0, p.lastDocCode),第一個(gè)參數(shù)表示向此詞的第0個(gè)偏移量寫入;在寫入prox信息的時(shí)候,調(diào)用termsHashPerField.writeVInt(1, (proxCode<<1)|1),第一個(gè)參數(shù)表示向此詞的第1個(gè)偏移量寫入。
- CharBlockPool是按照出現(xiàn)的先后順序保存詞(term)
- 在TermsHashPerField中,有一個(gè)成員變量RawPostingList[] postingsHash,為每一個(gè)term分配了一個(gè)RawPostingList,將上述三個(gè)緩存關(guān)聯(lián)起來。
| ? abstract class RawPostingList { ? final static int BYTES_SIZE = DocumentsWriter.OBJECT_HEADER_BYTES + 3*DocumentsWriter.INT_NUM_BYTE; ? int textStart; //此詞在CharBlockPool中的偏移量,由此可以知道是哪個(gè)詞。 ? int intStart; //此詞在IntBlockPool中的偏移量,在指向的位置有兩個(gè)int,一個(gè)是docid + freq信息的偏移量,一個(gè)是prox信息的偏移量。 ? int byteStart; //此詞在ByteBlockPool中的起始偏移量 } static final class PostingList extends RawPostingList { ? int docFreq;??????????????????????????????????? // 此詞在此文檔中出現(xiàn)的次數(shù) ? int lastDocID;????????????????????????????????? // 上次處理完的包含此詞的文檔號(hào)。 ? int lastDocCode;??????????????????????????????? // 文檔號(hào)和詞頻按照或然跟隨原則形成的編碼 ? int lastPosition;?????????????????????????????? // 上次處理完的此詞的位置 } 這里需要說明的是,在IntBlockPool中保存了兩個(gè)在ByteBlockPool中的偏移量,而在RawPostingList的byteStart又保存了在ByteBlockPool中的偏移量,這兩者有什么區(qū)別呢? 在IntBlockPool中保存的分別指向docid+freq及prox信息在ByteBlockPool中的偏移量是主要用來寫入信息的,它記錄的偏移量是下一個(gè)要寫入的docid+freq或者prox在ByteBlockPool中的位置,隨著信息的不斷寫入,IntBlockPool中的兩個(gè)偏移量是不斷改變的,始終指向下一個(gè)可以寫入的位置。 RawPostingList中byteStart主要是用來讀取docid及prox信息的,當(dāng)索引過程基本結(jié)束,所有的信息都寫入在緩存中了,那么如何找到此詞對(duì)應(yīng)的文檔號(hào)偏移量及位置信息,然后寫到索引文件中去呢?自然是通過RawPostingList找到byteStart,然后根據(jù)byteStart在ByteBlockPool中找到docid+freq及prox信息的起始位置,從起始位置開始的兩個(gè)大小為5的塊,第一個(gè)就是docid+freq信息的源頭,第二個(gè)就是prox信息的源頭,如果源頭的塊中包含了所有的信息,讀出來就可以了,如果源頭的塊中有指針,則沿著指針尋找到下一個(gè)塊,從而可以找到所有的信息。 |
- 下面舉一個(gè)實(shí)例來表明如果進(jìn)行緩存管理的:
| 此例子中,準(zhǔn)備添加三個(gè)文件: file01: common common common common common term file02: common common common common common term term file03: term term term common common common common common file04: term (1) 添加第一篇文檔第一個(gè)common
? (2) 添加第四個(gè)common
? (3) 添加第五個(gè)common
? (4) 添加第一篇文檔,第一個(gè)term
? (5) 添加第二篇文檔第一個(gè)common
? (6) 添加第二篇文檔第一個(gè)term
? (7) 添加第三篇文檔的第一個(gè)term
? (8) 添加第三篇文檔第二個(gè)term
? (9) 添加第三篇文檔第四個(gè)common
? (10) 添加第三篇文檔的第五個(gè)common
? (11) 添加第四篇文檔的第一個(gè)term
? (12) 最終PostingList, CharBlockPool, IntBlockPool,ByteBlockPool的關(guān)系如下圖: ?
? |
?
6、關(guān)閉IndexWriter對(duì)象
代碼:
| writer.close(); --> IndexWriter.closeInternal(boolean) ????? --> (1) 將索引信息由內(nèi)存寫入磁盤: flush(waitForMerges, true, true);? |
對(duì)段的合并將在后面的章節(jié)進(jìn)行討論,此處僅僅討論將索引信息由寫入磁盤的過程。
代碼:
| IndexWriter.flush(boolean triggerMerge, boolean flushDocStores, boolean flushDeletes) --> IndexWriter.doFlush(boolean flushDocStores, boolean flushDeletes) ????? --> IndexWriter.doFlushInternal(boolean flushDocStores, boolean flushDeletes) |
將索引寫入磁盤包括以下幾個(gè)過程:
- 得到要寫入的段名:String segment = docWriter.getSegment();
- DocumentsWriter將緩存的信息寫入段:docWriter.flush(flushDocStores);
- 生成新的段信息對(duì)象:newSegment = new SegmentInfo(segment, flushedDocCount, directory, false, true, docStoreOffset, docStoreSegment, docStoreIsCompoundFile, docWriter.hasProx());
- 準(zhǔn)備刪除文檔:docWriter.pushDeletes();
- 生成cfs段:docWriter.createCompoundFile(segment);
- 刪除文檔:applyDeletes();
6.1、得到要寫入的段名
代碼:
| SegmentInfo newSegment = null; final int numDocs = docWriter.getNumDocsInRAM();//文檔總數(shù) String docStoreSegment = docWriter.getDocStoreSegment();//存儲(chǔ)域和詞向量所要要寫入的段名,"_0"??? int docStoreOffset = docWriter.getDocStoreOffset();//存儲(chǔ)域和詞向量要寫入的段中的偏移量 String segment = docWriter.getSegment();//段名,"_0" |
在Lucene的索引文件結(jié)構(gòu)一章做過詳細(xì)介紹,存儲(chǔ)域和詞向量可以和索引域存儲(chǔ)在不同的段中。
6.2、將緩存的內(nèi)容寫入段
代碼:
| flushedDocCount = docWriter.flush(flushDocStores); |
此過程又包含以下兩個(gè)階段;
- 按照基本索引鏈關(guān)閉存儲(chǔ)域和詞向量信息
- 按照基本索引鏈的結(jié)構(gòu)將索引結(jié)果寫入段
6.2.1、按照基本索引鏈關(guān)閉存儲(chǔ)域和詞向量信息
代碼為:
| closeDocStore(); flushState.numDocsInStore = 0; |
其主要是根據(jù)基本索引鏈結(jié)構(gòu),關(guān)閉存儲(chǔ)域和詞向量信息:
- consumer(DocFieldProcessor).closeDocStore(flushState);
- consumer(DocInverter).closeDocStore(state);
- consumer(TermsHash).closeDocStore(state);
- consumer(FreqProxTermsWriter).closeDocStore(state);
- if (nextTermsHash != null) nextTermsHash.closeDocStore(state);
- consumer(TermVectorsTermsWriter).closeDocStore(state);
- endConsumer(NormsWriter).closeDocStore(state);
- consumer(TermsHash).closeDocStore(state);
- fieldsWriter(StoredFieldsWriter).closeDocStore(state);
- consumer(DocInverter).closeDocStore(state);
其中有實(shí)質(zhì)意義的是以下兩個(gè)closeDocStore:
- 詞向量的關(guān)閉:TermVectorsTermsWriter.closeDocStore(SegmentWriteState)
| void closeDocStore(final SegmentWriteState state) throws IOException { ???????????????????? if (tvx != null) {???????????? //為不保存詞向量的文檔在tvd文件中寫入零。即便不保存詞向量,在tvx, tvd中也保留一個(gè)位置? ??????????? fill(state.numDocsInStore - docWriter.getDocStoreOffset());? ??????????? //關(guān)閉tvx, tvf, tvd文件的寫入流? ??????????? tvx.close();? ??????????? tvf.close();? ??????????? tvd.close();? ??????????? tvx = null;? ??????????? //記錄寫入的文件名,為以后生成cfs文件的時(shí)候,將這些寫入的文件生成一個(gè)統(tǒng)一的cfs文件。? ??????????? state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);? ??????????? state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);? ??????????? state.flushedFiles.add(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);? ??????????? //從DocumentsWriter的成員變量openFiles中刪除,未來可能被IndexFileDeleter刪除? ??????????? docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_INDEX_EXTENSION);? ??????????? docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_FIELDS_EXTENSION);? ??????????? docWriter.removeOpenFile(state.docStoreSegmentName + "." + IndexFileNames.VECTORS_DOCUMENTS_EXTENSION);? ??????????? lastDocID = 0;? ??????? }????? } |
- 存儲(chǔ)域的關(guān)閉:StoredFieldsWriter.closeDocStore(SegmentWriteState)
| public void closeDocStore(SegmentWriteState state) throws IOException { ??? //關(guān)閉fdx, fdt寫入流 ??? fieldsWriter.close();? ??? //記錄寫入的文件名? |
6.2.2、按照基本索引鏈的結(jié)構(gòu)將索引結(jié)果寫入段
代碼為:
| consumer(DocFieldProcessor).flush(threads, flushState); ??? //回收fieldHash,以便用于下一輪的索引,為提高效率,索引鏈中的對(duì)象是被復(fù)用的。 ??? Map> childThreadsAndFields = new HashMap>();? ??? //寫入存儲(chǔ)域 ??? --> fieldsWriter(StoredFieldsWriter).flush(state); ??? //寫入索引域 ??? --> consumer(DocInverter).flush(childThreadsAndFields, state); ??? //寫入域元數(shù)據(jù)信息,并記錄寫入的文件名,以便以后生成cfs文件 ??? --> final String fileName = state.segmentFileName(IndexFileNames.FIELD_INFOS_EXTENSION); ??? --> fieldInfos.write(state.directory, fileName); ??? --> state.flushedFiles.add(fileName); |
此過程也是按照基本索引鏈來的:
- consumer(DocFieldProcessor).flush(…);
- consumer(DocInverter).flush(…);
- consumer(TermsHash).flush(…);
- consumer(FreqProxTermsWriter).flush(…);
- if (nextTermsHash != null) nextTermsHash.flush(…);
- consumer(TermVectorsTermsWriter).flush(…);
- endConsumer(NormsWriter).flush(…);
- consumer(TermsHash).flush(…);
- fieldsWriter(StoredFieldsWriter).flush(…);
- consumer(DocInverter).flush(…);
6.2.2.1、寫入存儲(chǔ)域
代碼為:
| StoredFieldsWriter.flush(SegmentWriteState state) {? |
從代碼中可以看出,是寫入fdx, fdt兩個(gè)文件,但是在上述的closeDocStore已經(jīng)寫入了,并且把state.numDocsInStore置零,fieldsWriter設(shè)為null,在這里其實(shí)什么也不做。
6.2.2.2、寫入索引域
代碼為:
| DocInverter.flush(Map>, SegmentWriteState) ??? //寫入倒排表及詞向量信息 ??? --> consumer(TermsHash).flush(childThreadsAndFields, state); ??? //寫入標(biāo)準(zhǔn)化因子 ??? --> endConsumer(NormsWriter).flush(endChildThreadsAndFields, state); |
6.2.2.2.1、寫入倒排表及詞向量信息
代碼為:
| TermsHash.flush(Map>, SegmentWriteState) ??? //寫入倒排表信息 ??? --> consumer(FreqProxTermsWriter).flush(childThreadsAndFields, state); ?? //回收RawPostingList ??? --> shrinkFreePostings(threadsAndFields, state); ??? //寫入詞向量信息 ??? --> if (nextTermsHash != null) nextTermsHash.flush(nextThreadsAndFields, state); ????????? --> consumer(TermVectorsTermsWriter).flush(childThreadsAndFields, state); |
6.2.2.2.1.1、寫入倒排表信息
代碼為:
| FreqProxTermsWriter.flush(Map?????????????????????????????????????? Collection>, SegmentWriteState) ????(a) 所有域按名稱排序,使得同名域能夠一起處理 ??? Collections.sort(allFields); ??? final int numAllFields = allFields.size(); ????(b) 生成倒排表的寫對(duì)象 ??? final FormatPostingsFieldsConsumer consumer = new FormatPostingsFieldsWriter(state, fieldInfos); ??? int start = 0; ????(c) 對(duì)于每一個(gè)域 ??? while(start < numAllFields) { ????????(c-1) 找出所有的同名域 ??????? final FieldInfo fieldInfo = allFields.get(start).fieldInfo; ??????? final String fieldName = fieldInfo.name; ??????? int end = start+1; ??????? while(end < numAllFields && allFields.get(end).fieldInfo.name.equals(fieldName)) ??????????? end++; ??????? FreqProxTermsWriterPerField[] fields = new FreqProxTermsWriterPerField[end-start]; ??????? for(int i=start;i ??????????? fields[i-start] = allFields.get(i); ??????????? fieldInfo.storePayloads |= fields[i-start].hasPayloads; ??????? } ????????(c-2) 將同名域的倒排表添加到文件 ??????? appendPostings(fields, consumer); ???????(c-3) 釋放空間 ??????? for(int i=0;i ??????????? TermsHashPerField perField = fields[i].termsHashPerField; ??????????? int numPostings = perField.numPostings; ??????????? perField.reset(); ??????????? perField.shrinkHash(numPostings); ??????????? fields[i].reset(); ??????? } ??????? start = end; ??? } ????(d) 關(guān)閉倒排表的寫對(duì)象 ??? consumer.finish(); |
(b) 生成倒排表的寫對(duì)象
代碼為:
| public FormatPostingsFieldsWriter(SegmentWriteState state, FieldInfos fieldInfos) throws IOException {? ??? dir = state.directory;? ??? segment = state.segmentName;? ??? totalNumDocs = state.numDocs;? ??? this.fieldInfos = fieldInfos;? ??? //用于寫tii,tis? ??? termsOut = new TermInfosWriter(dir, segment, fieldInfos, state.termIndexInterval);? ??? //用于寫freq, prox的跳表?? ??? skipListWriter = new DefaultSkipListWriter(termsOut.skipInterval, termsOut.maxSkipLevels, totalNumDocs, null, null);? ??? //記錄寫入的文件名,? ??? state.flushedFiles.add(state.segmentFileName(IndexFileNames.TERMS_EXTENSION));? ??? state.flushedFiles.add(state.segmentFileName(IndexFileNames.TERMS_INDEX_EXTENSION));?? ??? //用以上兩個(gè)寫對(duì)象,按照一定的格式寫入段? ??? termsWriter = new FormatPostingsTermsWriter(state, this);? } |
對(duì)象結(jié)構(gòu)如下:
| consumer????FormatPostingsFieldsWriter? (id=119)? //用于處理一個(gè)域? ??? dir??? SimpleFSDirectory? (id=126)?? //目標(biāo)索引文件夾? ??? totalNumDocs??? 8?? //文檔總數(shù)? ??? fieldInfos??? FieldInfos? (id=70)? //域元數(shù)據(jù)信息??? ??? segment??? "_0"?? //段名? ??? skipListWriter??? DefaultSkipListWriter? (id=133)? //freq, prox中跳表的寫對(duì)象??? ??? termsOut??? TermInfosWriter? (id=125)? //tii, tis文件的寫對(duì)象? ??? termsWriter????FormatPostingsTermsWriter? (id=135)? //用于添加詞(Term)? ??????? currentTerm??? null???? ??????? currentTermStart??? 0???? ??????? fieldInfo??? null???? ??????? freqStart??? 0???? ??????? proxStart??? 0???? ??????? termBuffer??? null???? ??????? termsOut??? TermInfosWriter? (id=125)???? ??????? docsWriter????FormatPostingsDocsWriter? (id=139)? //用于寫入此詞的docid, freq信息? ??????????? df??? 0???? ??????????? fieldInfo??? null???? ??????????? freqStart??? 0???? ??????????? lastDocID??? 0???? ??????????? omitTermFreqAndPositions??? false???? ??????????? out??? SimpleFSDirectory$SimpleFSIndexOutput? (id=144)???? ??????????? skipInterval??? 16???? ??????????? skipListWriter??? DefaultSkipListWriter? (id=133)???? ??????????? storePayloads??? false???? ??????????? termInfo??? TermInfo? (id=151)???? ??????????? totalNumDocs??? 8????? ??????????? posWriter????FormatPostingsPositionsWriter? (id=146)? //用于寫入此詞在此文檔中的位置信息??? ??????????????? lastPayloadLength??? -1???? ??????????????? lastPosition??? 0???? ??????????????? omitTermFreqAndPositions??? false???? ??????????????? out??? SimpleFSDirectory$SimpleFSIndexOutput? (id=157)???? ??????????????? parent??? FormatPostingsDocsWriter? (id=139)???? ??????????????? storePayloads??? false??? |
- FormatPostingsFieldsWriter.addField(FieldInfo field)用于添加索引域信息,其返回FormatPostingsTermsConsumer用于添加詞信息
- FormatPostingsTermsConsumer.addTerm(char[] text, int start)用于添加詞信息,其返回FormatPostingsDocsConsumer用于添加freq信息
- FormatPostingsDocsConsumer.addDoc(int docID, int termDocFreq)用于添加freq信息,其返回FormatPostingsPositionsConsumer用于添加prox信息
- FormatPostingsPositionsConsumer.addPosition(int position, byte[] payload, int payloadOffset, int payloadLength)用于添加prox信息
(c-2) 將同名域的倒排表添加到文件
代碼為:
| ? FreqProxTermsWriter.appendPostings(FreqProxTermsWriterPerField[], FormatPostingsFieldsConsumer) { ??? int numFields = fields.length; ??? final FreqProxFieldMergeState[] mergeStates = new FreqProxFieldMergeState[numFields]; ??? for(int i=0;i ????? FreqProxFieldMergeState fms = mergeStates[i] = new FreqProxFieldMergeState(fields[i]); ????? boolean result = fms.nextTerm(); //對(duì)所有的域,取第一個(gè)詞(Term) ??? } ????(1) 添加此域,雖然有多個(gè)域,但是由于是同名域,只取第一個(gè)域的信息即可。返回的是用于添加此域中的詞的對(duì)象。 ??? final FormatPostingsTermsConsumer termsConsumer = consumer.addField(fields[0].fieldInfo); ??? FreqProxFieldMergeState[] termStates = new FreqProxFieldMergeState[numFields]; ??? final boolean currentFieldOmitTermFreqAndPositions = fields[0].fieldInfo.omitTermFreqAndPositions; ????(2) 此while循環(huán)是遍歷每一個(gè)尚有未處理的詞的域,依次按照詞典順序處理這些域所包含的詞。當(dāng)一個(gè)域中的所有的詞都被處理過后,則numFields減一,并從mergeStates數(shù)組中移除此域。直到所有的域的所有的詞都處理完畢,方才退出此循環(huán)。 ??? while(numFields > 0) { ???????(2-1) 找出所有域中按字典順序的下一個(gè)詞。可能多個(gè)同名域中,都包含同一個(gè)term,因而要遍歷所有的numFields,得到所有的域里的下一個(gè)詞,numToMerge即為有多少個(gè)域包含此詞。 ????? termStates[0] = mergeStates[0]; ????? int numToMerge = 1; ????? for(int i=1;i ??????? final char[] text = mergeStates[i].text; ??????? final int textOffset = mergeStates[i].textOffset; ??????? final int cmp = compareText(text, textOffset, termStates[0].text, termStates[0].textOffset); ??????? if (cmp < 0) { ????????? termStates[0] = mergeStates[i]; ????????? numToMerge = 1; ??????? } else if (cmp == 0) ????????? termStates[numToMerge++] = mergeStates[i]; ????? } ????? (2-2) 添加此詞,返回FormatPostingsDocsConsumer用于添加文檔號(hào)(doc ID)及詞頻信息(freq) ????? final FormatPostingsDocsConsumer docConsumer = termsConsumer.addTerm(termStates[0].text, termStates[0].textOffset); ??????(2-3) 由于共numToMerge個(gè)域都包含此詞,每個(gè)詞都有一個(gè)鏈表的文檔號(hào)表示包含這些詞的文檔。此循環(huán)遍歷所有的包含此詞的域,依次按照從小到大的循序添加包含此詞的文檔號(hào)及詞頻信息。當(dāng)一個(gè)域中對(duì)此詞的所有文檔號(hào)都處理過了,則numToMerge減一,并從termStates數(shù)組中移除此域。當(dāng)所有包含此詞的域的所有文檔號(hào)都處理過了,則結(jié)束此循環(huán)。 ????? while(numToMerge > 0) { ????????(2-3-1) 找出最小的文檔號(hào) ??????? FreqProxFieldMergeState minState = termStates[0]; ??????? for(int i=1;i ????????? if (termStates[i].docID < minState.docID) ??????????? minState = termStates[i]; ??????? final int termDocFreq = minState.termFreq; ????????(2-3-2) 添加文檔號(hào)及詞頻信息,并形成跳表,返回FormatPostingsPositionsConsumer用于添加位置(prox)信息 ??????? final FormatPostingsPositionsConsumer posConsumer = docConsumer.addDoc(minState.docID, termDocFreq); ????????//ByteSliceReader是用于讀取bytepool中的prox信息的。 ??????? final ByteSliceReader prox = minState.prox; ??????? if (!currentFieldOmitTermFreqAndPositions) { ????????? int position = 0; ??????????(2-3-3) 此循環(huán)對(duì)包含此詞的文檔,添加位置信息 ????????? for(int j=0;j ??????????? final int code = prox.readVInt(); ??????????? position += code >> 1; ??????????? final int payloadLength; ????????????// 如果此位置有payload信息,則從bytepool中讀出,否則設(shè)為零。 ??????????? if ((code & 1) != 0) { ????????????? payloadLength = prox.readVInt(); ????????????? if (payloadBuffer == null || payloadBuffer.length < payloadLength) ??????????????? payloadBuffer = new byte[payloadLength]; ????????????? prox.readBytes(payloadBuffer, 0, payloadLength); ??????????? } else ????????????? payloadLength = 0; ??????????????//添加位置(prox)信息 ????????????? posConsumer.addPosition(position, payloadBuffer, 0, payloadLength); ????????? } ????????? posConsumer.finish(); ??????? } ???????(2-3-4) 判斷退出條件,上次選中的域取得下一個(gè)文檔號(hào),如果沒有,則說明此域包含此詞的文檔已經(jīng)處理完畢,則從termStates中刪除此域,并將numToMerge減一。然后此域取得下一個(gè)詞,當(dāng)循環(huán)到(2)的時(shí)候,表明此域已經(jīng)開始處理下一個(gè)詞。如果沒有下一個(gè)詞,說明此域中的所有的詞都處理完畢,則從mergeStates中刪除此域,并將numFields減一,當(dāng)numFields為0的時(shí)候,循環(huán)(2)也就結(jié)束了。 ??????? if (!minState.nextDoc()) {//獲得下一個(gè)docid ????????? //如果此域包含此詞的文檔已經(jīng)沒有下一篇docid,則從數(shù)組termStates中移除,numToMerge減一。 ????????? int upto = 0; ????????? for(int i=0;i ??????????? if (termStates[i] != minState) ????????????? termStates[upto++] = termStates[i]; ????????? numToMerge--; ????????? //此域則取下一個(gè)詞(term),在循環(huán)(2)處來參與下一個(gè)詞的合并 ????????? if (!minState.nextTerm()) { ??????????? //如果此域沒有下一個(gè)詞了,則此域從數(shù)組mergeStates中移除,numFields減一。 ??????????? upto = 0; ??????????? for(int i=0;i ????????????? if (mergeStates[i] != minState) ??????????????? mergeStates[upto++] = mergeStates[i]; ??????????? numFields--; ????????? } ??????? } ????? } ??????(2-4) 經(jīng)過上面的過程,docid和freq信息雖已經(jīng)寫入段文件,而跳表信息并沒有寫到文件中,而是寫入skip buffer里面了,此處真正寫入文件。并且詞典(tii, tis)也應(yīng)該寫入文件。 ????? docConsumer(FormatPostingsDocsWriter).finish(); ??? } ??? termsConsumer.finish(); ? } |
(2-3-4) 獲得下一篇文檔號(hào)代碼如下:
| ? public boolean nextDoc() {//如何獲取下一個(gè)docid ? if (freq.eof()) {//如果bytepool中的freq信息已經(jīng)讀完 ??? if (p.lastDocCode != -1) {//由上述緩存管理,PostingList里面還存著最后一篇文檔的文檔號(hào)及詞頻信息,則將最后一篇文檔返回 ????? docID = p.lastDocID; ????? if (!field.omitTermFreqAndPositions) ??????? termFreq = p.docFreq; ????? p.lastDocCode = -1; ????? return true; ??? } else ????? return false;//沒有下一篇文檔 ? } ? final int code = freq.readVInt();//如果bytepool中的freq信息尚未讀完 ? if (field.omitTermFreqAndPositions) ??? docID += code; ? else { ??? //讀出文檔號(hào)及詞頻信息。 ??? docID += code >>> 1; ??? if ((code & 1) != 0) ????? termFreq = 1; ??? else ????? termFreq = freq.readVInt(); ? } ? return true; } |
(2-3-2) 添加文檔號(hào)及詞頻信息代碼如下:
| ? FormatPostingsPositionsConsumer FormatPostingsDocsWriter.addDoc(int docID, int termDocFreq) { ??? final int delta = docID - lastDocID; ??? //當(dāng)文檔數(shù)量達(dá)到skipInterval倍數(shù)的時(shí)候,添加跳表項(xiàng)。 ??? if ((++df % skipInterval) == 0) { ????? skipListWriter.setSkipData(lastDocID, storePayloads, posWriter.lastPayloadLength); ????? skipListWriter.bufferSkip(df); ?? } ?? lastDocID = docID; ?? if (omitTermFreqAndPositions) ???? out.writeVInt(delta); ?? else if (1 == termDocFreq) ???? out.writeVInt((delta<<1) | 1); ?? else { ???? //寫入文檔號(hào)及詞頻信息。 ???? out.writeVInt(delta<<1); ???? out.writeVInt(termDocFreq); ?? } ?? return posWriter; } |
(2-3-3) 添加位置信息:
| ? FormatPostingsPositionsWriter.addPosition(int position, byte[] payload, int payloadOffset, int payloadLength) { ??? final int delta = position - lastPosition; ??? lastPosition = position; ??? if (storePayloads) { ??????? //保存位置及payload信息 ??????? if (payloadLength != lastPayloadLength) { ??????????? lastPayloadLength = payloadLength; ??????????? out.writeVInt((delta<<1)|1); ??????????? out.writeVInt(payloadLength); ??????? } else ??????????? out.writeVInt(delta << 1); ??????????? if (payloadLength > 0) ??????????????? out.writeBytes(payload, payloadLength); ??? } else ??????? out.writeVInt(delta); } |
(2-4) 將跳表和詞典(tii, tis)寫入文件
| FormatPostingsDocsWriter.finish() { ??? //將跳表緩存寫入文件 ??? long skipPointer = skipListWriter.writeSkip(out); ??? if (df > 0) { ????? //將詞典(terminfo)寫入tii,tis文件 ????? parent.termsOut(TermInfosWriter).add(fieldInfo.number, utf8.result, utf8.length, termInfo); ??? } ? } |
將跳表緩存寫入文件:
| DefaultSkipListWriter(MultiLevelSkipListWriter).writeSkip(IndexOutput)? { ??? long skipPointer = output.getFilePointer(); ??? if (skipBuffer == null || skipBuffer.length == 0) return skipPointer; ??? //正如我們?cè)谒饕募袷街蟹治龅哪菢?#xff0c; 高層在前,低層在后,除最低層外,其他的層都有長度保存。 ??? for (int level = numberOfSkipLevels - 1; level > 0; level--) { ????? long length = skipBuffer[level].getFilePointer(); ????? if (length > 0) { ??????? output.writeVLong(length); ??????? skipBuffer[level].writeTo(output); ????? } ??? } ??? //寫入最低層 ??? skipBuffer[0].writeTo(output); ??? return skipPointer; ? } |
將詞典(terminfo)寫入tii,tis文件:
- tii文件是tis文件的類似跳表的東西,是在tis文件中每隔indexInterval個(gè)詞提取出一個(gè)詞放在tii文件中,以便很快的查找到詞。
- 因而TermInfosWriter類型中有一個(gè)成員變量other也是TermInfosWriter類型的,還有一個(gè)成員變量isIndex來表示此對(duì)象是用來寫tii文件的還是用來寫tis文件的。
- 如果一個(gè)TermInfosWriter對(duì)象的isIndex=false則,它是用來寫tis文件的,它的other指向的是用來寫tii文件的TermInfosWriter對(duì)象
- 如果一個(gè)TermInfosWriter對(duì)象的isIndex=true則,它是用來寫tii文件的,它的other指向的是用來寫tis文件的TermInfosWriter對(duì)象
| TermInfosWriter.add (int fieldNumber, byte[] termBytes, int termBytesLength, TermInfo ti) { ??? //如果詞的總數(shù)是indexInterval的倍數(shù),則應(yīng)該寫入tii文件 ??? if (!isIndex && size % indexInterval == 0) ????? other.add(lastFieldNumber, lastTermBytes, lastTermBytesLength, lastTi); ??? //將詞寫入tis文件 ??? writeTerm(fieldNumber, termBytes, termBytesLength); ??? output.writeVInt(ti.docFreq);?????????????????????? // write doc freq ??? output.writeVLong(ti.freqPointer - lastTi.freqPointer); // write pointers ??? output.writeVLong(ti.proxPointer - lastTi.proxPointer); ??? if (ti.docFreq >= skipInterval) { ????? output.writeVInt(ti.skipOffset); ??? } ??? if (isIndex) { ????? output.writeVLong(other.output.getFilePointer() - lastIndexPointer); ????? lastIndexPointer = other.output.getFilePointer(); // write pointer ??? } ??? lastFieldNumber = fieldNumber; ??? lastTi.set(ti); ??? size++; ? } |
6.2.2.2.1.2、寫入詞向量信息
代碼為:
| TermVectorsTermsWriter.flush (Map>? ??? if (tvx != null) { ????? if (state.numDocsInStore > 0) ??????? fill(state.numDocsInStore - docWriter.getDocStoreOffset()); ????? tvx.flush(); ????? tvd.flush(); ????? tvf.flush(); ??? } ??? for (Map.Entry> entry :? ????? for (final TermsHashConsumerPerField field : entry.getValue() ) { ??????? TermVectorsTermsWriterPerField perField = (TermVectorsTermsWriterPerField) field; ??????? perField.termsHashPerField.reset(); ??????? perField.shrinkHash(); ????? } ????? TermVectorsTermsWriterPerThread perThread = (TermVectorsTermsWriterPerThread) entry.getKey(); ????? perThread.termsHashPerThread.reset(true); ??? } ? } |
從代碼中可以看出,是寫入tvx, tvd, tvf三個(gè)文件,但是在上述的closeDocStore已經(jīng)寫入了,并且把tvx設(shè)為null,在這里其實(shí)什么也不做,僅僅是清空postingsHash,以便進(jìn)行下一輪索引時(shí)重用此對(duì)象。
6.2.2.2.2、寫入標(biāo)準(zhǔn)化因子
代碼為:
| NormsWriter.flush(Map> threadsAndFields, ?????????????????????????? SegmentWriteState state) { ??? final Map> byField = new HashMap>(); ??? for (final Map.Entry> entry :? ???? //遍歷所有的域,將同名域?qū)?yīng)的NormsWriterPerField放到同一個(gè)鏈表中。 ????? final Collection fields = entry.getValue(); ????? final Iterator fieldsIt = fields.iterator(); ????? while (fieldsIt.hasNext()) { ??????? final NormsWriterPerField perField = (NormsWriterPerField) fieldsIt.next(); ??????? List l = byField.get(perField.fieldInfo); ??????? if (l == null) { ??????????? l = new ArrayList(); ??????????? byField.put(perField.fieldInfo, l); ??????? } ??????? l.add(perField); ??? } ??? //記錄寫入的文件名,方便以后生成cfs文件。 ??? final String normsFileName = state.segmentName + "." + IndexFileNames.NORMS_EXTENSION; ??? state.flushedFiles.add(normsFileName); ??? IndexOutput normsOut = state.directory.createOutput(normsFileName); ??? try { ????? //寫入nrm文件頭 ????? normsOut.writeBytes(SegmentMerger.NORMS_HEADER, 0, SegmentMerger.NORMS_HEADER.length); ????? final int numField = fieldInfos.size(); ????? int normCount = 0; ????? //對(duì)每一個(gè)域進(jìn)行處理 ????? for(int fieldNumber=0;fieldNumber ??????? final FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldNumber); ??????? //得到同名域的鏈表 ??????? List toMerge = byField.get(fieldInfo); ??????? int upto = 0; ??????? if (toMerge != null) { ????????? final int numFields = toMerge.size(); ????????? normCount++; ????????? final NormsWriterPerField[] fields = new NormsWriterPerField[numFields]; ????????? int[] uptos = new int[numFields]; ????????? for(int j=0;j ??????????? fields[j] = toMerge.get(j); ????????? int numLeft = numFields; ????????? //處理同名的多個(gè)域 ????????? while(numLeft > 0) { ??????????? //得到所有的同名域中最小的文檔號(hào) ??????????? int minLoc = 0; ??????????? int minDocID = fields[0].docIDs[uptos[0]]; ??????????? for(int j=1;j ????????????? final int docID = fields[j].docIDs[uptos[j]]; ????????????? if (docID < minDocID) { ??????????????? minDocID = docID; ??????????????? minLoc = j; ????????????? } ??????????? } ??????????? // 在nrm文件中,每一個(gè)文件都有一個(gè)位置,沒有設(shè)定的,放入默認(rèn)值 ??????????? for (;upto<minDocID;upto++) ????????????? normsOut.writeByte(defaultNorm); ??????????? //寫入當(dāng)前的nrm值 ??????????? normsOut.writeByte(fields[minLoc].norms[uptos[minLoc]]); ??????????? (uptos[minLoc])++; ??????????? upto++; ??????????? //如果當(dāng)前域的文檔已經(jīng)處理完畢,則numLeft減一,歸零時(shí)推出循環(huán)。 ??????????? if (uptos[minLoc] == fields[minLoc].upto) { ????????????? fields[minLoc].reset(); ????????????? if (minLoc != numLeft-1) { ??????????????? fields[minLoc] = fields[numLeft-1]; ??????????????? uptos[minLoc] = uptos[numLeft-1]; ????????????? } ????????????? numLeft--; ??????????? } ????????? } ????????? // 對(duì)所有的未設(shè)定nrm值的文檔寫入默認(rèn)值。 ????????? for(;upto ??????????? normsOut.writeByte(defaultNorm); ??????? } else if (fieldInfo.isIndexed && !fieldInfo.omitNorms) { ????????? normCount++; ????????? // Fill entire field with default norm: ????????? for(;upto ??????????? normsOut.writeByte(defaultNorm); ??????? } ????? } ??? } finally { ????? normsOut.close(); ??? } ? } |
6.2.2.3、寫入域元數(shù)據(jù)
代碼為:
| FieldInfos.write(IndexOutput) { ??? output.writeVInt(CURRENT_FORMAT); ??? output.writeVInt(size()); ??? for (int i = 0; i < size(); i++) { ????? FieldInfo fi = fieldInfo(i); ????? byte bits = 0x0; ????? if (fi.isIndexed) bits |= IS_INDEXED; ????? if (fi.storeTermVector) bits |= STORE_TERMVECTOR; ????? if (fi.storePositionWithTermVector) bits |= STORE_POSITIONS_WITH_TERMVECTOR; ????? if (fi.storeOffsetWithTermVector) bits |= STORE_OFFSET_WITH_TERMVECTOR; ????? if (fi.omitNorms) bits |= OMIT_NORMS; ????? if (fi.storePayloads) bits |= STORE_PAYLOADS; ????? if (fi.omitTermFreqAndPositions) bits |= OMIT_TERM_FREQ_AND_POSITIONS; ????? output.writeString(fi.name); ????? output.writeByte(bits); ??? } } |
此處基本就是按照fnm文件的格式寫入的。
6.3、生成新的段信息對(duì)象
代碼:
| newSegment = new SegmentInfo(segment, flushedDocCount, directory, false, true, docStoreOffset, docStoreSegment, docStoreIsCompoundFile, docWriter.hasProx()); segmentInfos.add(newSegment); |
?
6.4、準(zhǔn)備刪除文檔
代碼:
| docWriter.pushDeletes(); ??? --> deletesFlushed.update(deletesInRAM); |
此處將deletesInRAM全部加到deletesFlushed中,并把deletesInRAM清空。原因上面已經(jīng)闡明。
6.5、生成cfs段
代碼:
| docWriter.createCompoundFile(segment); newSegment.setUseCompoundFile(true); |
代碼為:
| DocumentsWriter.createCompoundFile(String segment) { ??? CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, segment + "." + IndexFileNames.COMPOUND_FILE_EXTENSION); ??? //將上述中記錄的文檔名全部加入cfs段的寫對(duì)象。 ??? for (final String flushedFile : flushState.flushedFiles) ????? cfsWriter.addFile(flushedFile); ??? cfsWriter.close(); ? } |
6.6、刪除文檔
代碼:
| applyDeletes(); |
代碼為:
| boolean applyDeletes(SegmentInfos infos) { ? if (!hasDeletes()) ??? return false; ? final int infosEnd = infos.size(); ? int docStart = 0; ? boolean any = false; ? for (int i = 0; i < infosEnd; i++) { ??? assert infos.info(i).dir == directory; ??? SegmentReader reader = writer.readerPool.get(infos.info(i), false); ??? try { ????? any |= applyDeletes(reader, docStart); ????? docStart += reader.maxDoc(); ??? } finally { ????? writer.readerPool.release(reader); ??? } ? } ? deletesFlushed.clear(); ? return any; } |
- Lucene刪除文檔可以用reader,也可以用writer,但是歸根結(jié)底還是用reader來刪除的。
- reader的刪除有以下三種方式:
- 按照詞刪除,刪除所有包含此詞的文檔。
- 按照文檔號(hào)刪除。
- 按照查詢對(duì)象刪除,刪除所有滿足此查詢的文檔。
- 但是這三種方式歸根結(jié)底還是按照文檔號(hào)刪除,也就是寫.del文件的過程。
| ? private final synchronized boolean applyDeletes(IndexReader reader, int docIDStart) ? throws CorruptIndexException, IOException { ? final int docEnd = docIDStart + reader.maxDoc(); ? boolean any = false; ? //按照詞刪除,刪除所有包含此詞的文檔。 ? TermDocs docs = reader.termDocs(); ? try { ??? for (Entry entry: deletesFlushed.terms.entrySet()) { ????? Term term = entry.getKey(); ????? docs.seek(term); ????? int limit = entry.getValue().getNum(); ????? while (docs.next()) { ??????? int docID = docs.doc(); ??????? if (docIDStart+docID >= limit) ????????? break; ??????? reader.deleteDocument(docID); ??????? any = true; ????? } ??? } ? } finally { ??? docs.close(); ? } ? //按照文檔號(hào)刪除。 ? for (Integer docIdInt : deletesFlushed.docIDs) { ??? int docID = docIdInt.intValue(); ??? if (docID >= docIDStart && docID < docEnd) { ????? reader.deleteDocument(docID-docIDStart); ????? any = true; ??? } ? } ? //按照查詢對(duì)象刪除,刪除所有滿足此查詢的文檔。 ? IndexSearcher searcher = new IndexSearcher(reader); ? for (Entry entry : deletesFlushed.queries.entrySet()) { ??? Query query = entry.getKey(); ??? int limit = entry.getValue().intValue(); ??? Weight weight = query.weight(searcher); ??? Scorer scorer = weight.scorer(reader, true, false); ??? if (scorer != null) { ????? while(true)? { ??????? int doc = scorer.nextDoc(); ??????? if (((long) docIDStart) + doc >= limit) ????????? break; ??????? reader.deleteDocument(doc); ??????? any = true; ????? } ??? } ? } ? searcher.close(); ? return any; } |
更多0
總結(jié)
以上是生活随笔為你收集整理的Lucene学习总结之四:Lucene索引过程分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Lucene学习总结之三:Lucene的
- 下一篇: Lucene学习总结之五:Lucene段