日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

phaser java_死磕 java同步系列之Phaser源码解析

發(fā)布時(shí)間:2024/7/23 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 phaser java_死磕 java同步系列之Phaser源码解析 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

問題

(1)Phaser是什么?

(2)Phaser具有哪些特性?

(3)Phaser相對(duì)于CyclicBarrier和CountDownLatch的優(yōu)勢?

簡介

Phaser,翻譯為階段,它適用于這樣一種場景,一個(gè)大任務(wù)可以分為多個(gè)階段完成,且每個(gè)階段的任務(wù)可以多個(gè)線程并發(fā)執(zhí)行,但是必須上一個(gè)階段的任務(wù)都完成了才可以執(zhí)行下一個(gè)階段的任務(wù)。

這種場景雖然使用CyclicBarrier或者CountryDownLatch也可以實(shí)現(xiàn),但是要復(fù)雜的多。首先,具體需要多少個(gè)階段是可能會(huì)變的,其次,每個(gè)階段的任務(wù)數(shù)也可能會(huì)變的。相比于CyclicBarrier和CountDownLatch,Phaser更加靈活更加方便。

使用方法

下面我們看一個(gè)最簡單的使用案例:

public class PhaserTest {

public static final int PARTIES = 3;

public static final int PHASES = 4;

public static void main(String[] args) {

Phaser phaser = new Phaser(PARTIES) {

@Override

protected boolean onAdvance(int phase, int registeredParties) {

// 【本篇文章由公眾號(hào)“彤哥讀源碼”原創(chuàng),請(qǐng)支持原創(chuàng),謝謝!】

System.out.println("=======phase: " + phase + " finished=============");

return super.onAdvance(phase, registeredParties);

}

};

for (int i = 0; i < PARTIES; i++) {

new Thread(()->{

for (int j = 0; j < PHASES; j++) {

System.out.println(String.format("%s: phase: %d", Thread.currentThread().getName(), j));

phaser.arriveAndAwaitAdvance();

}

}, "Thread " + i).start();

}

}

}

這里我們定義一個(gè)需要4個(gè)階段完成的大任務(wù),每個(gè)階段需要3個(gè)小任務(wù),針對(duì)這些小任務(wù),我們分別起3個(gè)線程來執(zhí)行這些小任務(wù),查看輸出結(jié)果為:

Thread 0: phase: 0

Thread 2: phase: 0

Thread 1: phase: 0

=======phase: 0 finished=============

Thread 2: phase: 1

Thread 0: phase: 1

Thread 1: phase: 1

=======phase: 1 finished=============

Thread 1: phase: 2

Thread 0: phase: 2

Thread 2: phase: 2

=======phase: 2 finished=============

Thread 0: phase: 3

Thread 2: phase: 3

Thread 1: phase: 3

=======phase: 3 finished=============

可以看到,每個(gè)階段都是三個(gè)線程都完成了才進(jìn)入下一個(gè)階段。這是怎么實(shí)現(xiàn)的呢,讓我們一起來學(xué)習(xí)吧。

原理猜測

根據(jù)我們前面學(xué)習(xí)AQS的原理,大概猜測一下Phaser的實(shí)現(xiàn)原理。

首先,需要存儲(chǔ)當(dāng)前階段phase、當(dāng)前階段的任務(wù)數(shù)(參與者)parties、未完成參與者的數(shù)量,這三個(gè)變量我們可以放在一個(gè)變量state中存儲(chǔ)。

其次,需要一個(gè)隊(duì)列存儲(chǔ)先完成的參與者,當(dāng)最后一個(gè)參與者完成任務(wù)時(shí),需要喚醒隊(duì)列中的參與者。

嗯,差不多就是這樣子。

結(jié)合上面的案例帶入:

初始時(shí)當(dāng)前階段為0,參與者數(shù)為3個(gè),未完成參與者數(shù)為3;

第一個(gè)線程執(zhí)行到phaser.arriveAndAwaitAdvance();時(shí)進(jìn)入隊(duì)列;

