日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > asp.net >内容正文

asp.net

一次 .NET Core 中玩锁的经历:ManualResetEventSlim, Semaphore 与 SemaphoreSlim

發布時間:2025/4/5 asp.net 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 一次 .NET Core 中玩锁的经历:ManualResetEventSlim, Semaphore 与 SemaphoreSlim 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

最近同事對 ?.net core memcached 緩存客戶端 EnyimMemcachedCore 進行了高并發下的壓力測試,發現在 linux 上高并發下使用 async 異步方法讀取緩存數據會出現大量失敗的情況,比如在一次測試中,100萬次讀取緩存,只有12次成功,999988次失敗,好恐怖。如果改為同步方法,沒有一次失敗,100%成功。奇怪的是,同樣的壓力測試程序在 Windows 上異步讀取卻沒問題,100%成功。

排查后發現是2個地方使用的鎖引起的,一個是 ManualResetEventSlim?,一個是?Semaphore?,這2個鎖是在同步方法中使用的,但 aync 異步方法中調用了這2個同步方法,我們來分別看一下。

使用 ManualResetEventSlim 是在創建 Socket 連接時用于控制連接超時

var args = new SocketAsyncEventArgs(); using (var mres = new ManualResetEventSlim()) {args.Completed += (s, e) => mres.Set();if (socket.ConnectAsync(args)){if (!mres.Wait(timeout)){throw new TimeoutException("Could not connect to " + endpoint);}} }

使用?Semaphore 是在從 EnyimMemcachedCore 自己實現的 Socket 連接池獲取 Socket 連接時

if (!this.semaphore.WaitOne(this.queueTimeout)) {message = "Pool is full, timeouting. " + _endPoint;if (_isDebugEnabled) _logger.LogDebug(message);result.Fail(message, new TimeoutException());// everyone is so busyreturn result; }

為了棄用這個2個鎖造成的異步并發問題,采取了下面2個改進措施:

1)對于?ManualResetEventSlim ,參考 corefx 中 SqlClient 的?SNITcpHandle 的實現,改用?CancellationTokenSource 控制連接超時

var cts = new CancellationTokenSource(); cts.CancelAfter(timeout); void Cancel() {if (!socket.Connected){socket.Dispose();} } cts.Token.Register(Cancel);socket.Connect(endpoint); if (socket.Connected) {connected = true; } else {socket.Dispose(); }

2)對于?Semaphore ,根據同事提交的 PR ,將?Semaphore 換成?SemaphoreSlim ,用?SemaphoreSlim.WaitAsync 方法等待信號量鎖

if (!await this.semaphore.WaitAsync(this.queueTimeout)) {message = "Pool is full, timeouting. " + _endPoint;if (_isDebugEnabled) _logger.LogDebug(message);result.Fail(message, new TimeoutException());// everyone is so busyreturn result; }

改進后,壓力測試結果立馬與同步方法一樣,100% 成功!

為什么會這樣?

我們到 github 的 coreclr 倉庫(針對 .net core 2.2)中看看?ManualResetEventSlim 與?Semaphore 的實現源碼,看能否找到一些線索。

(一)

先看看 ManualResetEventSlim.Wait 方法的實現代碼(523開始):

1)先?SpinWait 等待

var spinner = new SpinWait(); while (spinner.Count < spinCount) {spinner.SpinOnce(sleep1Threshold: -1);if (IsSet){return true;} }

SpinWait 等待時間比較短,不會造成長時間阻塞線程。

在高并發下大量線程在爭搶鎖,所以大量線程在這個階段等不到鎖。

2)然后?Monitor.Wait 等待

