Lucene Source Code Analysis (8): MergeScheduler
1. Usage in IndexWriter.java
mergeScheduler.merge(this, MergeTrigger.EXPLICIT, newMergesFound);

2. Definition of MergeScheduler
/**
 * <p>Expert: {@link IndexWriter} uses an instance
 * implementing this interface to execute the merges
 * selected by a {@link MergePolicy}. The default
 * MergeScheduler is {@link ConcurrentMergeScheduler}.</p>
 * @lucene.experimental
 */

3. MergeTrigger: the events that trigger a merge
/**
 * MergeTrigger is passed to
 * {@link MergePolicy#findMerges(MergeTrigger, SegmentInfos, MergePolicy.MergeContext)} to indicate the
 * event that triggered the merge.
 */
public enum MergeTrigger {
  /**
   * Merge was triggered by a segment flush.
   */
  SEGMENT_FLUSH,

  /**
   * Merge was triggered by a full flush. Full flushes
   * can be caused by a commit, NRT reader reopen or a close call on the index writer.
   */
  FULL_FLUSH,

  /**
   * Merge has been triggered explicitly by the user.
   */
  EXPLICIT,

  /**
   * Merge was triggered by a successfully finished merge.
   */
  MERGE_FINISHED,

  /**
   * Merge was triggered by a closing IndexWriter.
   */
  CLOSING
}

4. ConcurrentMergeScheduler, the default implementation
/** A {@link MergeScheduler} that runs each merge using a
 *  separate thread.
 *
 *  <p>Specify the max number of threads that may run at
 *  once, and the maximum number of simultaneous merges
 *  with {@link #setMaxMergesAndThreads}.</p>
 *
 *  <p>If the number of merges exceeds the max number of threads
 *  then the largest merges are paused until one of the smaller
 *  merges completes.</p>
 *
 *  <p>If more than {@link #getMaxMergeCount} merges are
 *  requested then this class will forcefully throttle the
 *  incoming threads by pausing until one or more merges
 *  complete.</p>
 *
 *  <p>This class attempts to detect whether the index is
 *  on rotational storage (traditional hard drive) or not
 *  (e.g. solid-state disk) and changes the default max merge
 *  and thread count accordingly. This detection is currently
 *  Linux-only, and relies on the OS to put the right value
 *  into /sys/block/<dev>/block/rotational. For all
 *  other operating systems it currently assumes a rotational
 *  disk for backwards compatibility. To enable default
 *  settings for spinning or solid state disks for such
 *  operating systems, use {@link #setDefaultMaxMergesAndThreads(boolean)}.
 */

5. MergeThread executes the merge task
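The ConcurrentMergeScheduler Javadoc above says that incoming threads are paused when too many merges are requested, and the finally block of the run() method wakes them again with notifyAll(). The following is a minimal sketch of that stall/unstall monitor pattern using only the JDK. It is not Lucene code: the class StallSketch, its fields, and the runDemo helper are hypothetical names invented for illustration, and the sketch models only the "wait until a slot frees up" part, not the pausing of large merges or the separate thread limit.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of stalling and unstalling callers; NOT Lucene code, all names are hypothetical. */
class StallSketch {
  private final int maxRunning; // plays the role of a maximum merge count
  private int running = 0;      // number of tasks currently running, guarded by "this"
  private int peak = 0;         // highest concurrency observed, guarded by "this"

  StallSketch(int maxRunning) {
    this.maxRunning = maxRunning;
  }

  /** Blocks the caller while too many tasks are running, then starts a worker thread. */
  synchronized Thread submit(Runnable task) {
    boolean interrupted = false;
    while (running >= maxRunning) {
      try {
        wait(); // stalled until a finishing worker calls notifyAll()
      } catch (InterruptedException e) {
        interrupted = true; // remember the interrupt and keep waiting
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
    running++;
    peak = Math.max(peak, running);
    Thread worker = new Thread(() -> {
      try {
        task.run();
      } finally {
        synchronized (StallSketch.this) {
          running--;
          // Wake any stalled caller, mirroring the notifyAll() in run()'s finally block.
          StallSketch.this.notifyAll();
        }
      }
    });
    worker.start();
    return worker;
  }

  synchronized int peak() {
    return peak;
  }

  /** Submits the given number of short tasks and returns the peak concurrency seen. */
  static int runDemo(int maxRunning, int tasks) {
    StallSketch sketch = new StallSketch(maxRunning);
    List<Thread> workers = new ArrayList<>();
    for (int i = 0; i < tasks; i++) {
      workers.add(sketch.submit(() -> {
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }));
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return sketch.peak();
  }

  public static void main(String[] args) {
    System.out.println("peak concurrency = " + runDemo(2, 6));
  }
}
```

The decrement and the notifyAll() sit in a finally block for the same reason they do in MergeThread: a task that throws must still release its slot, otherwise stalled callers would wait forever.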
@Override
public void run() {
  try {
    if (verbose()) {
      message(" merge thread: start");
    }

    doMerge(writer, merge);

    if (verbose()) {
      message(" merge thread: done");
    }

    // Let CMS run new merges if necessary:
    try {
      merge(writer, MergeTrigger.MERGE_FINISHED, true);
    } catch (AlreadyClosedException ace) {
      // OK
    } catch (IOException ioe) {
      throw new RuntimeException(ioe);
    }

  } catch (Throwable exc) {

    if (exc instanceof MergePolicy.MergeAbortedException) {
      // OK to ignore
    } else if (suppressExceptions == false) {
      // suppressExceptions is normally only set during
      // testing.
      handleMergeException(writer.getDirectory(), exc);
    }

  } finally {
    synchronized(ConcurrentMergeScheduler.this) {
      removeMergeThread();

      updateMergeThreads();

      // In case we had stalled indexing, we can now wake up
      // and possibly unstall:
      ConcurrentMergeScheduler.this.notifyAll();
    }
  }
}

The merge process
/**
 * Merges the indicated segments, replacing them in the stack with a
 * single segment.
 *
 * @lucene.experimental
 */
public void merge(MergePolicy.OneMerge merge) throws IOException {

  boolean success = false;

  final long t0 = System.currentTimeMillis();

  final MergePolicy mergePolicy = config.getMergePolicy();
  try {
    try {
      try {
        mergeInit(merge);

        if (infoStream.isEnabled("IW")) {
          infoStream.message("IW", "now merge\n merge=" + segString(merge.segments) + "\n index=" + segString());
        }

        mergeMiddle(merge, mergePolicy);
        mergeSuccess(merge);
        success = true;
      } catch (Throwable t) {
        handleMergeException(t, merge);
      }
    } finally {
      synchronized(this) {

        mergeFinish(merge);

        if (success == false) {
          if (infoStream.isEnabled("IW")) {
            infoStream.message("IW", "hit exception during merge");
          }
        } else if (!merge.isAborted() && (merge.maxNumSegments != UNBOUNDED_MAX_MERGE_SEGMENTS || (!closed && !closing))) {
          // This merge (and, generally, any change to the
          // segments) may now enable new merges, so we call
          // merge policy & update pending merges.
          updatePendingMerges(mergePolicy, MergeTrigger.MERGE_FINISHED, merge.maxNumSegments);
        }
      }
    }
  } catch (Throwable t) {
    // Important that tragicEvent is called after mergeFinish, else we hang
    // waiting for our merge thread to be removed from runningMerges:
    tragicEvent(t, "merge");
    throw t;
  }

  if (merge.info != null && merge.isAborted() == false) {
    if (infoStream.isEnabled("IW")) {
      infoStream.message("IW", "merge time " + (System.currentTimeMillis()-t0) + " msec for " + merge.info.info.maxDoc() + " docs");
    }
  }
}

6. Finishing the merge
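Before reading mergeFinish itself, it helps to see when it is invoked. The nested try/catch/finally in merge() above ensures that mergeFinish runs whether or not the merge succeeded, and that tragicEvent only runs afterwards. The sketch below reproduces just that control flow with the JDK and records the order of the steps. It is not Lucene code: MergeFlowSketch and the event labels are hypothetical, and it assumes that the exception handler rethrows the failure, which is what allows the outer catch to see it.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the control flow in merge(); NOT Lucene code, all names are hypothetical. */
class MergeFlowSketch {

  /** Runs the given work and returns the order in which the bookkeeping steps fired. */
  static List<String> run(Runnable work) {
    List<String> events = new ArrayList<>();
    try {
      try {
        try {
          work.run();
          events.add("success");
        } catch (Throwable t) {
          // Stands in for handleMergeException; assumed here to rethrow the failure.
          events.add("handle");
          throw t;
        }
      } finally {
        // Stands in for mergeFinish: runs on both the success and the failure path.
        events.add("finish");
      }
    } catch (Throwable t) {
      // Stands in for tragicEvent: only reached after "finish" has been recorded.
      events.add("tragic");
      // The real method rethrows here; the sketch swallows the error to return the log.
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(run(() -> {}));
    // prints [success, finish]
    System.out.println(run(() -> { throw new IllegalStateException("boom"); }));
    // prints [handle, finish, tragic]
  }
}
```

On the failure path "finish" always precedes "tragic", which is the ordering that the comment in merge() calls out as important.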
/** Does finishing for a merge, which is fast but holds
 *  the synchronized lock on IndexWriter instance. */
final synchronized void mergeFinish(MergePolicy.OneMerge merge) {

  // forceMerge, addIndexes or waitForMerges may be waiting
  // on merges to finish.
  notifyAll();

  // It's possible we are called twice, eg if there was an
  // exception inside mergeInit
  if (merge.registerDone) {
    final List<SegmentCommitInfo> sourceSegments = merge.segments;
    for (SegmentCommitInfo info : sourceSegments) {
      mergingSegments.remove(info);
    }
    merge.registerDone = false;
  }

  runningMerges.remove(merge);
}
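The registerDone flag is what makes mergeFinish safe to call more than once, or for a merge whose registration never completed. A minimal sketch of that guard, using only the JDK, might look as follows. It is not Lucene code: FinishSketch, its nested Merge class, the register method, and the segment names are all hypothetical stand-ins, with plain strings in place of SegmentCommitInfo.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the mergeFinish bookkeeping; NOT Lucene code, all names are hypothetical. */
class FinishSketch {

  /** Stand-in for a merge: a list of segment names plus the registration flag. */
  static class Merge {
    final List<String> segments;
    boolean registerDone; // true only while the segments are registered as merging

    Merge(List<String> segments) {
      this.segments = segments;
    }
  }

  private final Set<String> mergingSegments = new HashSet<>();
  private final Set<Merge> runningMerges = new HashSet<>();

  /** Marks the merge's segments as busy; a hypothetical counterpart to registration. */
  synchronized void register(Merge merge) {
    mergingSegments.addAll(merge.segments);
    runningMerges.add(merge);
    merge.registerDone = true;
  }

  /** Releases the merge's segments; calling it again is a harmless no-op. */
  synchronized void finish(Merge merge) {
    notifyAll(); // wake any thread waiting for merges to complete
    if (merge.registerDone) {
      mergingSegments.removeAll(merge.segments);
      merge.registerDone = false; // a second call skips the segment cleanup
    }
    runningMerges.remove(merge);
  }

  synchronized int mergingCount() {
    return mergingSegments.size();
  }

  synchronized int runningCount() {
    return runningMerges.size();
  }

  public static void main(String[] args) {
    FinishSketch sketch = new FinishSketch();
    Merge merge = new Merge(Arrays.asList("_0", "_1"));
    sketch.register(merge);
    sketch.finish(merge);
    sketch.finish(merge); // second call changes nothing
    System.out.println(sketch.mergingCount() + " merging, " + sketch.runningCount() + " running");
    // prints 0 merging, 0 running
  }
}
```

Without the flag, a merge that failed before registering could release segments it never claimed, so the guard keeps the cleanup tied to what was actually registered.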
Reposted from: https://www.cnblogs.com/davidwang456/p/10059134.html