日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ad09只在一定范围内查找相似对象_kafka日志段中的二分查找

發(fā)布時(shí)間:2024/4/13 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ad09只在一定范围内查找相似对象_kafka日志段中的二分查找 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

二分查找

Kafka 中直接接觸索引或索引文件的場(chǎng)景可能不是很多。索引是一個(gè)很神秘的組件,Kafka 官方文檔也沒有怎么提過它。索引這個(gè)組件的源碼還有一個(gè)亮點(diǎn),那就是它應(yīng)用了耳熟能詳?shù)亩植檎宜惴▉砜焖俣ㄎ凰饕?xiàng)。而且社區(qū)還針對(duì) Kafka 自身的特點(diǎn)對(duì)其進(jìn)行了改良。

1. 索引類圖及源文件組織架構(gòu)

在 Kafka 源碼中,跟索引相關(guān)的源碼文件有 5 個(gè),它們都位于 core 包的 /src/main/scala/kafka/log 路徑下。

  • AbstractIndex.scala:它定義了最頂層的抽象類,這個(gè)類封裝了所有索引類型的公共操作。
  • LazyIndex.scala:它定義了 AbstractIndex 上的一個(gè)包裝類,實(shí)現(xiàn)索引項(xiàng)延遲加載。這個(gè)類主要是為了提高性能。
  • OffsetIndex.scala:定義位移索引,保存“< 位移值,文件磁盤物理位置 >”對(duì)。
  • TimeIndex.scala:定義時(shí)間戳索引,保存“< 時(shí)間戳,位移值 >”對(duì)。
  • TransactionIndex.scala:定義事務(wù)索引,為已中止事務(wù)(Aborted Transcation)保存重要的元數(shù)據(jù)信息。只有啟用 Kafka 事務(wù)后,這個(gè)索引才有可能出現(xiàn)。

這些類的繼承關(guān)系如下圖所示:

其中,OffsetIndex、TimeIndex 和 TransactionIndex 都繼承了 AbstractIndex 類,而上層的 LazyIndex 僅僅是包裝了一個(gè) AbstractIndex 的實(shí)現(xiàn)類,用于延遲加載。

2. AbstractIndex 代碼結(jié)構(gòu)

abstract class AbstractIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1, val writable: Boolean) extends Closeable { ...... }