try {// ** the actual wait **if (!Monitor.Wait(m_lock, realMillisecondsTimeout))return false; //return immediately if the timeout has expired. } finally {// Clean up: we're done waiting.Waiters = Waiters - 1; }

Monitor.Wait 對應的實現代碼

[MethodImplAttribute(MethodImplOptions.InternalCall)] private static extern bool ObjWait(bool exitContext, int millisecondsTimeout, object obj);public static bool Wait(object obj, int millisecondsTimeout, bool exitContext) {if (obj == null)throw (new ArgumentNullException(nameof(obj)));return ObjWait(exitContext, millisecondsTimeout, obj); }

最終調用的是一個本地庫的?ObjWait 方法。

查閱一下?Monitor.Wait 方法的幫助文檔:

Releases the lock on an object and blocks the current thread until it reacquires the lock. If the specified time-out interval elapses, the thread enters the ready queue.

Monitor.Wait 的確會阻塞當前線程,這在異步高并發下會帶來問題,詳見一碼阻塞,萬碼等待:ASP.NET Core 同步方法調用異步方法“死鎖”的真相。

(二)

再看看?Semaphore 的實現代碼,它繼承自?WaitHandle?,?Semaphore.Wait 實際調用的是 WaitHandle.Wait ,后者調用的是?WaitOneNative?,這是一個本地庫的方法

[MethodImplAttribute(MethodImplOptions.InternalCall)] private static extern int WaitOneNative(SafeHandle waitableSafeHandle, uint millisecondsTimeout, bool hasThreadAffinity, bool exitContext);

.net core 3.0 中有些變化,這里調用的是?WaitOneCore 方法

[MethodImpl(MethodImplOptions.InternalCall)] private static extern int WaitOneCore(IntPtr waitHandle, int millisecondsTimeout);

查閱一下 WaitHandle.Wait?方法的幫助文檔:?

Blocks the current thread until the current WaitHandle receives a signal, using a 32-bit signed integer to specify the time interval in milliseconds.

WaitHandle.Wait 也會阻塞當前線程。

2個地方在等待鎖時都會阻塞線程,難怪高并發下會出問題。

(三)

接著閱讀?SemaphoreSlim 的源碼學習它是如何在 WaitAsync 中實現異步等待鎖的?

public Task<bool> WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken) {//...lock (m_lockObj!){// If there are counts available, allow this waiter to succeed.if (m_currentCount > 0){--m_currentCount;if (m_waitHandle != null && m_currentCount == 0) m_waitHandle.Reset();return s_trueTask;}else if (millisecondsTimeout == 0){// No counts, if timeout is zero fail fastreturn s_falseTask;}// If there aren't, create and return a task to the caller.// The task will be completed either when they've successfully acquired// the semaphore or when the timeout expired or cancellation was requested.else{Debug.Assert(m_currentCount == 0, "m_currentCount should never be negative");var asyncWaiter = CreateAndAddAsyncWaiter();return (millisecondsTimeout == Timeout.Infinite && !cancellationToken.CanBeCanceled) ?asyncWaiter :WaitUntilCountOrTimeoutAsync(asyncWaiter, millisecondsTimeout, cancellationToken);}} }

重點看 else 部分的代碼,SemaphoreSlim.WaitAsync 造了一個專門用于等待鎖的 Task —— TaskNode ,CreateAndAddAsyncWaiter 就用于創建?TaskNode 的實例

private TaskNode CreateAndAddAsyncWaiter() {// Create the taskvar task = new TaskNode();// Add it to the linked listif (m_asyncHead == null){m_asyncHead = task;m_asyncTail = task;}else{m_asyncTail.Next = task;task.Prev = m_asyncTail;m_asyncTail = task;}// Hand it backreturn task; }

從上面的代碼看到 TaskNode 用到了鏈表,神奇的等鎖專用 Task —— TaskNode 是如何實現的呢?

private sealed class TaskNode : Task<bool> {internal TaskNode? Prev, Next;internal TaskNode() : base((object?)null, TaskCreationOptions.RunContinuationsAsynchronously) { } }

好簡單!

那?SemaphoreSlim.WaitAsync 如何用?TaskNode 實現指定了超時時間的鎖等待?

看?WaitUntilCountOrTimeoutAsync 方法的實現源碼:

private async Task<bool> WaitUntilCountOrTimeoutAsync(TaskNode asyncWaiter, int millisecondsTimeout, CancellationToken cancellationToken) {// Wait until either the task is completed, timeout occurs, or cancellation is requested.// We need to ensure that the Task.Delay task is appropriately cleaned up if the await// completes due to the asyncWaiter completing, so we use our own token that we can explicitly// cancel, and we chain the caller's supplied token into it.using (var cts = cancellationToken.CanBeCanceled ?CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, default(CancellationToken)) :new CancellationTokenSource()){var waitCompleted = Task.WhenAny(asyncWaiter, Task.Delay(millisecondsTimeout, cts.Token));if (asyncWaiter == await waitCompleted.ConfigureAwait(false)){cts.Cancel(); // ensure that the Task.Delay task is cleaned upreturn true; // successfully acquired }}// If we get here, the wait has timed out or been canceled.// If the await completed synchronously, we still hold the lock. If it didn't,// we no longer hold the lock. As such, acquire it.lock (m_lockObj){// Remove the task from the list. If we're successful in doing so,// we know that no one else has tried to complete this waiter yet,// so we can safely cancel or timeout.if (RemoveAsyncWaiter(asyncWaiter)){cancellationToken.ThrowIfCancellationRequested(); // cancellation occurredreturn false; // timeout occurred }}// The waiter had already been removed, which means it's already completed or is about to// complete, so let it, and don't return until it does.return await asyncWaiter.ConfigureAwait(false); }

用 Task.WhenAny 等待 TaskNode 與?Task.Delay ,等其中任一者先完成,簡單到可怕。

又一次通過 .net core 源碼欣賞了高手是怎么玩轉 Task 的。

【2019-5-6更新】

今天將?Task.WhenAny +?Task.Delay 的招式用到了異步連接 Socket 的超時控制中

var connTask = _socket.ConnectAsync(_endpoint); if (await Task.WhenAny(connTask, Task.Delay(_connectionTimeout)) == connTask) {await connTask; }

轉載于:https://www.cnblogs.com/dudu/p/10812139.html

總結

以上是生活随笔為你收集整理的一次 .NET Core 中玩锁的经历:ManualResetEventSlim, Semaphore 与 SemaphoreSlim的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。