一次 .NET Core 中玩锁的经历:ManualResetEventSlim, SemaphoreSlim
最近同事對(duì) ?.net core memcached 緩存客戶(hù)端 EnyimMemcachedCore 進(jìn)行了高并發(fā)下的壓力測(cè)試,發(fā)現(xiàn)在 linux 上高并發(fā)下使用 async 異步方法讀取緩存數(shù)據(jù)會(huì)出現(xiàn)大量失敗的情況,比如在一次測(cè)試中,100萬(wàn)次讀取緩存,只有12次成功,999988次失敗,好恐怖。如果改為同步方法,沒(méi)有一次失敗,100%成功。奇怪的是,同樣的壓力測(cè)試程序在 Windows 上異步讀取卻沒(méi)問(wèn)題,100%成功。
排查后發(fā)現(xiàn)是2個(gè)地方使用的鎖引起的,一個(gè)是 ManualResetEventSlim?,一個(gè)是?Semaphore?,這2個(gè)鎖是在同步方法中使用的,但 aync 異步方法中調(diào)用了這2個(gè)同步方法,我們來(lái)分別看一下。
使用 ManualResetEventSlim 是在創(chuàng)建 Socket 連接時(shí)用于控制連接超時(shí)
var args = new SocketAsyncEventArgs();using (var mres = new ManualResetEventSlim())
{
args.Completed += (s, e) => mres.Set();
if (socket.ConnectAsync(args))
{
if (!mres.Wait(timeout))
{
throw new TimeoutException("Could not connect to " + endpoint);
}
}
}
使用?Semaphore 是在從 EnyimMemcachedCore 自己實(shí)現(xiàn)的 Socket 連接池獲取 Socket 連接時(shí)
if (!this.semaphore.WaitOne(this.queueTimeout)){
message = "Pool is full, timeouting. " + _endPoint;
if (_isDebugEnabled) _logger.LogDebug(message);
result.Fail(message, new TimeoutException());
// everyone is so busy
return result;
}
為了棄用這個(gè)2個(gè)鎖造成的異步并發(fā)問(wèn)題,采取了下面2個(gè)改進(jìn)措施:
1)對(duì)于?ManualResetEventSlim ,參考 corefx 中 SqlClient 的?SNITcpHandle?的實(shí)現(xiàn),改用?CancellationTokenSource 控制連接超時(shí)
var cts = new CancellationTokenSource();cts.CancelAfter(timeout);
void Cancel()
{
if (!socket.Connected)
{
socket.Dispose();
}
}
cts.Token.Register(Cancel);
socket.Connect(endpoint);
if (socket.Connected)
{
connected = true;
}
else
{
socket.Dispose();
}
2)對(duì)于?Semaphore ,根據(jù)同事提交的 PR ,將?Semaphore 換成?SemaphoreSlim ,用?SemaphoreSlim.WaitAsync 方法等待信號(hào)量鎖
if (!await this.semaphore.WaitAsync(this.queueTimeout)){
message = "Pool is full, timeouting. " + _endPoint;
if (_isDebugEnabled) _logger.LogDebug(message);
result.Fail(message, new TimeoutException());
// everyone is so busy
return result;
}
改進(jìn)后,壓力測(cè)試結(jié)果立馬與同步方法一樣,100% 成功!
為什么會(huì)這樣?
我們到 github 的 coreclr 倉(cāng)庫(kù)(針對(duì) .net core 2.2)中看看?ManualResetEventSlim?與?Semaphore?的實(shí)現(xiàn)源碼,看能否找到一些線索。
(一)
先看看 ManualResetEventSlim.Wait 方法的實(shí)現(xiàn)代碼(523開(kāi)始):
1)先?SpinWait 等待
var spinner = new SpinWait();while (spinner.Count < spinCount)
{
spinner.SpinOnce(sleep1Threshold: -1);
if (IsSet)
{
return true;
}
}
SpinWait 等待時(shí)間比較短,不會(huì)造成長(zhǎng)時(shí)間阻塞線程。
在高并發(fā)下大量線程在爭(zhēng)搶鎖,所以大量線程在這個(gè)階段等不到鎖。
2)然后?Monitor.Wait 等待
try{
// ** the actual wait **
if (!Monitor.Wait(m_lock, realMillisecondsTimeout))
return false; //return immediately if the timeout has expired.
}
finally
{
// Clean up: we're done waiting.
Waiters = Waiters - 1;
}
Monitor.Wait 對(duì)應(yīng)的實(shí)現(xiàn)代碼
[MethodImplAttribute(MethodImplOptions.InternalCall)]private static extern bool ObjWait(bool exitContext, int millisecondsTimeout, object obj);
public static bool Wait(object obj, int millisecondsTimeout, bool exitContext)
{
if (obj == null)
throw (new ArgumentNullException(nameof(obj)));
return ObjWait(exitContext, millisecondsTimeout, obj);
}
最終調(diào)用的是一個(gè)本地庫(kù)的?ObjWait 方法。
查閱一下?Monitor.Wait 方法的幫助文檔:
Releases the lock on an object and blocks the current thread until it reacquires the lock. If the specified time-out interval elapses, the thread enters the ready queue.
Monitor.Wait 的確會(huì)阻塞當(dāng)前線程,這在異步高并發(fā)下會(huì)帶來(lái)問(wèn)題,詳見(jiàn)一碼阻塞,萬(wàn)碼等待:ASP.NET Core 同步方法調(diào)用異步方法“死鎖”的真相。
(二)
再看看?Semaphore 的實(shí)現(xiàn)代碼,它繼承自?WaitHandle?,?Semaphore.Wait 實(shí)際調(diào)用的是 WaitHandle.Wait ,后者調(diào)用的是?WaitOneNative?,這是一個(gè)本地庫(kù)的方法
[MethodImplAttribute(MethodImplOptions.InternalCall)]private static extern int WaitOneNative(SafeHandle waitableSafeHandle, uint millisecondsTimeout, bool hasThreadAffinity, bool exitContext);
.net core 3.0 中有些變化,這里調(diào)用的是?WaitOneCore 方法
[MethodImpl(MethodImplOptions.InternalCall)]private static extern int WaitOneCore(IntPtr waitHandle, int millisecondsTimeout);
查閱一下 WaitHandle.Wait?方法的幫助文檔:?
Blocks the current thread until the current WaitHandle receives a signal, using a 32-bit signed integer to specify the time interval in milliseconds.
WaitHandle.Wait 也會(huì)阻塞當(dāng)前線程。
2個(gè)地方在等待鎖時(shí)都會(huì)阻塞線程,難怪高并發(fā)下會(huì)出問(wèn)題。
(三)
接著閱讀?SemaphoreSlim 的源碼學(xué)習(xí)它是如何在?WaitAsync?中實(shí)現(xiàn)異步等待鎖的?
public Task<bool> WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken){
//...
lock (m_lockObj!)
{
// If there are counts available, allow this waiter to succeed.
if (m_currentCount > 0)
{
--m_currentCount;
if (m_waitHandle != null && m_currentCount == 0) m_waitHandle.Reset();
return s_trueTask;
}
else if (millisecondsTimeout == 0)
{
// No counts, if timeout is zero fail fast
return s_falseTask;
}
// If there aren't, create and return a task to the caller.
// The task will be completed either when they've successfully acquired
// the semaphore or when the timeout expired or cancellation was requested.
else
{
Debug.Assert(m_currentCount == 0, "m_currentCount should never be negative");
var asyncWaiter = CreateAndAddAsyncWaiter();
return (millisecondsTimeout == Timeout.Infinite && !cancellationToken.CanBeCanceled) ?
asyncWaiter :
WaitUntilCountOrTimeoutAsync(asyncWaiter, millisecondsTimeout, cancellationToken);
}
}
}
重點(diǎn)看 else 部分的代碼,SemaphoreSlim.WaitAsync 造了一個(gè)專(zhuān)門(mén)用于等待鎖的 Task —— TaskNode ,CreateAndAddAsyncWaiter 就用于創(chuàng)建?TaskNode 的實(shí)例
private TaskNode CreateAndAddAsyncWaiter(){
// Create the task
var task = new TaskNode();
// Add it to the linked list
if (m_asyncHead == null)
{
m_asyncHead = task;
m_asyncTail = task;
}
else
{
m_asyncTail.Next = task;
task.Prev = m_asyncTail;
m_asyncTail = task;
}
// Hand it back
return task;
}
從上面的代碼看到 TaskNode 用到了鏈表,神奇的等鎖專(zhuān)用 Task —— TaskNode 是如何實(shí)現(xiàn)的呢?
private sealed class TaskNode : Task<bool>{
internal TaskNode? Prev, Next;
internal TaskNode() : base((object?)null, TaskCreationOptions.RunContinuationsAsynchronously) { }
}
好簡(jiǎn)單!
那?SemaphoreSlim.WaitAsync 如何用?TaskNode 實(shí)現(xiàn)指定了超時(shí)時(shí)間的鎖等待?
看?WaitUntilCountOrTimeoutAsync 方法的實(shí)現(xiàn)源碼:
private async Task<bool> WaitUntilCountOrTimeoutAsync(TaskNode asyncWaiter, int millisecondsTimeout, CancellationToken cancellationToken){
// Wait until either the task is completed, timeout occurs, or cancellation is requested.
// We need to ensure that the Task.Delay task is appropriately cleaned up if the await
// completes due to the asyncWaiter completing, so we use our own token that we can explicitly
// cancel, and we chain the caller's supplied token into it.
using (var cts = cancellationToken.CanBeCanceled ?
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, default(CancellationToken)) :
new CancellationTokenSource())
{
var waitCompleted = Task.WhenAny(asyncWaiter, Task.Delay(millisecondsTimeout, cts.Token));
if (asyncWaiter == await waitCompleted.ConfigureAwait(false))
{
cts.Cancel(); // ensure that the Task.Delay task is cleaned up
return true; // successfully acquired
}
}
// If we get here, the wait has timed out or been canceled.
// If the await completed synchronously, we still hold the lock. If it didn't,
// we no longer hold the lock. As such, acquire it.
lock (m_lockObj)
{
// Remove the task from the list. If we're successful in doing so,
// we know that no one else has tried to complete this waiter yet,
// so we can safely cancel or timeout.
if (RemoveAsyncWaiter(asyncWaiter))
{
cancellationToken.ThrowIfCancellationRequested(); // cancellation occurred
return false; // timeout occurred
}
}
// The waiter had already been removed, which means it's already completed or is about to
// complete, so let it, and don't return until it does.
return await asyncWaiter.ConfigureAwait(false);
}
用 Task.WhenAny 等待 TaskNode 與?Task.Delay ,等其中任一者先完成,簡(jiǎn)單到可怕。
又一次通過(guò) .net core 源碼欣賞了高手是怎么玩轉(zhuǎn) Task 的。
【2019-5-6更新】
今天將?Task.WhenAny +?Task.Delay 的招式用到了異步連接 Socket 的超時(shí)控制中
var connTask = _socket.ConnectAsync(_endpoint);if (await Task.WhenAny(connTask, Task.Delay(_connectionTimeout)) == connTask)
{
await connTask;
}
原文地址:https://www.cnblogs.com/dudu/p/10812139.html
.NET社區(qū)新聞,深度好文,歡迎訪問(wèn)公眾號(hào)文章匯總?http://www.csharpkit.com?
總結(jié)
以上是生活随笔為你收集整理的一次 .NET Core 中玩锁的经历:ManualResetEventSlim, SemaphoreSlim的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 事关SuperSocket发布,寻找Ya
- 下一篇: 通俗易懂,什么是.NET Core以及.