第二個(gè)線程執(zhí)行到phaser.arriveAndAwaitAdvance();時(shí)進(jìn)入隊(duì)列;

第三個(gè)線程執(zhí)行到phaser.arriveAndAwaitAdvance();時(shí)先執(zhí)行這個(gè)階段的總結(jié)onAdvance(),再喚醒前面兩個(gè)線程繼續(xù)執(zhí)行下一個(gè)階段的任務(wù)。

嗯,整體能說得通,至于是不是這樣呢,讓我們一起來看源碼吧。

源碼分析

主要內(nèi)部類

static final class QNode implements ForkJoinPool.ManagedBlocker {

final Phaser phaser;

final int phase;

final boolean interruptible;

final boolean timed;

boolean wasInterrupted;

long nanos;

final long deadline;

volatile Thread thread; // nulled to cancel wait

QNode next;

QNode(Phaser phaser, int phase, boolean interruptible,

boolean timed, long nanos) {

this.phaser = phaser;

this.phase = phase;

this.interruptible = interruptible;

this.nanos = nanos;

this.timed = timed;

this.deadline = timed ? System.nanoTime() + nanos : 0L;

thread = Thread.currentThread();

}

}

先完成的參與者放入隊(duì)列中的節(jié)點(diǎn),這里我們只需要關(guān)注thread和next兩個(gè)屬性即可,很明顯這是一個(gè)單鏈表,存儲(chǔ)著入隊(duì)的線程。

主要屬性

// 狀態(tài)變量,用于存儲(chǔ)當(dāng)前階段phase、參與者數(shù)parties、未完成的參與者數(shù)unarrived_count

private volatile long state;

// 最多可以有多少個(gè)參與者,即每個(gè)階段最多有多少個(gè)任務(wù)

private static final int MAX_PARTIES = 0xffff;

// 最多可以有多少階段

private static final int MAX_PHASE = Integer.MAX_VALUE;

// 參與者數(shù)量的偏移量

private static final int PARTIES_SHIFT = 16;

// 當(dāng)前階段的偏移量

private static final int PHASE_SHIFT = 32;

// 未完成的參與者數(shù)的掩碼,低16位

private static final int UNARRIVED_MASK = 0xffff; // to mask ints

// 參與者數(shù),中間16位

private static final long PARTIES_MASK = 0xffff0000L; // to mask longs

// counts的掩碼,counts等于參與者數(shù)和未完成的參與者數(shù)的'|'操作

private static final long COUNTS_MASK = 0xffffffffL;

private static final long TERMINATION_BIT = 1L << 63;

// 一次一個(gè)參與者完成

private static final int ONE_ARRIVAL = 1;

// 增加減少參與者時(shí)使用

private static final int ONE_PARTY = 1 << PARTIES_SHIFT;

// 減少參與者時(shí)使用

private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;

// 沒有參與者時(shí)使用

private static final int EMPTY = 1;

// 用于求未完成參與者數(shù)量

private static int unarrivedOf(long s) {

int counts = (int)s;

return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);

}

// 用于求參與者數(shù)量(中間16位),注意int的位置

private static int partiesOf(long s) {

return (int)s >>> PARTIES_SHIFT;

}

// 用于求階段數(shù)(高32位),注意int的位置

private static int phaseOf(long s) {

return (int)(s >>> PHASE_SHIFT);

}

// 已完成參與者的數(shù)量

private static int arrivedOf(long s) {

int counts = (int)s; // 低32位

return (counts == EMPTY) ? 0 :

(counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);

}

// 用于存儲(chǔ)已完成參與者所在的線程,根據(jù)當(dāng)前階段的奇偶性選擇不同的隊(duì)列

private final AtomicReference evenQ;

private final AtomicReference oddQ;

主要屬性為state和evenQ及oddQ:

