日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hbase Compaction 源码分析 - CompactSplitThread 线程池选择

發(fā)布時間:2024/8/23 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hbase Compaction 源码分析 - CompactSplitThread 线程池选择 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

目錄

CompactSplitThread

requestCompactionInternal方法

selectCompaction方法

requestCompaction方法


?

其他相關(guān)文章

Hbase Compaction 源碼分析 - CompactionChecker

Hbase Compaction 源碼分析 - RatioBasedCompactionPolicy 策略

Hbase Compaction 源碼分析 - CompactSplitThread 線程池選擇

?

CompactSplitThread

從名稱我們可以看出來這是個處理Compcation和Split的線程

我們從下面的方法調(diào)用關(guān)系來看可發(fā)現(xiàn)CompactionChecker會調(diào)用requestCompactionInternal方法

requestCompactionInternal方法

/*** @param r region store belongs to* @param s Store to request compaction on* @param why Why compaction requested -- used in debug messages* @param priority override the default priority (NO_PRIORITY == decide)* @param request custom compaction request. Can be <tt>null</tt> in which case a simple* compaction will be used.*///selectNow:系統(tǒng)自動觸發(fā)的system compaction,selectNow參數(shù)為false,如果周期性或者人工觸發(fā)的major compaction的合并,則selectNow為trueprivate synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,final String why, int priority, CompactionRequest request, boolean selectNow, User user)throws IOException {//判斷Hregionserver服務(wù)是否停止if (this.server.isStopped()|| (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {return null;}CompactionContext compaction = null;if (selectNow) {//周期執(zhí)行執(zhí)行MajorCompaction,或人工觸發(fā)的major compaction,selectNow為true.compaction = selectCompaction(r, s, priority, request, user);if (compaction == null) return null; // message logged inside}// We assume that most compactions are small. So, put system compactions into small// pool; we will do selection there, and move to large pool if necessary.// throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",// 2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());//如果selectNow為ture,需要compaction的文件大小大于throttlePoint值,則使用longCompactions線程,否則使用shortCompactions線程//longCompactions和shortCompactions默認大小都是1,生產(chǎn)環(huán)境可以調(diào)整大一些ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))? longCompactions : shortCompactions;pool.execute(new CompactionRunner(s, r, compaction, pool, user));((HRegion)r).incrementCompactionsQueuedCount();if (LOG.isDebugEnabled()) {String type = (pool == shortCompactions) ? "Small " : "Large ";LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")+ (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);}return selectNow ? compaction.getRequest() : null;}

?該方法主要內(nèi)容如下

1.如果HRegionServer停止則跳過
2.判斷是否selectNow,如果為true,則執(zhí)行selectCompaction方法,否則跳過
electNow:系統(tǒng)自動觸發(fā)的system compaction,selectNow參數(shù)為false,如果周期性執(zhí)行MajorCompaction或者人工觸發(fā)的major compaction的合并,則selectNow為true
3.選擇線程池,如果selectNow為ture且需要compaction的文件大小大于throttlePoint值,則使用longCompactions線程,否則使用shortCompactions線程
longCompactions和shortCompactions線程池默認大小都是1,生產(chǎn)環(huán)境可以調(diào)整大一些
4.執(zhí)行線程

selectCompaction方法

