Bridging Java and the Kernel, Part 1: How ReentrantLock Is Implemented
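Summary: Anyone who writes Java code knows that Java locks fall into two broad categories: synchronized locks and the locks in the java.util.concurrent package (JUC locks). The synchronized keyword is a capability provided by the Java language itself and is not covered here; this article focuses on ReentrantLock from JUC.
Author | Jiang Chong
Source | Alibaba Tech WeChat official account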
I. JDK Layer
1 AbstractQueuedSynchronizer
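ReentrantLock's lock(), unlock() and related APIs are actually implemented by an internal Synchronizer (note: not synchronized). The Synchronizer comes in two flavors, FairSync and NonfairSync, which, as the names suggest, are the fair and the non-fair versions.
Calling ReentrantLock's lock method simply hands the call off to the Synchronizer's lock() method: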
Excerpt from java.util.concurrent.locks.ReentrantLock.java:

/** Synchronizer providing all implementation mechanics */
private final Sync sync;

/**
 * Base of synchronization control for this lock. Subclassed
 * into fair and nonfair versions below. Uses AQS state to
 * represent the number of holds on the lock.
 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    ......
}

public void lock() {
    sync.lock();
}

So what is this sync? Sync extends AbstractQueuedSynchronizer (AQS), the cornerstone of the concurrent package. AQS itself does not implement any synchronization interface (such as lock, unlock or countDown); instead it defines a framework for controlling access to a shared resource, using the template method design pattern. It defines acquire and release for acquiring and releasing a resource exclusively, and acquireShared and releaseShared for acquiring and releasing it in shared mode. For example, acquire/release are used to implement ReentrantLock, while acquireShared/releaseShared are used to implement CountDownLatch and Semaphore. The skeleton of acquire looks like this:
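/**
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

The overall logic: tryAcquire is called once first. If it succeeds, nothing more happens and the caller continues with its own code. If it fails, addWaiter and acquireQueued are executed. tryAcquire() must be implemented by each subclass according to its own synchronization semantics, whereas acquireQueued() and addWaiter() are already implemented by AQS. addWaiter appends the current thread to the tail of AQS's internal sync queue; acquireQueued then lets the queued thread retry tryAcquire whenever it reaches the front of the queue and blocks it in between, until the acquisition succeeds. If the thread was interrupted while waiting, acquireQueued returns true and acquire re-asserts the interrupt through selfInterrupt().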
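To see the template method split in action, here is a minimal sketch (not JDK code) of a non-reentrant mutex built on the real AbstractQueuedSynchronizer API: the subclass supplies only tryAcquire/tryRelease, and the inherited acquire/release handle queueing and parking. The class names SimpleMutex and SimpleMutexDemo are hypothetical.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Minimal non-reentrant mutex on AQS: state 0 = unlocked, 1 = locked.
class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // A single CAS attempt; if it fails, AQS.acquire() enqueues and parks the thread.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);   // volatile write publishes the critical section
            return true;   // true tells AQS to wake the next queued thread
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    void lock()   { sync.acquire(1); }  // template: tryAcquire, else addWaiter + acquireQueued
    void unlock() { sync.release(1); }  // template: tryRelease, then unpark a successor
}

class SimpleMutexDemo {
    // Each of `threads` threads increments a shared counter `iters` times under the mutex.
    static long countWithMutex(int threads, int iters) {
        SimpleMutex mutex = new SimpleMutex();
        long[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < iters; i++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWithMutex(4, 10_000)); // 40000: no increment is lost
    }
}
```

The subclass never touches the queue or park/unpark directly; that is exactly the division of labor acquire() encodes.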
The code of addWaiter is as follows:
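/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    // Create the node, recording the thread and the mode (exclusive or shared)
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // A non-null tail means the sync queue has already been initialized
    if (pred != null) {
        // Point the new node's prev at the current tail
        node.prev = pred;
        // Try to make the new node the tail
        if (compareAndSetTail(pred, node)) {
            // Point the old tail's next at the new tail, so the sync queue is a doubly linked list
            pred.next = node;
            return node;
        }
    }
    // The tail is null (queue not yet initialized) or the CAS lost a race:
    // fall back to enq, which initializes the head if needed and retries until enqueued
    enq(node);
    return node;
}

The code of enq(node) is as follows: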
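/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // If tail is null, create a head node and point both head and tail at it.
            // The head node is called the "sentinel" or "dummy" node; it is not tied to any thread.
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // After initialization, the next loop iteration takes this branch
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

When addWaiter finishes on a previously empty queue, the sync queue consists of the dummy head node followed by the new node: head points to the dummy node, tail points to the new node, and the two are linked in both directions through prev and next.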
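The enqueue pattern of enq()/addWaiter() can be sketched on its own. This is a simplified model, not AQS code: it uses AtomicReference instead of Unsafe-based CAS, and Node.owner is a hypothetical stand-in for the Thread that AQS stores. It shows why prev is set before the tail CAS and next only afterwards.

```java
import java.util.concurrent.atomic.AtomicReference;

// Doubly linked queue with a lazily created dummy head; only the tail pointer is CASed.
class WaitQueue {
    static final class Node {
        final String owner;          // hypothetical stand-in for the queued Thread
        volatile Node prev;
        volatile Node next;
        Node(String owner) { this.owner = owner; }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();
    private final AtomicReference<Node> tail = new AtomicReference<>();

    // Appends a node and returns its predecessor, as enq() does.
    Node enqueue(String owner) {
        Node node = new Node(owner);
        for (;;) {
            Node t = tail.get();
            if (t == null) {
                // Lazily install the dummy (sentinel) head, which belongs to no thread.
                Node dummy = new Node(null);
                if (head.compareAndSet(null, dummy)) {
                    tail.set(dummy);
                }
            } else {
                node.prev = t;                      // 1. link backwards before publishing
                if (tail.compareAndSet(t, node)) {  // 2. only one thread wins each tail slot
                    t.next = node;                  // 3. forward link is filled in afterwards
                    return t;
                }
            }
        }
    }

    // prev links are valid as soon as the CAS succeeds, so walking from the tail is always
    // consistent; a next link may briefly be null while step 3 is pending.
    int countFromTail() {
        int n = 0;
        Node h = head.get();
        for (Node p = tail.get(); p != null && p != h; p = p.prev) n++;
        return n;
    }

    int countFromHead() {
        int n = 0;
        Node h = head.get();
        for (Node p = (h == null) ? null : h.next; p != null; p = p.next) n++;
        return n;
    }
}

class WaitQueueDemo {
    // Enqueues threads * perThread nodes concurrently; returns {countFromTail, countFromHead}.
    static int[] enqueueConcurrently(int threads, int perThread) {
        WaitQueue q = new WaitQueue();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final String name = "T" + t;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) q.enqueue(name);
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try { w.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return new int[] {q.countFromTail(), q.countFromHead()};
    }

    public static void main(String[] args) {
        int[] counts = enqueueConcurrently(4, 1000);
        System.out.println(counts[0] + " " + counts[1]); // 4000 4000
    }
}
```

This is also why AQS scans backwards from tail when it needs a reliable view of the queue.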
The code of acquireQueued is as follows:
/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the current node's predecessor
            final Node p = node.predecessor();
            // If the predecessor is head, this is the first queued thread: try to acquire
            if (p == head && tryAcquire(arg)) {
                // On success, the current node becomes the new head (dummy) node
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

The logic of acquireQueued is:
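The node first checks whether it is the first queued node in the sync queue (that is, whether its predecessor is head). If so, it tries to acquire the lock, and on success it turns itself into the new head node: setHead points head at this node and clears the node's thread and prev fields, so it becomes the new dummy node, while the old head is unlinked (p.next = null) so it can be garbage collected.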
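If the node is not the first queued node, or tryAcquire fails, shouldParkAfterFailedAcquire is called. Its main job is to use CAS to change the predecessor node's waitStatus from its initial value 0 to SIGNAL, which means the predecessor must wake (signal) this node when it releases the lock; cancelled predecessors are skipped along the way. While the predecessor is not yet SIGNAL, the method returns false and the loop in acquireQueued retries tryAcquire once more; once the predecessor is SIGNAL it returns true and parkAndCheckInterrupt is called. parkAndCheckInterrupt blocks (suspends) the current thread until it is woken up, and then reports whether the thread was interrupted. Its implementation relies on the layers below, which is the focus of this article and is explained layer by layer in the following sections.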
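The decision made by shouldParkAfterFailedAcquire can be restated as a small, deterministic function. This is a simplified sketch: the CANCELLED and SIGNAL values match JDK 8's AbstractQueuedSynchronizer.Node, but the Node class here is a stand-in that uses AtomicInteger instead of Unsafe.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified re-statement of AQS's shouldParkAfterFailedAcquire(pred, node).
class ParkDecision {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;

    static final class Node {
        final AtomicInteger waitStatus = new AtomicInteger(0); // 0 = initial state
        volatile Node prev;
        volatile Node next;
    }

    // Returns true if the caller may park now; false means "retry tryAcquire first".
    static boolean shouldPark(Node pred, Node node) {
        int ws = pred.waitStatus.get();
        if (ws == SIGNAL) {
            return true;  // predecessor has promised to unpark us when it releases
        }
        if (ws > 0) {
            // Predecessor was cancelled: skip cancelled nodes and attach to a live one.
            do {
                pred = pred.prev;
                node.prev = pred;
            } while (pred.waitStatus.get() > 0);
            pred.next = node;
        } else {
            // Ask the predecessor to signal us; the caller loops once more before parking,
            // so a release that happened in between is not missed.
            pred.waitStatus.compareAndSet(ws, SIGNAL);
        }
        return false;
    }

    public static void main(String[] args) {
        Node head = new Node();
        Node n = new Node();
        n.prev = head;
        head.next = n;
        System.out.println(shouldPark(head, n)); // false: head's status moved 0 -> SIGNAL
        System.out.println(shouldPark(head, n)); // true: safe to park now
    }
}
```

The "set SIGNAL, then retry once" ordering is what prevents a lost wakeup: the releaser either sees SIGNAL and unparks, or released early enough that the retried tryAcquire succeeds.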
2 ReentrantLock
Now let's see how ReentrantLock implements its semantics on top of AbstractQueuedSynchronizer.
ReentrantLock internally uses FairSync and NonfairSync, both subclasses of AQS. The main code of FairSync, for example, is:
/**
 * Sync object for fair locks
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

The most important field in AQS is state; every lock and synchronizer is built around modifying it. One reason AQS can support so many different locks and synchronizers is that each one can define the meaning of the synchronization state as it needs, and override tryAcquire and tryRelease, or tryAcquireShared and tryReleaseShared, to manipulate that state.
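The logic of FairSync's tryAcquire is as follows. If state is 0 (the lock is free), the thread takes the lock only if no other thread has been waiting longer (hasQueuedPredecessors() returns false) and a CAS from 0 to acquires succeeds, after which it records itself as the exclusive owner. If state is non-zero but the current thread already owns the lock, this is a reentrant acquisition: state is simply incremented (no CAS is needed, because only the owner can reach this branch), and an overflow of the hold count throws an Error. In every other case tryAcquire returns false and the thread goes on to queue in AQS. For ReentrantLock, state therefore counts how many times the owner currently holds the lock.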
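A minimal sketch, assuming only the public AQS API, of a reentrant fair lock that follows the tryAcquire logic above; the tryRelease shown is written in the same spirit but is an assumption of this sketch, not the JDK source. MiniFairLock and MiniFairLockDemo are hypothetical names.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Reentrant fair lock on AQS: state = hold count, 0 = free.
class MiniFairLock {
    private static final class FairSync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // Fairness: take a free lock only if no thread has been waiting longer.
                if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                // Reentry by the owner: only the owner gets here, so a plain setState suffices.
                int nextc = c + acquires;
                if (nextc < 0) throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            int c = getState() - releases;
            boolean free = (c == 0);
            if (free) setExclusiveOwnerThread(null);
            setState(c);
            return free;  // only the outermost unlock lets AQS wake a successor
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        int holdCount() { return isHeldExclusively() ? getState() : 0; }
    }

    private final FairSync sync = new FairSync();

    void lock()       { sync.acquire(1); }
    boolean tryLock() { return sync.tryAcquire(1); }  // one fair attempt, never queues
    void unlock()     { sync.release(1); }
    int holdCount()   { return sync.holdCount(); }
}

class MiniFairLockDemo {
    // Calls tryLock() from a fresh thread and reports whether it succeeded.
    static boolean tryLockFromOtherThread(MiniFairLock lock) {
        boolean[] result = new boolean[1];
        Thread other = new Thread(() -> {
            result[0] = lock.tryLock();
            if (result[0]) lock.unlock();
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }

    public static void main(String[] args) {
        MiniFairLock lock = new MiniFairLock();
        lock.lock();
        lock.lock();                                       // reentrant acquire
        System.out.println(lock.holdCount());              // 2
        System.out.println(tryLockFromOtherThread(lock));  // false: held by main
        lock.unlock();
        lock.unlock();
        System.out.println(tryLockFromOtherThread(lock));  // true: lock is free again
    }
}
```

Note that this tryLock() keeps the fairness check, unlike ReentrantLock.tryLock(), which deliberately barges.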
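That roughly covers the Java-level implementation. To summarize, the overall call chain is: ReentrantLock.lock() → Sync.lock() → AQS.acquire() → tryAcquire(); if that fails, addWaiter() → acquireQueued() → shouldParkAfterFailedAcquire() → parkAndCheckInterrupt() → LockSupport.park() → Unsafe.park().
The unlock implementation is left out for lack of space. The rest of this article focuses on how the current thread is blocked during lock, that is, how the Unsafe.park() call at the bottom of this framework is implemented.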
II. JVM Layer
Unsafe.park and Unsafe.unpark are native methods of the sun.misc.Unsafe class:

public native void unpark(Object var1);
public native void park(boolean var1, long var2);

They are implemented in the JVM source file hotspot/src/share/vm/prims/unsafe.cpp:
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
  UnsafeWrapper("Unsafe_Park");
  EventThreadPark event;
#ifndef USDT2
  HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
  HOTSPOT_THREAD_PARK_BEGIN(
                            (uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
  JavaThreadParkedState jtps(thread, time != 0);
  thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
  HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
  HOTSPOT_THREAD_PARK_END(
                          (uintptr_t) thread->parker());
#endif /* USDT2 */
  if (event.should_commit()) {
    const oop obj = thread->current_park_blocker();
    if (time == 0) {
      post_thread_park_event(&event, obj, min_jlong, min_jlong);
    } else {
      if (isAbsolute != 0) {
        post_thread_park_event(&event, obj, min_jlong, time);
      } else {
        post_thread_park_event(&event, obj, time, min_jlong);
      }
    }
  }
UNSAFE_END

The core logic is thread->parker()->park(isAbsolute != 0, time): it gets the Java thread's Parker object and calls its park method. Every Java thread has a Parker instance. The Parker class is defined as follows:
class Parker : public os::PlatformParker {
private:
  volatile int _counter ;
  ...
public:
  void park(bool isAbsolute, jlong time);
  void unpark();
  ...
};

class PlatformParker : public CHeapObj<mtInternal> {
  protected:
    enum {
        REL_INDEX = 0,
        ABS_INDEX = 1
    };
    int _cur_index;  // which cond is in use: -1, 0, 1
    pthread_mutex_t _mutex [1] ;
    pthread_cond_t  _cond  [2] ; // one for relative times and one for abs.

  public:       // TODO-FIXME: make dtor private
    ~PlatformParker() { guarantee (0, "invariant") ; }

  public:
    PlatformParker() {
      int status;
      status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
      assert_status(status == 0, status, "cond_init rel");
      status = pthread_cond_init (&_cond[ABS_INDEX], NULL);
      assert_status(status == 0, status, "cond_init abs");
      status = pthread_mutex_init (_mutex, NULL);
      assert_status(status == 0, status, "mutex_init");
      _cur_index = -1; // mark as unused
    }
};

The park method:
void Parker::park(bool isAbsolute, jlong time) {
  // Return immediately if a permit is available.
  // We depend on Atomic::xchg() having full barrier semantics
  // since we are doing a lock-free update to _counter.
  if (Atomic::xchg(0, &_counter) > 0) return;

  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;

  if (Thread::is_interrupted(thread, false)) {
    return;
  }

  // Next, demultiplex/decode time arguments
  timespec absTime;
  if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
    return;
  }
  if (time > 0) {
    unpackTime(&absTime, isAbsolute, time);
  }

  // Enter a safepoint region and change the thread state to blocked
  ThreadBlockInVM tbivm(jt);

  // Don't wait if cannot get lock since interference arises from
  // unblocking.  Also. check interrupt before trying wait
  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
    // Return directly if the thread was interrupted or the mutex could not be locked,
    // e.g. because another thread currently holds it
    return;
  }
  // Reaching here means pthread_mutex_trylock(_mutex) succeeded

  int status ;
  if (_counter > 0)  { // no wait needed
    _counter = 0;
    status = pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    OrderAccess::fence();
    return;
  }

#ifdef ASSERT
  // Don't catch signals while blocked; let the running threads have the signals.
  // (This allows a debugger to break into the running thread.)
  sigset_t oldsigs;
  sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
  pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif

  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
  jt->set_suspend_equivalent();
  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

  assert(_cur_index == -1, "invariant");
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
  } else {
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
    if (status != 0 && WorkAroundNPTLTimedWaitHang) {
      pthread_cond_destroy (&_cond[_cur_index]) ;
      pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
    }
  }
  _cur_index = -1;
  assert_status(status == 0 || status == EINTR ||
                status == ETIME || status == ETIMEDOUT,
                status, "cond_timedwait");

#ifdef ASSERT
  pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif

  _counter = 0 ;
  status = pthread_mutex_unlock(_mutex) ;
  assert_status(status == 0, status, "invariant") ;
  // Paranoia to ensure our locked and lock-free paths interact
  // correctly with each other and Java-level accesses.
  OrderAccess::fence();

  // If externally suspended while waiting, re-suspend
  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
}

The idea behind park: Parker has a key field, _counter, which records the so-called "permit". When _counter is greater than 0, a permit is available: park sets _counter to 0, which counts as consuming the permit, and the thread continues with the code that follows. If _counter is not greater than 0, the thread waits until that condition is met.
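Concretely, park proceeds as follows:
- Atomically exchange _counter with 0; if the old value was positive, a permit was available and park returns immediately.
- Return immediately if the thread has been interrupted, or if the time arguments say not to wait at all.
- Switch the thread to the blocked state (ThreadBlockInVM), then try to lock _mutex with pthread_mutex_trylock; if that fails, return.
- With _mutex held, check _counter again; if a permit arrived in the meantime, clear it, unlock _mutex and return.
- Otherwise wait on a condition variable: pthread_cond_wait when there is no timeout, os::Linux::safe_cond_timedwait when there is one.
- After waking up, set _counter to 0 and unlock _mutex.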
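The permit semantics of _counter can be emulated in a few lines. This sketch is an illustration, not HotSpot code: a Java monitor (synchronized plus wait/notify) stands in for pthread_mutex_t and pthread_cond_t, and PermitParker is a hypothetical name. It waits once and then clears the permit, mirroring park() above, so it may also return early on a spurious wakeup.

```java
import java.util.concurrent.locks.LockSupport;

// Emulation of the Parker permit logic with a Java monitor.
class PermitParker {
    private int counter;  // plays the role of Parker::_counter (0 or 1)

    // timeoutMillis == 0 waits without a timeout, like park(false, 0).
    // Returns true if a permit was consumed, false if it returned without one.
    synchronized boolean park(long timeoutMillis) {
        if (counter > 0) {        // permit already available: consume it and return
            counter = 0;
            return true;
        }
        try {
            wait(timeoutMillis);  // releases the monitor while blocked, like pthread_cond_wait
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // an interrupt simply ends the wait
        }
        boolean gotPermit = counter > 0;
        counter = 0;              // like park(): _counter is reset after waking, whatever the cause
        return gotPermit;
    }

    synchronized void unpark() {
        counter = 1;              // set to 1, not incremented: permits do not accumulate
        notify();
    }

    public static void main(String[] args) {
        PermitParker p = new PermitParker();
        p.unpark();
        p.unpark();
        System.out.println(p.park(0));   // true: returns at once, consuming the permit
        System.out.println(p.park(50));  // false: the second unpark did not add a permit

        // The real API remembers an unpark issued before park in the same way.
        LockSupport.unpark(Thread.currentThread());
        LockSupport.park();              // permit available, so this returns immediately
        System.out.println("LockSupport.park returned");
    }
}
```

Because the permit is a single bit rather than a counter, an unpark that races ahead of park is never lost, but repeated unparks collapse into one.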
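So, in essence, LockSupport.park is implemented (on Linux) with the pthread library's condition variable pthread_cond_t. Let's now look at how pthread_cond_t is implemented.
III. GLIBC Layer
The typical usage of pthread_cond_t is as follows: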
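#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>  /* sleep() */

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* initialize the mutex */
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;    /* initialize the condition variable */

void *thread1(void *);
void *thread2(void *);

int i = 1; /* shared state: only read or written while holding mutex */

int main(void)
{
    pthread_t t_a;
    pthread_t t_b;

    pthread_create(&t_a, NULL, thread1, NULL); /* create thread t_a */
    pthread_create(&t_b, NULL, thread2, NULL); /* create thread t_b */
    pthread_join(t_a, NULL);                   /* wait for t_a to finish */
    pthread_join(t_b, NULL);                   /* wait for t_b to finish */
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    exit(0);
}

void *thread1(void *junk)
{
    (void)junk;
    for (int k = 1; k <= 9; k++) {
        pthread_mutex_lock(&mutex);     /* lock the mutex */
        i = k;
        if (i % 3 == 0)
            pthread_cond_signal(&cond); /* condition changed: signal t_b */
        else
            printf("thread1: %d\n", i);
        pthread_mutex_unlock(&mutex);   /* unlock the mutex */
        printf("Up Unlock Mutex\n");
        sleep(1);
    }
    return NULL;
}

void *thread2(void *junk)
{
    (void)junk;
    pthread_mutex_lock(&mutex);
    while (i < 9) {
        while (i % 3 != 0)
            pthread_cond_wait(&cond, &mutex); /* wait; re-check the condition on wakeup */
        printf("thread2: %d\n", i);
        pthread_mutex_unlock(&mutex);
        printf("Down Unlock Mutex\n");
        sleep(1);
        pthread_mutex_lock(&mutex);
    }
    pthread_mutex_unlock(&mutex);
    return NULL;
}

The key point: pthread_cond_wait must be called with the mutex held (pthread_mutex_lock first), and the shared condition must be changed while holding the same mutex, which is why thread1 also signals under the lock. Without this protection a race condition can occur and a signal can be lost. pthread_cond_wait() atomically releases the mutex as it enters the wait state; when another thread wakes it with pthread_cond_signal or pthread_cond_broadcast and pthread_cond_wait() returns, the thread holds the mutex again. Because pthread_cond_wait() may also return spuriously, the condition is re-checked in a while loop.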
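The whole process: the waiter locks the mutex, checks the condition and calls pthread_cond_wait, which atomically unlocks the mutex and sleeps; the signaler locks the mutex, changes the condition, calls pthread_cond_signal and unlocks; the waiter then wakes up, reacquires the mutex inside pthread_cond_wait, re-checks the condition and continues.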
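The same rules carry over to Java. A sketch using the real ReentrantLock/Condition API (Condition objects from newCondition() are AQS's ConditionObject, which also blocks threads through LockSupport.park); Mailbox and MailboxDemo are hypothetical names.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Java counterpart of the pthread_mutex_t + pthread_cond_t pattern above.
class Mailbox {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private Integer value;  // shared state: only touched while holding `lock`

    void put(int v) {
        lock.lock();                 // like pthread_mutex_lock
        try {
            value = v;               // change the condition under the lock...
            notEmpty.signal();       // ...then signal, like pthread_cond_signal
        } finally {
            lock.unlock();           // like pthread_mutex_unlock
        }
    }

    int take() throws InterruptedException {
        lock.lock();
        try {
            while (value == null) {  // re-check after every wakeup (spurious wakeups allowed)
                notEmpty.await();    // releases the lock while waiting, reacquires before returning
            }
            int v = value;
            value = null;
            return v;
        } finally {
            lock.unlock();
        }
    }
}

class MailboxDemo {
    // A consumer thread blocks in take() until the calling thread put()s a value.
    static int handOff(int v) {
        Mailbox box = new Mailbox();
        int[] received = new int[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = box.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        box.put(v);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(handOff(42));  // 42
    }
}
```

Whether put() runs before or after the consumer starts waiting, the value is delivered, because the check and the wait happen under the same lock.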
1 pthread_mutex_lock
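On Linux, for example, a mechanism called futex (short for fast user-space mutex) is used.
In this scheme, atomic read-modify-write operations (such as the compare-and-exchange in the glibc code below) are performed on a mutex variable that lives in user space.
If the result shows that the lock is uncontended, pthread_mutex_lock returns without a context switch into the kernel, so acquiring the mutex can be very fast.
Only when contention is detected does a system call (named futex) happen, switching into the kernel and putting the calling thread to sleep until the mutex is released.
There are many more details, especially for robust and/or priority-inheritance mutexes, but that is the essence.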
nptl/pthread_mutex_lock.c
int
PTHREAD_MUTEX_LOCK (pthread_mutex_t *mutex)
{
  /* See concurrency notes regarding mutex type which is loaded from __kind
     in struct __pthread_mutex_s in sysdeps/nptl/bits/thread-shared-types.h.  */
  unsigned int type = PTHREAD_MUTEX_TYPE_ELISION (mutex);

  LIBC_PROBE (mutex_entry, 1, mutex);

  if (__builtin_expect (type & ~(PTHREAD_MUTEX_KIND_MASK_NP
                                 | PTHREAD_MUTEX_ELISION_FLAGS_NP), 0))
    return __pthread_mutex_lock_full (mutex);

  if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP))
    {
      FORCE_ELISION (mutex, goto elision);
    simple:
      /* Normal mutex.  */
      LLL_MUTEX_LOCK_OPTIMIZED (mutex);
      assert (mutex->__data.__owner == 0);
    }
#if ENABLE_ELISION_SUPPORT
  else if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_ELISION_NP))
    {
  elision: __attribute__((unused))
      /* This case can never happen on a system without elision,
         as the mutex type initialization functions will not
         allow to set the elision flags.  */
      /* Don't record owner or users for elision case.  This is a
         tail call.  */
      return LLL_MUTEX_LOCK_ELISION (mutex);
    }
#endif
  else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex)
                             == PTHREAD_MUTEX_RECURSIVE_NP, 1))
    {
      /* Recursive mutex.  */
      pid_t id = THREAD_GETMEM (THREAD_SELF, tid);

      /* Check whether we already hold the mutex.  */
      if (mutex->__data.__owner == id)
        {
          /* Just bump the counter.  */
          if (__glibc_unlikely (mutex->__data.__count + 1 == 0))
            /* Overflow of the counter.  */
            return EAGAIN;

          ++mutex->__data.__count;

          return 0;
        }

      /* We have to get the mutex.  */
      LLL_MUTEX_LOCK_OPTIMIZED (mutex);

      assert (mutex->__data.__owner == 0);
      mutex->__data.__count = 1;
    }
  else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex)
                             == PTHREAD_MUTEX_ADAPTIVE_NP, 1))
    {
      if (LLL_MUTEX_TRYLOCK (mutex) != 0)
        {
          int cnt = 0;
          int max_cnt = MIN (max_adaptive_count (),
                             mutex->__data.__spins * 2 + 10);
          do
            {
              if (cnt++ >= max_cnt)
                {
                  LLL_MUTEX_LOCK (mutex);
                  break;
                }
              atomic_spin_nop ();
            }
          while (LLL_MUTEX_TRYLOCK (mutex) != 0);

          mutex->__data.__spins += (cnt - mutex->__data.__spins) / 8;
        }
      assert (mutex->__data.__owner == 0);
    }
  else
    {
      pid_t id = THREAD_GETMEM (THREAD_SELF, tid);
      assert (PTHREAD_MUTEX_TYPE (mutex) == PTHREAD_MUTEX_ERRORCHECK_NP);
      /* Check whether we already hold the mutex.  */
      if (__glibc_unlikely (mutex->__data.__owner == id))
        return EDEADLK;
      goto simple;
    }

  pid_t id = THREAD_GETMEM (THREAD_SELF, tid);

  /* Record the ownership.  */
  mutex->__data.__owner = id;
#ifndef NO_INCR
  ++mutex->__data.__nusers;
#endif

  LIBC_PROBE (mutex_acquired, 1, mutex);

  return 0;
}

pthread_mutex_t is defined as follows:
typedef union
{
  struct __pthread_mutex_s
  {
    int __lock;
    unsigned int __count;
    int __owner;
    unsigned int __nusers;
    int __kind;
    int __spins;
    __pthread_list_t __list;
  } __data;
  ......
} pthread_mutex_t;

The __kind field holds the mutex type; its possible values are:

/* Mutex types.  */
enum
{
  PTHREAD_MUTEX_TIMED_NP,
  PTHREAD_MUTEX_RECURSIVE_NP,
  PTHREAD_MUTEX_ERRORCHECK_NP,
  PTHREAD_MUTEX_ADAPTIVE_NP
#if defined __USE_UNIX98 || defined __USE_XOPEN2K8
  ,
  PTHREAD_MUTEX_NORMAL = PTHREAD_MUTEX_TIMED_NP,
  PTHREAD_MUTEX_RECURSIVE = PTHREAD_MUTEX_RECURSIVE_NP,
  PTHREAD_MUTEX_ERRORCHECK = PTHREAD_MUTEX_ERRORCHECK_NP,
  PTHREAD_MUTEX_DEFAULT = PTHREAD_MUTEX_NORMAL
#endif
#ifdef __USE_GNU
  /* For compatibility.  */
  , PTHREAD_MUTEX_FAST_NP = PTHREAD_MUTEX_TIMED_NP
#endif
};

Where:
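- PTHREAD_MUTEX_TIMED_NP: the default value, i.e. a normal lock.
- PTHREAD_MUTEX_RECURSIVE_NP: a recursive (reentrant) lock; the same thread may successfully acquire the same lock multiple times and releases it with the same number of unlocks.
- PTHREAD_MUTEX_ERRORCHECK_NP: an error-checking lock; if the same thread requests the same lock again, EDEADLK is returned; otherwise it behaves like PTHREAD_MUTEX_TIMED_NP.
- PTHREAD_MUTEX_ADAPTIVE_NP: an adaptive lock, a hybrid of a spin lock and a normal lock: it spins for a bounded number of trylock attempts before blocking.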
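The differences between these kinds show up only when a thread relocks a mutex it already owns. The following toy model (hypothetical, not glibc code) captures just the owner/count bookkeeping from the branches of PTHREAD_MUTEX_LOCK above; integer ids stand in for TIDs, the futex word is not modeled, and WOULD_BLOCK is an invented marker meaning "this caller would go on to sleep" (for a normal mutex relocked by its owner, that is a self-deadlock).

```java
// Toy model of glibc's owner/count bookkeeping for three mutex kinds.
class MutexKindModel {
    enum Kind { TIMED, RECURSIVE, ERRORCHECK }

    static final int OK = 0;
    static final int EAGAIN = 11;      // Linux errno value
    static final int EDEADLK = 35;     // Linux errno value (x86/generic)
    static final int WOULD_BLOCK = -1; // hypothetical marker, not an errno

    private final Kind kind;
    private int owner;                 // 0 = unowned, like __data.__owner
    private long count;                // like __data.__count (unsigned int in glibc)

    MutexKindModel(Kind kind) { this.kind = kind; }

    int lock(int tid) {
        if (owner == tid) {
            switch (kind) {
                case RECURSIVE:
                    if (count + 1 > 0xFFFFFFFFL) return EAGAIN;  // counter would overflow
                    count++;                                      // just bump the counter
                    return OK;
                case ERRORCHECK:
                    return EDEADLK;                               // relock by owner is reported
                default:
                    return WOULD_BLOCK;                           // normal mutex: self-deadlock
            }
        }
        if (owner != 0) return WOULD_BLOCK;                       // held by another thread
        owner = tid;                                              // record the ownership
        count = 1;                                                // only meaningful for RECURSIVE
        return OK;
    }

    void unlock(int tid) {
        if (owner != tid) throw new IllegalStateException("not the owner");
        if (kind == Kind.RECURSIVE && --count > 0) return;        // still held recursively
        owner = 0;
        count = 0;
    }

    long count() { return count; }

    public static void main(String[] args) {
        MutexKindModel r = new MutexKindModel(Kind.RECURSIVE);
        System.out.println(r.lock(100) + " " + r.lock(100) + " " + r.count()); // 0 0 2
        MutexKindModel e = new MutexKindModel(Kind.ERRORCHECK);
        System.out.println(e.lock(100) + " " + e.lock(100));                   // 0 35
    }
}
```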
A mutex uses PTHREAD_MUTEX_TIMED_NP by default, so the code reaches LLL_MUTEX_LOCK_OPTIMIZED, which is a macro:
# define LLL_MUTEX_LOCK_OPTIMIZED(mutex) lll_mutex_lock_optimized (mutex)

static inline void
lll_mutex_lock_optimized (pthread_mutex_t *mutex)
{
  /* The single-threaded optimization is only valid for private
     mutexes.  For process-shared mutexes, the mutex could be in a
     shared mapping, so synchronization with another process is needed
     even without any threads.  If the lock is already marked as
     acquired, POSIX requires that pthread_mutex_lock deadlocks for
     normal mutexes, so skip the optimization in that case as
     well.  */
  int private = PTHREAD_MUTEX_PSHARED (mutex);
  if (private == LLL_PRIVATE && SINGLE_THREAD_P && mutex->__data.__lock == 0)
    mutex->__data.__lock = 1;
  else
    lll_lock (mutex->__data.__lock, private);
}

The single-threaded shortcut applies only to a process-private, currently unlocked mutex in a process that has just one thread. In a multithreaded program such as the JVM, SINGLE_THREAD_P is false, so the code falls through to lll_lock, which is also a macro:
#define lll_lock(futex, private)	\
  __lll_lock (&(futex), private)

Note that futex appears here for the first time; the rest of this article revolves around it.
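#define __lll_lock(futex, private)                                      \
  ((void)                                                               \
   ({                                                                   \
     int *__futex = (futex);                                            \
     if (__glibc_unlikely                                               \
         (atomic_compare_and_exchange_bool_acq (__futex, 1, 0)))        \
       {                                                                \
         if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
           __lll_lock_wait_private (__futex);                           \
         else                                                           \
           __lll_lock_wait (__futex, private);                          \
       }                                                                \
   }))

Here atomic_compare_and_exchange_bool_acq tries to atomically change __futex (that is, mutex->__data.__lock) from 0 to 1; it returns zero when the exchange succeeds. On success the macro is done and the lock is held. On failure __lll_lock_wait is called (the _private variant is chosen only when private is known at compile time to be LLL_PRIVATE). Its code is: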
void
__lll_lock_wait (int *futex, int private)
{
  if (atomic_load_relaxed (futex) == 2)
    goto futex;

  while (atomic_exchange_acquire (futex, 2) != 0)
    {
    futex:
      LIBC_PROBE (lll_lock_wait, 1, futex);
      futex_wait ((unsigned int *) futex, 2, private); /* Wait if *futex == 2.  */
    }
}

First, note that pthread defines three states for the futex lock word:
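- 0: the lock is free and can be taken on the fast path without entering the kernel.
- 1: a thread holds the lock. If another thread now wants the lock, it must mark the futex as "contended" and then enter the kernel through the futex system call to suspend itself.
- 2: the lock is contended; other threads are about to queue, or are already queued, in the kernel's futex subsystem waiting for the lock.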
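So once a failed lock attempt reaches __lll_lock_wait, it first checks whether futex equals 2. If so, others are already queueing, so this thread queues as well (jumping straight to futex_wait). Otherwise it is the first contender: it atomically exchanges the word with 2 to tell later arrivals to queue. If that exchange returns 0, the lock was released in the meantime and this thread now owns it (leaving the word at 2); otherwise it sleeps in futex_wait and repeats the exchange after waking up.
futex_wait essentially invokes the futex system call, which we analyze in detail in Section IV.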
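The three-state protocol can be simulated in Java. In this sketch SimFutex is a hypothetical stand-in for the futex system call, not a real API: futexWait sleeps only if the word still equals the expected value, and that check happens under the same monitor futexWake uses, which is the no-lost-wakeup guarantee the kernel provides. The lock() and unlock() bodies follow __lll_lock, __lll_lock_wait and (as an assumption of this sketch) glibc's unlock rule of waking only when the old value was above 1.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simulated futex: the monitor `bucket` plays the role of the kernel's hash-bucket lock.
class SimFutex {
    final AtomicInteger word = new AtomicInteger(0);
    final AtomicInteger waitCalls = new AtomicInteger();  // counts simulated syscalls
    private final Object bucket = new Object();

    void futexWait(int expected) {
        waitCalls.incrementAndGet();
        synchronized (bucket) {
            if (word.get() != expected) return;  // like -EWOULDBLOCK: value already changed
            try {
                bucket.wait();                   // may return spuriously; callers loop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    void futexWake() {
        synchronized (bucket) {
            bucket.notify();                     // wake one waiter
        }
    }
}

// Three-state lock word: 0 = free, 1 = locked, 2 = contended.
class ThreeStateLock {
    private final SimFutex f = new SimFutex();

    void lock() {
        // Fast path, like __lll_lock: CAS 0 -> 1 with no syscall when uncontended.
        if (f.word.compareAndSet(0, 1)) return;
        // Slow path, like __lll_lock_wait.
        if (f.word.get() == 2) f.futexWait(2);  // others already queued: queue as well
        while (f.word.getAndSet(2) != 0) {      // mark contended; old value 0 means we own it
            f.futexWait(2);
        }
    }

    void unlock() {
        // Release; pay for a wake only if the word said "contended".
        if (f.word.getAndSet(0) == 2) f.futexWake();
    }

    int waitCalls() { return f.waitCalls.get(); }
}

class ThreeStateLockDemo {
    static long countWithLock(ThreeStateLock lock, int threads, int iters) {
        long[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < iters; i++) {
                    lock.lock();
                    try { counter[0]++; } finally { lock.unlock(); }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try { w.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        ThreeStateLock solo = new ThreeStateLock();
        System.out.println(countWithLock(solo, 1, 1000) + " " + solo.waitCalls()); // 1000 0
        System.out.println(countWithLock(new ThreeStateLock(), 4, 10_000));        // 40000
    }
}
```

The uncontended run never touches SimFutex, which is the whole point of the futex design: the "syscall" happens only when the word says someone may be sleeping.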
2 pthread_cond_wait
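It also ends up in the futex system call; for lack of space we won't go into it here.
IV. Kernel Layer
Why does futex exist, what problem does it solve, and when was it added to the kernel?
In short, the idea behind futex is that in the uncontended case the operation happens entirely in user space without any system call, and only when contention occurs does it enter the kernel to do the corresponding work (wait or wake up). futex is therefore a hybrid user-mode/kernel-mode synchronization mechanism that needs both modes to cooperate. The futex variable lives in user space rather than being a kernel object, and the futex code is likewise split into a user-mode part and a kernel-mode part: without contention everything stays in user mode, and on contention the sys_futex system call enters kernel mode to handle it.
The user-space part was covered above; this section focuses on the kernel implementation of futex.
futex is built around three basic data structures: futex_hash_bucket, futex_key and futex_q.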
struct futex_hash_bucket {
    atomic_t waiters;
    spinlock_t lock;
    struct plist_head chain;
} ____cacheline_aligned_in_smp;

struct futex_q {
    struct plist_node list;

    struct task_struct *task;
    spinlock_t *lock_ptr;
    union futex_key key;    // key that uniquely identifies uaddr
    struct futex_pi_state *pi_state;
    struct rt_mutex_waiter *rt_waiter;
    union futex_key *requeue_pi_key;
    u32 bitset;
};

union futex_key {
    struct {
        unsigned long pgoff;
        struct inode *inode;
        int offset;
    } shared;
    struct {
        unsigned long address;
        struct mm_struct *mm;
        int offset;
    } private;
    struct {
        unsigned long word;
        void *ptr;
        int offset;
    } both;
};

There is also struct __futex_data, shown below:
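static struct {
    struct futex_hash_bucket *queues;
    unsigned long            hashsize;
} __futex_data __read_mostly __aligned(2*sizeof(long));
#define futex_queues   (__futex_data.queues)
#define futex_hashsize (__futex_data.hashsize)

During futex initialization (futex_init), hashsize is determined; for example, with 24 CPUs hashsize = 8192 (256 * 24 = 6144, rounded up to the next power of two). An array of that size is then allocated with alloc_large_system_hash, and the fields of each element, such as the plist_head and the lock, are initialized.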
static int __init futex_init(void)
{
    unsigned int futex_shift;
    unsigned long i;

#if CONFIG_BASE_SMALL
    futex_hashsize = 16;
#else
    futex_hashsize = roundup_pow_of_two(256 * num_possible_cpus());
#endif

    futex_queues = alloc_large_system_hash("futex", sizeof(*futex_queues),
                                           futex_hashsize, 0,
                                           futex_hashsize < 256 ? HASH_SMALL : 0,
                                           &futex_shift, NULL,
                                           futex_hashsize, futex_hashsize);
    futex_hashsize = 1UL << futex_shift;

    futex_detect_cmpxchg();

    for (i = 0; i < futex_hashsize; i++) {
        atomic_set(&futex_queues[i].waiters, 0);
        plist_head_init(&futex_queues[i].chain);
        spin_lock_init(&futex_queues[i].lock);
    }

    return 0;
}

These data structures relate to each other as follows: futex_queues is an array of futex_hash_bucket; each bucket's chain links the futex_q entries of the tasks waiting on futexes whose keys hash to that bucket, and each futex_q records its task and its futex_key.
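The sizing rule in futex_init can be checked with a few lines of arithmetic. This sketch reproduces hashsize = roundup_pow_of_two(256 * num_possible_cpus()) (or 16 with CONFIG_BASE_SMALL) and the "hash & (size - 1)" bucket selection; it ignores the final adjustment from the futex_shift returned by alloc_large_system_hash, and it does not model the kernel's jhash2 of the futex_key. FutexHashSize is a hypothetical name.

```java
// futex_init sizing arithmetic and power-of-two bucket selection.
class FutexHashSize {
    // Smallest power of two >= n (n >= 1), like the kernel's roundup_pow_of_two().
    static long roundupPowOfTwo(long n) {
        return n <= 1 ? 1 : Long.highestOneBit(n - 1) << 1;
    }

    static long hashSize(int possibleCpus, boolean baseSmall) {
        return baseSmall ? 16 : roundupPowOfTwo(256L * possibleCpus);
    }

    // With a power-of-two table, picking a bucket is a mask of the hash's low bits.
    static int bucketIndex(int hash, long size) {
        return (int) (hash & (size - 1));
    }

    public static void main(String[] args) {
        System.out.println(hashSize(24, false));        // 8192 (256 * 24 = 6144, rounded up)
        System.out.println(hashSize(24, true));         // 16
        System.out.println(bucketIndex(123456, 8192));  // 576
    }
}
```

Keeping the table a power of two is what lets the bucket lookup be a single AND instead of a division.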
With the data structures in mind, the flow is easy to follow. The overall flow of futex_wait is:
static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
                      ktime_t *abs_time, u32 bitset)
{
    struct hrtimer_sleeper timeout, *to = NULL;
    struct restart_block *restart;
    struct futex_hash_bucket *hb;
    struct futex_q q = futex_q_init;
    int ret;

    if (!bitset)
        return -EINVAL;
    q.bitset = bitset;

    if (abs_time) {
        to = &timeout;

        hrtimer_init_on_stack(&to->timer, (flags & FLAGS_CLOCKRT) ?
                              CLOCK_REALTIME : CLOCK_MONOTONIC,
                              HRTIMER_MODE_ABS);
        hrtimer_init_sleeper(to, current);
        hrtimer_set_expires_range_ns(&to->timer, *abs_time,
                                     current->timer_slack_ns);
    }

retry:
    /*
     * Prepare to wait on uaddr. On success, holds hb lock and increments
     * q.key refs.
     */
    ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
    if (ret)
        goto out;

    /* queue_me and wait for wakeup, timeout, or a signal. */
    futex_wait_queue_me(hb, &q, to);

    /* If we were woken (and unqueued), we succeeded, whatever. */
    ret = 0;
    /* unqueue_me() drops q.key ref */
    if (!unqueue_me(&q))
        goto out;
    ret = -ETIMEDOUT;
    if (to && !to->task)
        goto out;

    /*
     * We expect signal_pending(current), but we might be the
     * victim of a spurious wakeup as well.
     */
    if (!signal_pending(current))
        goto retry;

    ret = -ERESTARTSYS;
    if (!abs_time)
        goto out;

    restart = &current->restart_block;
    restart->fn = futex_wait_restart;
    restart->futex.uaddr = uaddr;
    restart->futex.val = val;
    restart->futex.time = *abs_time;
    restart->futex.bitset = bitset;
    restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;

    ret = -ERESTART_RESTARTBLOCK;

out:
    if (to) {
        hrtimer_cancel(&to->timer);
        destroy_hrtimer_on_stack(&to->timer);
    }
    return ret;
}

futex_wait_setup does two main things: it hashes uaddr to find the corresponding futex_hash_bucket and takes that bucket's spinlock, and it checks whether *uaddr still holds the expected value val. If not, it returns immediately (-EWOULDBLOCK), and user space goes back to retrying the lock.
/**
 * futex_wait_setup() - Prepare to wait on a futex
 * @uaddr:  the futex userspace address
 * @val:    the expected value
 * @flags:  futex flags (FLAGS_SHARED, etc.)
 * @q:      the associated futex_q
 * @hb:     storage for hash_bucket pointer to be returned to caller
 *
 * Setup the futex_q and locate the hash_bucket.  Get the futex value and
 * compare it with the expected value.  Handle atomic faults internally.
 * Return with the hb lock held and a q.key reference on success, and unlocked
 * with no q.key reference on failure.
 *
 * Return:
 *  -  0 - uaddr contains val and hb has been locked;
 *  - <1 - -EFAULT or -EWOULDBLOCK (uaddr does not contain val) and hb is unlocked
 */
static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,
                            struct futex_q *q, struct futex_hash_bucket **hb)
{
    u32 uval;
    int ret;

retry:
    // Initialize the futex_q by storing uaddr in its futex_key; futex_wake later
    // finds the waiters through this same key
    ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &q->key, VERIFY_READ);
    if (unlikely(ret != 0))
        return ret;

retry_private:
    // Hash the key, find the matching futex_hash_bucket in the array and lock it
    *hb = queue_lock(q);

    // Atomically read the value at uaddr into uval
    ret = get_futex_value_locked(&uval, uaddr);

    if (ret) {
        queue_unlock(*hb);

        ret = get_user(uval, uaddr);
        if (ret)
            goto out;

        if (!(flags & FLAGS_SHARED))
            goto retry_private;

        put_futex_key(&q->key);
        goto retry;
    }

    // If the value at uaddr differs from val, another thread has changed it and
    // the wait condition no longer holds: return without blocking
    if (uval != val) {
        queue_unlock(*hb);
        ret = -EWOULDBLOCK;
    }

out:
    if (ret)
        put_futex_key(&q->key);
    return ret;
}

Then futex_wait_queue_me is called to suspend the current task:
/**
 * futex_wait_queue_me() - queue_me() and wait for wakeup, timeout, or signal
 * @hb:      the futex hash bucket, must be locked by the caller
 * @q:       the futex_q to queue up on
 * @timeout: the prepared hrtimer_sleeper, or null for no timeout
 */
static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
                                struct hrtimer_sleeper *timeout)
{
    /*
     * The task state is guaranteed to be set before another task can
     * wake it. set_current_state() is implemented using smp_store_mb() and
     * queue_me() calls spin_unlock() upon completion, both serializing
     * access to the hash list and forcing another memory barrier.
     */
    set_current_state(TASK_INTERRUPTIBLE);
    queue_me(q, hb);

    /* Arm the timer */
    if (timeout)
        hrtimer_start_expires(&timeout->timer, HRTIMER_MODE_ABS);

    /*
     * If we have been removed from the hash list, then another task
     * has tried to wake us, and we can skip the call to schedule().
     */
    if (likely(!plist_node_empty(&q->list))) {
        /*
         * If the timer has already expired, current will already be
         * flagged for rescheduling. Only call schedule if there
         * is no timeout, or if it has yet to expire.
         */
        if (!timeout || timeout->task)
            freezable_schedule();
    }
    __set_current_state(TASK_RUNNING);
}

futex_wait_queue_me does a few main things:
- it sets the task state to TASK_INTERRUPTIBLE and, through queue_me, adds the futex_q to the futex_hash_bucket's chain (queue_me also releases the bucket lock taken in futex_wait_setup);
- it arms the timeout timer, if there is one;
- if the futex_q is still queued (no one has woken it yet), it calls freezable_schedule() to give up the CPU until it is woken up, times out or receives a signal.
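The wait/wake flow above can be modeled in user space to see why no wakeup is lost. This is a sketch, not kernel code: "addresses" are indexes into an AtomicIntegerArray, and Bucket, Waiter and the Integer.hashCode bucket choice are hypothetical stand-ins for futex_hash_bucket, futex_q and jhash2; timeouts and signals are not modeled. The essential property is that the value check and the enqueue happen under the bucket lock, which the waker must also take.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.locks.LockSupport;

// User-space model of futex_wait_setup + queue_me + schedule, and of futex_wake.
class FutexTable {
    static final int EWOULDBLOCK = -11;  // mirrors -EWOULDBLOCK (== -EAGAIN on Linux)

    private static final class Waiter {          // role of futex_q
        final int key;                            // role of futex_key
        final Thread task;                        // role of q->task
        volatile boolean queued = true;           // cleared when a waker unqueues us
        Waiter(int key, Thread task) { this.key = key; this.task = task; }
    }

    private static final class Bucket {           // role of futex_hash_bucket
        final Object lock = new Object();         // role of hb->lock (a spinlock in the kernel)
        final List<Waiter> chain = new ArrayList<>();
    }

    final AtomicIntegerArray memory;
    private final Bucket[] buckets;

    FutexTable(int words, int bucketCount) {      // bucketCount must be a power of two
        memory = new AtomicIntegerArray(words);
        buckets = new Bucket[bucketCount];
        for (int i = 0; i < bucketCount; i++) buckets[i] = new Bucket();
    }

    private Bucket bucketFor(int addr) {
        return buckets[Integer.hashCode(addr) & (buckets.length - 1)];
    }

    // Returns 0 after being woken, or EWOULDBLOCK if memory[addr] != val on entry.
    int futexWait(int addr, int val) {
        Bucket hb = bucketFor(addr);
        Waiter q = new Waiter(addr, Thread.currentThread());
        synchronized (hb.lock) {                  // futex_wait_setup: lock the bucket,
            if (memory.get(addr) != val) return EWOULDBLOCK;  // bail out if the value changed,
            hb.chain.add(q);                      // else queue_me, then drop the bucket lock
        }
        while (q.queued) {
            LockSupport.park(this);               // stands in for schedule(); re-check on return
        }
        return 0;
    }

    // Wakes up to n waiters on addr and returns how many were woken.
    int futexWake(int addr, int n) {
        Bucket hb = bucketFor(addr);
        int woken = 0;
        synchronized (hb.lock) {
            Iterator<Waiter> it = hb.chain.iterator();
            while (woken < n && it.hasNext()) {
                Waiter q = it.next();
                if (q.key != addr) continue;      // a different futex sharing this bucket
                it.remove();
                q.queued = false;
                LockSupport.unpark(q.task);       // an unpark that precedes park is not lost
                woken++;
            }
        }
        return woken;
    }
}

class FutexTableDemo {
    // One thread waits on address 0 (value 0); the caller wakes it once it is queued.
    static int waitThenWake() {
        FutexTable ft = new FutexTable(4, 8);
        int[] result = {Integer.MIN_VALUE};
        Thread waiter = new Thread(() -> result[0] = ft.futexWait(0, 0));
        waiter.start();
        while (ft.futexWake(0, 1) == 0) Thread.yield();  // waiter not queued yet
        try { waiter.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return result[0];
    }

    public static void main(String[] args) {
        FutexTable ft = new FutexTable(4, 8);
        ft.memory.set(1, 5);
        System.out.println(ft.futexWait(1, 0) == FutexTable.EWOULDBLOCK);  // true: no sleep
        System.out.println(waitThenWake());                                // 0
    }
}
```

If the waker changes the value and calls futexWake before the waiter takes the bucket lock, the waiter sees the new value and returns EWOULDBLOCK; if it happens afterwards, the waiter is already on the chain and gets unparked. Either way the wakeup is not lost.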
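V. Summary
This article traced the Java ReentrantLock.lock flow from top to bottom: from the JDK (AQS) through the JVM (Parker), glibc (pthread mutexes and condition variables) down to the kernel (futex).
This article is original content from Alibaba Cloud and may not be reproduced without permission.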