AbstractIndex 定義了 4 個(gè)屬性字段。由于是一個(gè)抽象基類,它的所有子類自動(dòng)地繼承了這 4 個(gè)字段。也就是說,Kafka 所有類型的索引對(duì)象都定義了這些屬性。

  • 索引文件(file)。每個(gè)索引對(duì)象在磁盤上都對(duì)應(yīng)了一個(gè)索引文件。這個(gè)字段是 var 型,說明它是可以被修改的。難道索引對(duì)象還能動(dòng)態(tài)更換底層的索引文件嗎?自 1.1.0 版本之后,Kafka 允許遷移底層的日志路徑,所以,索引文件自然要是可以更換的。
  • 起始位移值(baseOffset)。索引對(duì)象對(duì)應(yīng)日志段對(duì)象的起始位移值。舉個(gè)例子,如果你查看 Kafka 日志路徑的話,就會(huì)發(fā)現(xiàn),日志文件和索引文件都是成組出現(xiàn)的。比如說,如果日志文件是 00000000000000000123.log,正常情況下,一定還有一組索引文件 00000000000000000123.index、00000000000000000123.timeindex 等。這里的“123”就是這組文件的起始位移值,也就是 baseOffset 值。
  • 索引文件最大字節(jié)數(shù)(maxIndexSize)。它控制索引文件的最大長(zhǎng)度。Kafka 源碼傳入該參數(shù)的值是 Broker 端參數(shù) segment.index.bytes 的值,即 10MB。這就是在默認(rèn)情況下,所有 Kafka 索引文件大小都是 10MB 的原因。
  • 索引文件打開方式(writable)。“True”表示以“讀寫”方式打開,“False”表示以“只讀”方式打開。
  • AbstractIndex 是抽象的索引對(duì)象類。可以說,它是承載索引項(xiàng)的容器,而每個(gè)繼承它的子類負(fù)責(zé)定義具體的索引項(xiàng)結(jié)構(gòu)。

    比如,OffsetIndex 的索引項(xiàng)是 < 位移值,物理磁盤位置 > 對(duì),TimeIndex 的索引項(xiàng)是 < 時(shí)間戳,位移值 > 對(duì)。基于這樣的設(shè)計(jì)理念,AbstractIndex 類中定義了一個(gè)抽象方法 entrySize 來表示不同索引項(xiàng)的大小,如下所示:

    protected def entrySize: Int

    子類實(shí)現(xiàn)該方法時(shí)需要給定自己索引項(xiàng)的大小,對(duì)于 OffsetIndex 而言,該值就是 8;對(duì)于 TimeIndex 而言,該值是 12。

    // OffsetIndex override def entrySize = 8 // TimeIndex override def entrySize = 12

    8和12具體什么含義呢?

    在 OffsetIndex 中,位移值用 4 個(gè)字節(jié)來表示,物理磁盤位置也用 4 個(gè)字節(jié)來表示,所以總共是 8 個(gè)字節(jié)。位移值不是長(zhǎng)整型,應(yīng)該是 8 個(gè)字節(jié)才對(duì)。上面提到 AbstractIndex 已經(jīng)保存了 baseOffset 了,這里的位移值,實(shí)際上是相對(duì)于 baseOffset 的相對(duì)位移值,即真實(shí)位移值減去 baseOffset 的值,使用相對(duì)位移值能夠有效地節(jié)省磁盤空間。而 Broker 端參數(shù) log.segment.bytes 是整型,這說明,Kafka 中每個(gè)日志段文件的大小不會(huì)超過 2^32,即 4GB,這就說明同一個(gè)日志段文件上的位移值減去 baseOffset 的差值一定在整數(shù)范圍內(nèi)。因此,源碼只需要 4 個(gè)字節(jié)保存就行了。

    同理,TimeIndex 中的時(shí)間戳類型是長(zhǎng)整型,占用 8 個(gè)字節(jié),位移依然使用相對(duì)位移值,占用 4 個(gè)字節(jié),因此總共需要 12 個(gè)字節(jié)。

    Kafka 中的索引底層的實(shí)現(xiàn)原理是 Java 中的 MappedByteBuffer。使用內(nèi)存映射文件的主要優(yōu)勢(shì)在于,它有很高的 I/O 性能,特別是對(duì)于索引這樣的小文件來說,由于文件內(nèi)存被直接映射到一段虛擬內(nèi)存上,訪問內(nèi)存映射文件的速度要快于普通的讀寫文件速度。

    在 AbstractIndex 中,這個(gè) MappedByteBuffer 就是名為 mmap 的變量。看下源碼:

    @volatile protected var mmap: MappedByteBuffer = { // 第1步:創(chuàng)建索引文件 val newlyCreated = file.createNewFile() // 第2步:以writable指定的方式(讀寫方式或只讀方式)打開索引文件 val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r") try { if(newlyCreated) { if(maxIndexSize < entrySize) // 預(yù)設(shè)的索引文件大小不能太小,如果連一個(gè)索引項(xiàng)都保存不了,直接拋出異常 throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize) // 第3步:設(shè)置索引文件長(zhǎng)度,roundDownToExactMultiple計(jì)算的是不超過maxIndexSize的最大整數(shù)倍entrySize // 比如maxIndexSize=1234567,entrySize=8,那么調(diào)整后的文件長(zhǎng)度為1234560 raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize)) } // 第4步:更新索引長(zhǎng)度字段_length _length = raf.length() // 第5步:創(chuàng)建MappedByteBuffer對(duì)象 val idx = { if (writable) raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length) else raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length) } /* set the position in the index for the next entry */ // 第6步:如果是新創(chuàng)建的索引文件,將MappedByteBuffer對(duì)象的當(dāng)前位置置成0 // 如果索引文件已存在,將MappedByteBuffer對(duì)象的當(dāng)前位置設(shè)置成最后一個(gè)索引項(xiàng)所在的位置 if(newlyCreated) idx.position(0) else idx.position(roundDownToExactMultiple(idx.limit(), entrySize)) // 第7步:返回創(chuàng)建的MappedByteBuffer對(duì)象 idx } finally { CoreUtils.swallow(raf.close(), AbstractIndex) // 關(guān)閉打開索引文件句柄 } }

    這些代碼最主要的作用就是創(chuàng)建 mmap 對(duì)象,AbstractIndex 其他大部分的操作都是和 mmap 相關(guān)。

    比如:

    // 如果我們要計(jì)算索引對(duì)象中當(dāng)前有多少個(gè)索引項(xiàng),只需要執(zhí)行下列計(jì)算:protected var _entries: Int = mmap.position() / entrySize // 如果我們要計(jì)算索引文件最多能容納多少個(gè)索引項(xiàng),只要定義下面的變量就行了:private[this] var _maxEntries: Int = mmap.limit() / entrySize // 再進(jìn)一步,有了這兩個(gè)變量,我們就能夠很容易地編寫一個(gè)方法,來判斷當(dāng)前索引文件是否已經(jīng)寫滿:def isFull: Boolean = _entries >= _maxEntries

    3. 寫入索引項(xiàng)

    下面這段代碼是 OffsetIndex 的 append 方法,用于向索引文件中寫入新索引項(xiàng)。

    def append(offset: Long, position: Int): Unit = { inLock(lock) { // 第1步:判斷索引文件未寫滿 require(!isFull, "Attempt to append to a full index (size = " + _entries + ").") // 第2步:必須滿足以下條件之一才允許寫入索引項(xiàng): // 條件1:當(dāng)前索引文件為空 // 條件2:要寫入的位移大于當(dāng)前所有已寫入的索引項(xiàng)的位移——Kafka規(guī)定索引項(xiàng)中的位移值必須是單調(diào)增加的 if (_entries == 0 || offset > _lastOffset) { trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}") mmap.putInt(relativeOffset(offset)) // 第3步A:向mmap中寫入相對(duì)位移值 mmap.putInt(position) // 第3步B:向mmap中寫入物理位置信息 // 第4步:更新其他元數(shù)據(jù)統(tǒng)計(jì)信息,如當(dāng)前索引項(xiàng)計(jì)數(shù)器_entries和當(dāng)前索引項(xiàng)最新位移值_lastOffset _entries += 1 _lastOffset = offset // 第5步:執(zhí)行校驗(yàn)。寫入的索引項(xiàng)格式必須符合要求,即索引項(xiàng)個(gè)數(shù)*單個(gè)索引項(xiàng)占用字節(jié)數(shù)匹配當(dāng)前文件物理大小,否則說明文件已損壞 require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".") } else { // 如果第2步中兩個(gè)條件都不滿足,不能執(zhí)行寫入索引項(xiàng)操作,拋出異常 throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" + s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.") } } }

    4. 查找索引項(xiàng)

    索引項(xiàng)的寫入邏輯并不復(fù)雜,難點(diǎn)在于如何查找索引項(xiàng)。AbstractIndex 定義了抽象方法 parseEntry 用于查找給定的索引項(xiàng),如下所示:

    protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry

    這里的 “n” 表示要查找給定 ByteBuffer 中保存的第 n 個(gè)索引項(xiàng), IndexEntry 是源碼定義的一個(gè)接口,里面有兩個(gè)方法:indexKey 和 indexValue,分別返回不同類型索引的對(duì)。

    OffsetIndex 實(shí)現(xiàn) parseEntry 的邏輯如下:

    override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = { OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n)) }

    OffsetPosition 是 IndexEntry 的實(shí)現(xiàn)類,Key 就是之前說的位移值,而 Value 就是物理磁盤位置值。所以,這里你能看到代碼調(diào)用了 relativeOffset(buffer, n) + baseOffset 計(jì)算出絕對(duì)位移值,之后調(diào)用 physical(buffer, n) 計(jì)算物理磁盤位置,最后將它們封裝到一起作為一個(gè)獨(dú)立的索引項(xiàng)返回。

    有了 parseEntry 方法,我們就能夠根據(jù)給定的 n 來查找索引項(xiàng)了。但是,這里還有個(gè)問題需要解決,那就是,我們?nèi)绾未_定要找的索引項(xiàng)在第 n 個(gè)槽中呢?也就是如何從一組已排序的數(shù)中快速定位符合條件的那個(gè)數(shù),二分查找登場(chǎng)。

    5. 二分查找算法

    到目前為止,從已排序數(shù)組中尋找某個(gè)數(shù)字最快速的算法就是二分查找了,它能做到 O(lgN) 的時(shí)間復(fù)雜度。Kafka 的索引組件就應(yīng)用了二分查找算法。

    原版的實(shí)現(xiàn)代碼:

    private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = { // 第1步:如果當(dāng)前索引為空,直接返回對(duì) if(_entries == 0) return (-1, -1) // 第2步:要查找的位移值不能小于當(dāng)前最小位移值 if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) return (-1, 0) // binary search for the entry // 第3步:執(zhí)行二分查找算法 var lo = 0 var hi = _entries - 1 while(lo < hi) { val mid = ceil(hi/2.0 + lo/2.0).toInt val found = parseEntry(idx, mid) val compareResult = compareIndexEntry(found, target, searchEntity) if(compareResult > 0) hi = mid - 1 else if(compareResult < 0) lo = mid else return (mid, mid) } (lo, if (lo == _entries - 1) -1 else lo + 1)

    這段代碼的核心是,第 3 步的二分查找算法。常刷算法題的朋友,再熟悉不過了。

    6. 改進(jìn)版二分查找算法

    大多數(shù)操作系統(tǒng)使用頁緩存來實(shí)現(xiàn)內(nèi)存映射,而目前幾乎所有的操作系統(tǒng)都使用 LRU(Least Recently Used)或類似于 LRU 的機(jī)制來管理頁緩存。Kafka 寫入索引文件的方式是在文件末尾追加寫入,而幾乎所有的索引查詢都集中在索引的尾部。這么來看的話,LRU 機(jī)制是非常適合 Kafka 的索引訪問場(chǎng)景的。

    但,這里有個(gè)問題是,當(dāng) Kafka 在查詢索引的時(shí)候,原版的二分查找算法并沒有考慮到緩存的問題,因此很可能會(huì)導(dǎo)致一些不必要的缺頁中斷(Page Fault)。此時(shí),Kafka 線程會(huì)被阻塞,等待對(duì)應(yīng)的索引項(xiàng)從物理磁盤中讀出并放入到頁緩存中。

    下面舉個(gè)例子來說明一下這個(gè)情況。假設(shè) Kafka 的某個(gè)索引占用了操作系統(tǒng)頁緩存 13 個(gè)頁(Page),如果待查找的位移值位于最后一個(gè)頁上,也就是 Page 12,那么標(biāo)準(zhǔn)的二分查找算法會(huì)依次讀取頁號(hào) 0、6、9、11 和 12,具體的流程不過多敘述。

    接下來是重點(diǎn):

    通常來說,一個(gè)頁上保存了成百上千的索引項(xiàng)數(shù)據(jù)。隨著索引文件不斷被寫入,Page 12 不斷地被填充新的索引項(xiàng)。如果此時(shí)索引查詢方都來自 ISR 副本或 Lag 很小的消費(fèi)者,那么這些查詢大多集中在對(duì) Page 12 的查詢,因此,Page 0、6、9、11、12 一定經(jīng)常性地被源碼訪問。也就是說,這些頁一定保存在頁緩存上。

    后面當(dāng)新的索引項(xiàng)填滿了 Page 12,頁緩存就會(huì)申請(qǐng)一個(gè)新的 Page 來保存索引項(xiàng),即 Page 13。現(xiàn)在,最新索引項(xiàng)保存在 Page 13 中。如果要查找最新索引項(xiàng),原版二分查找算法將會(huì)依次訪問 Page 0、7、10、12 和 13。此時(shí),問題來了:Page 7 和 10 已經(jīng)很久沒有被訪問過了,它們大概率不在頁緩存中,因此,一旦索引開始征用 Page 13,就會(huì)發(fā)生 Page Fault,等待那些冷頁數(shù)據(jù)從磁盤中加載到頁緩存。根據(jù)資料查詢,這種加載過程可能長(zhǎng)達(dá) 1 秒。顯然,這是一個(gè)普遍的問題,即每當(dāng)索引文件占用 Page 數(shù)發(fā)生變化時(shí),就會(huì)強(qiáng)行變更二分查找的搜索路徑,從而出現(xiàn)不在頁緩存的冷數(shù)據(jù)必須要加載到頁緩存的情形,而這種加載過程是非常耗時(shí)的。

    基于這個(gè)問題,社區(qū)提出了改進(jìn)版的二分查找策略,也就是緩存友好的搜索算法。總體的思路是,代碼將所有索引項(xiàng)分成兩個(gè)部分:熱區(qū)(Warm Area)和冷區(qū)(Cold Area),然后分別在這兩個(gè)區(qū)域內(nèi)執(zhí)行二分查找算法,如下圖所示:

    同樣是查詢最熱的那部分?jǐn)?shù)據(jù),一旦索引占用了更多的 Page,要遍歷的 Page 組合就會(huì)發(fā)生變化。這是導(dǎo)致性能下降的主要原因。這個(gè)改進(jìn)版算法的最大好處在于,查詢最熱那部分?jǐn)?shù)據(jù)所遍歷的 Page 永遠(yuǎn)是固定的,因此大概率在頁緩存中,從而避免無意義的 Page Fault。

    看到這個(gè)設(shè)計(jì)時(shí),我真的感覺到算法的精妙以及commiter的NB。

    看下實(shí)際的代碼:

    private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = { // 第1步:如果索引為空,直接返回對(duì) if(_entries == 0) return (-1, -1) // 封裝原版的二分查找算法 def binarySearch(begin: Int, end: Int) : (Int, Int) = { // binary search for the entry var lo = begin var hi = end while(lo < hi) { val mid = (lo + hi + 1) >>> 1 val found = parseEntry(idx, mid) val compareResult = compareIndexEntry(found, target, searchEntity) if(compareResult > 0) hi = mid - 1 else if(compareResult < 0) lo = mid else return (mid, mid) } (lo, if (lo == _entries - 1) -1 else lo + 1) } // 第3步:確認(rèn)熱區(qū)首個(gè)索引項(xiàng)位于哪個(gè)槽。_warmEntries就是所謂的分割線,目前固定為8192字節(jié)處 // 如果是OffsetIndex,_warmEntries = 8192 / 8 = 1024,即第1024個(gè)槽 // 如果是TimeIndex,_warmEntries = 8192 / 12 = 682,即第682個(gè)槽 val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries) // 第4步:判斷target位移值在熱區(qū)還是冷區(qū) if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) { return binarySearch(firstHotEntry, _entries - 1) // 如果在熱區(qū),搜索熱區(qū) } // 第5步:確保target位移值不能小于當(dāng)前最小位移值 if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) return (-1, 0) // 第6步:如果在冷區(qū),搜索冷區(qū) binarySearch(0, firstHotEntry)

    最后來張兩個(gè)算法的總結(jié):

    7. 空間與時(shí)間的互換

    到二分查找還沒完,日志段有個(gè)參數(shù) indexIntervalBytes, 可以理解為插了多少條消息之后再建一個(gè)索引,由此看出kafka的索引其實(shí)是稀疏索引,這樣可以避免索引文件占用過多的內(nèi)存,從而可以在內(nèi)存中保存更多的索引。對(duì)應(yīng)Broker端參數(shù)就是 log.index.interval.bytes 值,默認(rèn)4kb。

    實(shí)際的通過索引查找消息的過程是通過offset找到索引所在的文件,然后通過二分法找到離目標(biāo)最近的索引,再順序遍歷消息文件找到目標(biāo)文件。復(fù)雜度為 O(log2n)+O(m), n是索引文件里索引的個(gè)數(shù),m為稀疏程度。

    這就是時(shí)間和空間的互換,數(shù)據(jù)結(jié)構(gòu)和算法的平衡。

    總結(jié)

    以上是生活随笔為你收集整理的ad09只在一定范围内查找相似对象_kafka日志段中的二分查找的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。