(1)state,狀態(tài)變量,高32位存儲(chǔ)當(dāng)前階段phase,中間16位存儲(chǔ)參與者的數(shù)量,低16位存儲(chǔ)未完成參與者的數(shù)量【本篇文章由公眾號(hào)“彤哥讀源碼”原創(chuàng),請(qǐng)支持原創(chuàng),謝謝!】;

(2)evenQ和oddQ,已完成的參與者存儲(chǔ)的隊(duì)列,當(dāng)最后一個(gè)參與者完成任務(wù)后喚醒隊(duì)列中的參與者繼續(xù)執(zhí)行下一個(gè)階段的任務(wù),或者結(jié)束任務(wù)。

構(gòu)造方法

public Phaser() {

this(null, 0);

}

public Phaser(int parties) {

this(null, parties);

}

public Phaser(Phaser parent) {

this(parent, 0);

}

public Phaser(Phaser parent, int parties) {

if (parties >>> PARTIES_SHIFT != 0)

throw new IllegalArgumentException("Illegal number of parties");

int phase = 0;

this.parent = parent;

if (parent != null) {

final Phaser root = parent.root;

this.root = root;

this.evenQ = root.evenQ;

this.oddQ = root.oddQ;

if (parties != 0)

phase = parent.doRegister(1);

}

else {

this.root = this;

this.evenQ = new AtomicReference();

this.oddQ = new AtomicReference();

}

// 狀態(tài)變量state的存儲(chǔ)分為三段

this.state = (parties == 0) ? (long)EMPTY :

((long)phase << PHASE_SHIFT) |

((long)parties << PARTIES_SHIFT) |

((long)parties);

}

構(gòu)造函數(shù)中還有一個(gè)parent和root,這是用來構(gòu)造多層級(jí)階段的,不在本文的討論范圍之內(nèi),忽略之。

重點(diǎn)還是看state的賦值方式,高32位存儲(chǔ)當(dāng)前階段phase,中間16位存儲(chǔ)參與者的數(shù)量,低16位存儲(chǔ)未完成參與者的數(shù)量。

下面我們一起來看看幾個(gè)主要方法的源碼:

register()方法

注冊(cè)一個(gè)參與者,如果調(diào)用該方法時(shí),onAdvance()方法正在執(zhí)行,則該方法等待其執(zhí)行完畢。

public int register() {

return doRegister(1);

}

private int doRegister(int registrations) {

// state應(yīng)該加的值,注意這里是相當(dāng)于同時(shí)增加parties和unarrived

long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;

final Phaser parent = this.parent;

int phase;

for (;;) {

// state的值

long s = (parent == null) ? state : reconcileState();

// state的低32位,也就是parties和unarrived的值

int counts = (int)s;

// parties的值

int parties = counts >>> PARTIES_SHIFT;

// unarrived的值

int unarrived = counts & UNARRIVED_MASK;

// 檢查是否溢出

if (registrations > MAX_PARTIES - parties)

throw new IllegalStateException(badRegister(s));

// 當(dāng)前階段phase

phase = (int)(s >>> PHASE_SHIFT);

if (phase < 0)

break;

// 不是第一個(gè)參與者

if (counts != EMPTY) { // not 1st registration

if (parent == null || reconcileState() == s) {

// unarrived等于0說明當(dāng)前階段正在執(zhí)行onAdvance()方法,等待其執(zhí)行完畢

if (unarrived == 0) // wait out advance

root.internalAwaitAdvance(phase, null);

// 否則就修改state的值,增加adjust,如果成功就跳出循環(huán)

else if (UNSAFE.compareAndSwapLong(this, stateOffset,

s, s + adjust))

break;

}

}

// 是第一個(gè)參與者

else if (parent == null) { // 1st root registration

// 計(jì)算state的值

long next = ((long)phase << PHASE_SHIFT) | adjust;

// 修改state的值,如果成功就跳出循環(huán)

if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))

break;

}

