日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > java >内容正文

java

Java8 Striped64 和 LongAdder

發(fā)布時間:2023/12/3 java 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java8 Striped64 和 LongAdder 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

轉(zhuǎn)載自??Java8 Striped64 和 LongAdder

數(shù)據(jù) STRIPING

根據(jù)維基百科的這段說明:

In computer data storage, data striping is the technique of segmenting logically sequential data, such as a file, so that consecutive segments are stored on different physical storage devices.

Striping is useful when a processing device requests data more quickly than a single storage device can provide it. By spreading segments across multiple devices which can be accessed concurrently, total data throughput is increased. It is also a useful method for balancing I/O load across an array of disks. Striping is used across disk drives in redundant array of independent disks (RAID) storage, network interface controllers, different computers in clustered file systems and grid-oriented storage, and RAM in some systems.

數(shù)據(jù) striping 就是把邏輯上連續(xù)的數(shù)據(jù)分為多個段,使這一序列的段存儲在不同的物理設(shè)備上。通過把段分散到多個設(shè)備上可以增加訪問并發(fā)性,從而提升總體的吞吐量。

Striped64

JDK 8 的?java.util.concurrent.atomic?下有一個包本地的類?Striped64?,它持有常見表示和機(jī)制用于類支持動態(tài) striping 到 64bit 值上。

設(shè)計(jì)思路

這個類維護(hù)一個延遲初始的、原子地更新值的表,加上額外的 “base” 字段。表的大小是 2 的冪。索引使用每線程的哈希碼來masked。這個的幾乎所有聲明都是包私有的,通過子類直接訪問。

表的條目是 Cell 類,一個填充過(通過?sun.misc.Contended?)的 AtomicLong 的變體,用于減少緩存競爭。填充對于多數(shù) Atomics 是過度殺傷的,因?yàn)樗鼈円话悴灰?guī)則地分布在內(nèi)存里,因此彼此間不會有太多沖突。但存在于數(shù)組的原子對象將傾向于彼此相鄰地放置,因此將通常共享緩存行(對性能有巨大的副作用),在沒有這個防備下。

部分地,因?yàn)镃ell相對比較大,我們避免創(chuàng)建它們直到需要時。當(dāng)沒有競爭時,所有的更新都作用到 base 字段。根據(jù)第一次競爭(更新 base 的 CAS 失敗),表被初始化為大小 2。表的大小根據(jù)更多的競爭加倍,直到大于或等于CPU數(shù)量的最小的 2 的冪。表的槽在它們需要之前保持空。

一個單獨(dú)的自旋鎖(“cellsBusy”)用于初始化和resize表,還有用新的Cell填充槽。不需要阻塞鎖,當(dāng)鎖不可得,線程嘗試其他槽(或 base)。在這些重試中,會增加競爭和減少本地性,這仍然好于其他選擇。

通過 ThreadLocalRandom 維護(hù)線程探針字段,作為每線程的哈希碼。我們讓它們?yōu)?0 來保持未初始化直到它們在槽 0 競爭。然后初始化它們?yōu)橥ǔ2粫ハ鄾_突的值。當(dāng)執(zhí)行更新操作時,競爭和/或表沖突通過失敗了的 CAS 來指示。根據(jù)沖突,如果表的大小小于容量,它的大小加倍,除非有些線程持有了鎖。如果一個哈希后的槽是空的,且鎖可得,創(chuàng)建新的Cell。否則,如果槽存在,重試CAS。重試通過 “重散列,double hashing” 來繼續(xù),使用一個次要的哈希算法(Marsaglia XorShift)來嘗試找到一個自由槽位。

表的大小是有上限的,因?yàn)?#xff0c;當(dāng)線程數(shù)多于CPU數(shù)時,假如每個線程綁定到一個CPU上,存在一個完美的哈希函數(shù)映射線程到槽上,消除了沖突。當(dāng)我們到達(dá)容量,我們隨機(jī)改變碰撞線程的哈希碼搜索這個映射。因?yàn)樗阉魇请S機(jī)的,沖突只能通過CAS失敗來知道,收斂convergence 是慢的,因?yàn)榫€程通常不會一直綁定到CPU上,可能根本不會發(fā)生。然而,盡管有這些限制,在這些案例下觀察到的競爭頻率顯著地低。