private CompactionContext selectCompaction(final Region r, final Store s,int priority, CompactionRequest request, User user) throws IOException {//調(diào)用HStore.requestCompaction方法,獲取CompactionContext數(shù)據(jù)CompactionContext compaction = s.requestCompaction(priority, request, user);if (compaction == null) {if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +" because compaction request was cancelled");}return null;}//確認compcation不為空assert compaction.hasSelection();if (priority != Store.NO_PRIORITY) {compaction.getRequest().setPriority(priority);}return compaction;}

該方法主要作用 :獲取CompactionContext數(shù)據(jù)

requestCompaction方法

@Overridepublic CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,User user) throws IOException {// don't even select for compaction if writes are disabled//如果對應(yīng)的region不可以寫則返回nullif (!this.areWritesEnabled()) {return null;}// Before we do compaction, try to get rid of unneeded files to simplify things.removeUnneededFiles();//通過StoreEngine獲取CompactionContext,這里介紹使用的是DefaultStoreEnginefinal CompactionContext compaction = storeEngine.createCompaction();CompactionRequest request = null;//設(shè)置讀鎖this.lock.readLock().lock();try {//設(shè)置同步鎖synchronized (filesCompacting) {final Store thisStore = this;// First, see if coprocessor would want to override selection.//如果存在協(xié)處理器if (this.getCoprocessorHost() != null) {final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);boolean override = false;if (user == null) {override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,baseRequest);} else {try {override = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {@Overridepublic Boolean run() throws Exception {return getCoprocessorHost().preCompactSelection(thisStore, candidatesForCoproc,baseRequest);}});} catch (InterruptedException ie) {InterruptedIOException iioe = new InterruptedIOException();iioe.initCause(ie);throw iioe;}}if (override) {// Coprocessor is overriding normal file selection.compaction.forceSelect(new CompactionRequest(candidatesForCoproc));}}// Normal case - coprocessor is not overriding file selection.//正常情況if (!compaction.hasSelection()) {//是否為用戶Compactionboolean isUserCompaction = priority == Store.PRIORITY_USER;//判斷是否為高峰期boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&offPeakCompactionTracker.compareAndSet(false, true);try {//調(diào)用DefaultCompactionContext的select方法compaction.select(this.filesCompacting, isUserCompaction,mayUseOffPeak, forceMajor && filesCompacting.isEmpty());} catch (IOException e) {if (mayUseOffPeak) {offPeakCompactionTracker.set(false);}throw e;}assert compaction.hasSelection();//isOffPeak為true,這種壓實被提升為非高峰if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {// Compaction policy doesn't want to take advantage of off-peak.offPeakCompactionTracker.set(false);}}if (this.getCoprocessorHost() != null) {if (user == null) {this.getCoprocessorHost().postCompactSelection(this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);} else {try {user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {@Overridepublic Void run() throws Exception {getCoprocessorHost().postCompactSelection(thisStore,ImmutableList.copyOf(compaction.getRequest().getFiles()),baseRequest);return null;}});} catch (InterruptedException ie) {InterruptedIOException iioe = new InterruptedIOException();iioe.initCause(ie);throw iioe;}}}// Selected files; see if we have a compaction with some custom base request.if (baseRequest != null) {// Update the request with what the system thinks the request should be;// its up to the request if it wants to listen.compaction.forceSelect(baseRequest.combineWith(compaction.getRequest()));}// Finally, we have the resulting files list. Check if we have any files at all.request = compaction.getRequest();final Collection<StoreFile> selectedFiles = request.getFiles();if (selectedFiles.isEmpty()) {return null;}addToCompactingFiles(selectedFiles);// If we're enqueuing a major, clear the force flag.this.forceMajor = this.forceMajor && !request.isMajor();// Set common request properties.// Set priority, either override value supplied by caller or from store.request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());}} finally {this.lock.readLock().unlock();}LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()+ ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"+ (request.isAllFiles() ? " (all files)" : ""));this.region.reportCompactionRequestStart(request.isMajor());return compaction;}

該方法主要作用:

1.先判斷region是否可以寫
2.提出不必要的文件
3.處理存在協(xié)處理器的數(shù)據(jù)
4.調(diào)用DefaultCompactionContext.select方法


DefaultCompactionContext.select方法最終調(diào)用SortedCompactionPolicy.selectCompaction 方法

參數(shù)說明:

參數(shù)說明默認值

hbase.regionserver.thread.compaction.small

RegionServer 小型壓縮線程計數(shù)1
hbase.regionserver.thread.compaction.largeRegionServer大型壓縮線程計數(shù)1

?

?

?

?

下一步就是在具體合并策略選取文件

查看:Hbase Compaction 源碼分析 - RatioBasedCompactionPolicy 策略

?

?

總結(jié)

以上是生活随笔為你收集整理的Hbase Compaction 源码分析 - CompactSplitThread 线程池选择的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。