else {

// 多層級(jí)階段的處理方式

synchronized (this) { // 1st sub registration

if (state == s) { // recheck under lock

phase = parent.doRegister(1);

if (phase < 0)

break;

// finish registration whenever parent registration

// succeeded, even when racing with termination,

// since these are part of the same "transaction".

while (!UNSAFE.compareAndSwapLong

(this, stateOffset, s,

((long)phase << PHASE_SHIFT) | adjust)) {

s = state;

phase = (int)(root.state >>> PHASE_SHIFT);

// assert (int)s == EMPTY;

}

break;

}

}

}

}

return phase;

}

// 等待onAdvance()方法執(zhí)行完畢

// 原理是先自旋一定次數(shù),如果進(jìn)入下一個(gè)階段,這個(gè)方法直接就返回了,

// 如果自旋一定次數(shù)后還沒有進(jìn)入下一個(gè)階段,則當(dāng)前線程入隊(duì)列,等待onAdvance()執(zhí)行完畢喚醒

private int internalAwaitAdvance(int phase, QNode node) {

// 保證隊(duì)列為空

releaseWaiters(phase-1); // ensure old queue clean

boolean queued = false; // true when node is enqueued

int lastUnarrived = 0; // to increase spins upon change

// 自旋的次數(shù)

int spins = SPINS_PER_ARRIVAL;

long s;

int p;

// 檢查當(dāng)前階段是否變化,如果變化了說明進(jìn)入下一個(gè)階段了,這時(shí)候就沒有必要自旋了

while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {

// 如果node為空,注冊(cè)的時(shí)候傳入的為空

if (node == null) { // spinning in noninterruptible mode

// 未完成的參與者數(shù)量

int unarrived = (int)s & UNARRIVED_MASK;

// unarrived有變化,增加自旋次數(shù)

if (unarrived != lastUnarrived &&

(lastUnarrived = unarrived) < NCPU)

spins += SPINS_PER_ARRIVAL;

boolean interrupted = Thread.interrupted();

// 自旋次數(shù)完了,則新建一個(gè)節(jié)點(diǎn)

if (interrupted || --spins < 0) { // need node to record intr

node = new QNode(this, phase, false, false, 0L);

node.wasInterrupted = interrupted;

}

}

else if (node.isReleasable()) // done or aborted

break;

else if (!queued) { // push onto queue

// 節(jié)點(diǎn)入隊(duì)列

AtomicReference head = (phase & 1) == 0 ? evenQ : oddQ;

QNode q = node.next = head.get();

if ((q == null || q.phase == phase) &&

(int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq

queued = head.compareAndSet(q, node);

}

else {

try {

// 當(dāng)前線程進(jìn)入阻塞狀態(tài),跟調(diào)用LockSupport.park()一樣,等待被喚醒

ForkJoinPool.managedBlock(node);

} catch (InterruptedException ie) {

node.wasInterrupted = true;

}

}

}

// 到這里說明節(jié)點(diǎn)所在線程已經(jīng)被喚醒了

if (node != null) {

// 置空節(jié)點(diǎn)中的線程

if (node.thread != null)

node.thread = null; // avoid need for unpark()

if (node.wasInterrupted && !node.interruptible)

Thread.currentThread().interrupt();

if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)

return abortWait(phase); // possibly clean up on abort

}

// 喚醒當(dāng)前階段阻塞著的線程

releaseWaiters(phase);

return p;

}

增加一個(gè)參與者總體的邏輯為:

(1)增加一個(gè)參與者,需要同時(shí)增加parties和unarrived兩個(gè)數(shù)值,也就是state的中16位和低16位;

(2)如果是第一個(gè)參與者,則嘗試原子更新state的值,如果成功了就退出;

(3)如果不是第一個(gè)參與者,則檢查是不是在執(zhí)行onAdvance(),如果是等待onAdvance()執(zhí)行完成,如果否則嘗試原子更新state的值,直到成功退出;

(4)等待onAdvance()完成是采用先自旋后進(jìn)入隊(duì)列排隊(duì)的方式等待,減少線程上下文切換;

arriveAndAwaitAdvance()方法

當(dāng)前線程當(dāng)前階段執(zhí)行完畢,等待其它線程完成當(dāng)前階段。

如果當(dāng)前線程是該階段最后一個(gè)到達(dá)的,則當(dāng)前線程會(huì)執(zhí)行onAdvance()方法,并喚醒其它線程進(jìn)入下一個(gè)階段。

public int arriveAndAwaitAdvance() {

// Specialization of doArrive+awaitAdvance eliminating some reads/paths

final Phaser root = this.root;

for (;;) {

// state的值

long s = (root == this) ? state : reconcileState();

// 當(dāng)前階段

int phase = (int)(s >>> PHASE_SHIFT);

if (phase < 0)

return phase;

// parties和unarrived的值

int counts = (int)s;

// unarrived的值(state的低16位)

int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);

if (unarrived <= 0)

throw new IllegalStateException(badArrive(s));

// 修改state的值

if (UNSAFE.compareAndSwapLong(this, stateOffset, s,

s -= ONE_ARRIVAL)) {

// 如果不是最后一個(gè)到達(dá)的,則調(diào)用internalAwaitAdvance()方法自旋或進(jìn)入隊(duì)列等待

if (unarrived > 1)

// 這里是直接返回了,internalAwaitAdvance()方法的源碼見register()方法解析

return root.internalAwaitAdvance(phase, null);

// 到這里說明是最后一個(gè)到達(dá)的參與者

if (root != this)

return parent.arriveAndAwaitAdvance();

// n只保留了state中parties的部分,也就是中16位

long n = s & PARTIES_MASK; // base of next state

// parties的值,即下一次需要到達(dá)的參與者數(shù)量

int nextUnarrived = (int)n >>> PARTIES_SHIFT;

// 執(zhí)行onAdvance()方法,返回true表示下一階段參與者數(shù)量為0了,也就是結(jié)束了

if (onAdvance(phase, nextUnarrived))

n |= TERMINATION_BIT;

else if (nextUnarrived == 0)

n |= EMPTY;

else

// n 加上unarrived的值

n |= nextUnarrived;

// 下一個(gè)階段等待當(dāng)前階段加1

int nextPhase = (phase + 1) & MAX_PHASE;

// n 加上下一階段的值

n |= (long)nextPhase << PHASE_SHIFT;

// 修改state的值為n

if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))

return (int)(state >>> PHASE_SHIFT); // terminated

// 喚醒其它參與者并進(jìn)入下一個(gè)階段

releaseWaiters(phase);

// 返回下一階段的值

return nextPhase;

}

}

}