當(dāng)哈希到特定 Cell 的線程終止后,Cell 可能變?yōu)榭臻e的,表加倍后導(dǎo)致沒有線程哈希到擴(kuò)展的 Cell 也會出現(xiàn)這種情況。我們不嘗試去檢測或移除這些 Cell,在實(shí)例長期運(yùn)行的假設(shè)下,觀察到的競爭水平將重現(xiàn),所以 Cell 將最終被再次需要。對于短期存活的實(shí)例,這沒關(guān)系。

設(shè)計(jì)思路小結(jié)

  • striping和緩存行填充:通過把類數(shù)據(jù) striping 為 64bit 的片段,使數(shù)據(jù)成為緩存行友好的,減少CAS競爭。
  • 分解表示:對于一個數(shù)字 5,可以分解為一序列數(shù)的和:2 + 3,這個數(shù)字加 1 也等價于它的分解序列中的任一 數(shù)字加 1:5 + 1 = 2 + (3 + 1)。
  • 通過把分解序列存放在表里面,表的條目都是填充后的 Cell;限制表的大小為 2 的冪,則可以用掩碼來實(shí)現(xiàn)索引;同時把表的大小限制為大于等于CPU數(shù)量的最小的 2 的冪。
  • 當(dāng)表的條目上出現(xiàn)競爭時,在到達(dá)容量前表擴(kuò)容一倍,通過增加條目來減少競爭。

CELL 類

Cell?類是?Striped64?的靜態(tài)內(nèi)部類。通過注解?@sun.misc.Contended?來自動實(shí)現(xiàn)緩存行填充,讓Java編譯器和JRE運(yùn)行時來決定如何填充。本質(zhì)上是一個填充了的、提供了CAS更新的volatile變量。

@sun.misc.Contended static final class Cell {volatile long value;Cell(long x) { value = x; }final boolean cas(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long valueOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> ak = Cell.class;valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));} catch (Exception e) {throw new Error(e);}} }

STRIPED64

Striped64 通過一個 Cell 數(shù)組維持了一序列分解數(shù)的表示,通過 base 字段維持?jǐn)?shù)的初始值,通過 cellsBusy 字段來控制 resing 和/或 創(chuàng)建Cell。它還提供了對數(shù)進(jìn)行累加的機(jī)制。

abstract class Striped64 extends Number {static final int NCPU = Runtime.getRuntime().availableProcessors();// 存放 Cell 的表。當(dāng)不為空時大小是 2 的冪。transient volatile Cell[] cells;// base 值,在沒有競爭時使用,也作為表初始化競爭時的一個后備。transient volatile long base;// 自旋鎖,在 resizing 和/或 創(chuàng)建Cell時使用。transient volatile int cellsBusy; }

累加機(jī)制 longAccumulate

設(shè)計(jì)思路里針對機(jī)制的實(shí)現(xiàn),核心邏輯。該方法處理涉及初始化、resing、創(chuàng)建新cell、和/或競爭的更新。

邏輯如下:

  • if 表已初始化

    • if 映射到的槽是空的,加鎖后再次判斷,如果仍然是空的,初始化cell并關(guān)聯(lián)到槽。
    • else if (槽不為空)在槽上之前的CAS已經(jīng)失敗,重試。
    • else if (槽不為空、且之前的CAS沒失敗,)在此槽的cell上嘗試更新
    • else if 表已達(dá)到容量上限或被擴(kuò)容了,重試。
    • else if 如果不存在沖突,則設(shè)置為存在沖突,重試。
    • else if 如果成功獲取到鎖,則擴(kuò)容。
    • else 重散列,嘗試其他槽。
  • else if 鎖空閑且獲取鎖成功,初始化表