arriveAndAwaitAdvance的大致邏輯為:

(1)修改state中unarrived部分的值減1;

(2)如果不是最后一個(gè)到達(dá)的,則調(diào)用internalAwaitAdvance()方法自旋或排隊(duì)等待;

(3)如果是最后一個(gè)到達(dá)的,則調(diào)用onAdvance()方法,然后修改state的值為下一階段對(duì)應(yīng)的值,并喚醒其它等待的線程;

(4)返回下一階段的值;

總結(jié)

(1)Phaser適用于多階段多任務(wù)的場景,每個(gè)階段的任務(wù)都可以控制得很細(xì);

(2)Phaser內(nèi)部使用state變量及隊(duì)列實(shí)現(xiàn)整個(gè)邏輯【本篇文章由公眾號(hào)“彤哥讀源碼”原創(chuàng),請(qǐng)支持原創(chuàng),謝謝!】;

(3)state的高32位存儲(chǔ)當(dāng)前階段phase,中16位存儲(chǔ)當(dāng)前階段參與者(任務(wù))的數(shù)量parties,低16位存儲(chǔ)未完成參與者的數(shù)量unarrived;

(4)隊(duì)列會(huì)根據(jù)當(dāng)前階段的奇偶性選擇不同的隊(duì)列;

(5)當(dāng)不是最后一個(gè)參與者到達(dá)時(shí),會(huì)自旋或者進(jìn)入隊(duì)列排隊(duì)來等待所有參與者完成任務(wù);