  • else if 回退 base 上更新且成功則退出
  • else 繼續(xù)
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {int h;if ((h = getProbe()) == 0) {// 未初始化的ThreadLocalRandom.current(); // 強(qiáng)制初始化h = getProbe();wasUncontended = true;}// 最后的槽不為空則 true,也用于控制擴(kuò)容,false重試。boolean collide = false;for (;;) {Cell[] as; Cell a; int n; long v;if ((as = cells) != null && (n = as.length) > 0) {// 表已經(jīng)初始化if ((a = as[(n - 1) & h]) == null) {// 線程所映射到的槽是空的。if (cellsBusy == 0) { // 嘗試關(guān)聯(lián)新的Cell// 鎖未被使用,樂觀地創(chuàng)建并初始化cell。Cell r = new Cell(x);if (cellsBusy == 0 && casCellsBusy()) {// 鎖仍然是空閑的、且成功獲取到鎖boolean created = false;try { // 在持有鎖時再次檢查槽是否空閑。Cell[] rs; int m, j;if ((rs = cells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {// 所映射的槽仍為空rs[j] = r; // 關(guān)聯(lián) cell 到槽created = true;}} finally {cellsBusy = 0; // 釋放鎖}if (created)break; // 成功創(chuàng)建cell并關(guān)聯(lián)到槽,退出continue; // 槽現(xiàn)在不為空了}}// 鎖被占用了,重試collide = false;}// 槽被占用了else if (!wasUncontended) // 已經(jīng)知道 CAS 失敗wasUncontended = true; // 在重散列后繼續(xù)// 在當(dāng)前槽的cell上嘗試更新else if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break;// 表大小達(dá)到上限或擴(kuò)容了;// 表達(dá)到上限后就不會再嘗試下面if的擴(kuò)容了,只會重散列,嘗試其他槽else if (n >= NCPU || cells != as)collide = false; // At max size or stale// 如果不存在沖突,則設(shè)置為存在沖突else if (!collide)collide = true;// 有競爭,需要擴(kuò)容else if (cellsBusy == 0 && casCellsBusy()) {// 鎖空閑且成功獲取到鎖try {if (cells == as) { // 距上一次檢查后表沒有改變,擴(kuò)容:加倍Cell[] rs = new Cell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];cells = rs;}} finally {cellsBusy = 0; // 釋放鎖}collide = false;continue; // 在擴(kuò)容后的表上重試}// 沒法獲取鎖,重散列,嘗試其他槽h = advanceProbe(h);}else if (cellsBusy == 0 && cells == as && casCellsBusy()) {// 加鎖的情況下初始化表boolean init = false;try { // Initialize tableif (cells == as) {Cell[] rs = new Cell[2];rs[h & 1] = new Cell(x);cells = rs;init = true;}} finally {cellsBusy = 0; // 釋放鎖}if (init)break; // 成功初始化,已更新,跳出循環(huán)}else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))// 表未被初始化,可能正在初始化,回退使用 base。break; // 回退到使用 base} }

LongAdder

LongAdder 繼承自 Striped64,它的方法只針對簡單的情況:cell存在且更新無競爭,其余情況都通過 Striped64 的longAccumulate方法來完成。

public void add(long x) {Cell[] as; long b, v; int m; Cell a;if ((as = cells) != null || !casBase(b = base, b + x)) {// cells 不為空 或在 base 上cas失敗。也即出現(xiàn)了競爭。boolean uncontended = true;//if (as == null || (m = as.length - 1) < 0 ||(a = as[getProbe() & m]) == null ||!(uncontended = a.cas(v = a.value, v + x)))// 如果所映射的槽不為空,且成功更新則返回,否則進(jìn)入復(fù)雜處理流程。longAccumulate(x, null, uncontended);} }// 獲取當(dāng)前的和。base值加上每個cell的值。 public long sum() {Cell[] as = cells; Cell a;long sum = base;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum; }


總結(jié)

以上是生活随笔為你收集整理的Java8 Striped64 和 LongAdder的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。