(6)當(dāng)最后一個(gè)參與者完成任務(wù)時(shí),會(huì)喚醒隊(duì)列中的線程并進(jìn)入下一個(gè)階段;

彩蛋

Phaser相對(duì)于CyclicBarrier和CountDownLatch的優(yōu)勢?

答:優(yōu)勢主要有兩點(diǎn):

(1)Phaser可以完成多階段,而一個(gè)CyclicBarrier或者CountDownLatch一般只能控制一到兩個(gè)階段的任務(wù);

(2)Phaser每個(gè)階段的任務(wù)數(shù)量可以控制,而一個(gè)CyclicBarrier或者CountDownLatch任務(wù)數(shù)量一旦確定不可修改。

推薦閱讀

歡迎關(guān)注我的公眾號(hào)“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

總結(jié)

以上是生活随笔為你收集整理的phaser java_死磕 java同步系列之Phaser源码解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 正在播放adn156松下纱荣子 | 久久成人久久 | 亚洲最新偷拍 | 四虎永久免费在线观看 | 欧美特级黄 | 冈本视频在线观看 | 少妇人妻邻居 | 91网站在线免费观看 | 亚洲a一区二区 | 日韩一区二区三区网站 | www四虎精品视频免费网站 | 淫僧荡尼巨乳(h)小说 | 人妻夜夜爽天天爽三区麻豆av网站 | 色涩涩 | 丁香六月色婷婷 | 国产精品一区二区无线 | 天堂毛片| 国产在线欧美在线 | 国产精品21p| 性生交大全免费看 | 最新中文在线视频 | 岛国av免费 | 深夜激情网 | 国产精品人八做人人女人a级刘 | 超碰在线成人 | 污片网站 | 男人天堂久久 | 国产精品一区av | 久久福利一区 | 精品国产视频一区二区 | 一本一道精品欧美中文字幕 | 国产一级久久久久毛片精品 | 精品视频免费在线观看 | 污片免费观看 | 九色一区 | 国内精品国产成人国产三级 | 黄色网址多少 | 国产精品区二区三区日本 | 国产精品成人网 | 国产成人精品一区 | 日本免费一区二区三区四区五六区 | 国产码视频 | av资源导航 | 午夜亚洲天堂 | 亚洲国产精品无码观看久久 | 影音先锋亚洲一区 | 亚洲精视频 | 国产福利一区在线观看 | 亚洲精品二区三区 | 天天操操 | 波多野结衣中文字幕一区二区三区 | 免费av影视 | 男人插女人的网站 | 无码人妻一区二区三区免费n鬼沢 | 亚洲九九色 | 国产精品无码一区二区三区在线看 | 人人舔 | 亚洲福利在线视频 | 午夜男人av| 一区二区三区在线免费播放 | 91调教视频 | 午夜精品久久久久久久四虎美女版 | 亚洲视频中文字幕 | 亚洲快播 | 少妇人妻好深好紧精品无码 | 久久77777| 成人精品在线播放 | 视频免费1区二区三区 | 国产高清免费在线播放 | 国精产品99永久一区一区 | 91视频最新入口 | 亚洲精品一区二三区不卡 | 国产欧美视频一区二区三区 | 亚洲综合自拍 | 小黄网站在线观看 | 黄av在线 | 影音先锋久久 | 国产aaaaaaa| 色香蕉在线视频 | 色男人影院 | eeuss日韩 | 欧美区一区二区三 | 久久草视频 | 日日日网站| 亚洲五月花 | 亚洲啪啪网站 | 免费在线黄色片 | 91噜噜噜 | 在线看av的网址 | 日韩一级在线播放 | 香蕉视频成人在线 | 先锋资源国产 | 国产精品一区二区三区免费视频 | av鲁丝一区鲁丝二区鲁丝三区 | 欧美性69 | 极品探花在线 | av不卡网| 国产精品色婷婷99久久精品 | 国产大屁股喷水视频在线观看 |