基于C# Socket实现的简单的Redis客户端
前言
????Redis是一款強(qiáng)大的高性能鍵值存儲(chǔ)數(shù)據(jù)庫(kù),也是目前NOSQL中最流行比較流行的一款數(shù)據(jù)庫(kù),它在廣泛的應(yīng)用場(chǎng)景中扮演著至關(guān)重要的角色,包括但不限于緩存、消息隊(duì)列、會(huì)話存儲(chǔ)等。在本文中,我們將介紹如何基于C# Socket來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的Redis客戶端類RedisClient,來(lái)演示構(gòu)建請(qǐng)求和輸出的相關(guān)通信機(jī)制。需要注意的是本文只是著重展示如何基于原生的Socket方式與Redis Server進(jìn)行通信,并不是構(gòu)建一個(gè)強(qiáng)大的Redis開發(fā)工具包。
Redis簡(jiǎn)介
????Redis(Remote Dictionary Server)是一個(gè)內(nèi)存數(shù)據(jù)庫(kù),它支持了非常豐富的數(shù)據(jù)結(jié)構(gòu),包括字符串、列表、集合、散列、有序集合等。Redis 提供了高性能的讀寫操作,可以用于緩存數(shù)據(jù)、消息隊(duì)列、分布式鎖、會(huì)話管理等多種用途。Redis 通常以鍵值對(duì)的方式存儲(chǔ)數(shù)據(jù),每個(gè)鍵都與一個(gè)值相關(guān)聯(lián),值的類型可以是字符串、列表、散列等。Redis不僅提供了豐富的命令集,用于操作存儲(chǔ)在數(shù)據(jù)庫(kù)中的數(shù)據(jù),還提供了Redis serialization protocol (RESP) 協(xié)議來(lái)解析Redis Server返回的數(shù)據(jù)。相關(guān)的文檔地址如下所示:
- Redis官網(wǎng)地址 https://redis.io/
- Redis官方文檔地址 https://redis.io/docs/
- Redis命令文檔地址 https://redis.io/commands/
- Redis序列化協(xié)議規(guī)范文檔地址 https://redis.io/docs/reference/protocol-spec/
Redis 命令指南
????Redis命令是與Redis服務(wù)器進(jìn)行通信的主要方式,通俗點(diǎn)就是發(fā)送指定格式的指令用于執(zhí)行各種操作,包括數(shù)據(jù)存儲(chǔ)、檢索、修改和刪除等。以下是一些日常使用過(guò)程中常見的Redis命令及其用途:
-
GET 和 SET 命令
-
GET key: 用于獲取指定鍵的值。 -
SET key value: 用于設(shè)置指定鍵的值.
-
-
DEL 命令
-
DEL key: 用于刪除指定鍵.
-
-
EXPIRE 和 TTL 命令
-
EXPIRE key seconds: 用于為指定鍵設(shè)置過(guò)期時(shí)間(秒). -
TTL key: 用于獲取指定鍵的剩余過(guò)期時(shí)間(秒).
注意這里的時(shí)間單位是秒
-
-
INCR 和 DECR 命令
-
INCR key: 用于遞增指定鍵的值. -
DECR key: 用于遞減指定鍵的值.
-
-
RPUSH 和 LPOP 命令
-
RPUSH key value: 用于將值添加到列表的右側(cè). -
LPOP key: 用于從列表的左側(cè)彈出一個(gè)值.
-
-
HSET 和 HGET 命令
-
HSET key field value: 用于設(shè)置哈希表中指定字段的值. -
HGET key field: 用于獲取哈希表中指定字段的值.
-
-
PUBLISH 和 SUBSCRIBE 命令
-
PUBLISH channel message: 用于向指定頻道發(fā)布消息. -
SUBSCRIBE channel: 用于訂閱指定頻道的消息.
-
當(dāng)然 Redis 支持的命令遠(yuǎn)不止這些,它還包括對(duì)集合、有序集合、位圖、HyperLogLog 等數(shù)據(jù)結(jié)構(gòu)的操作,以及事務(wù)、Lua 腳本執(zhí)行等高級(jí)功能。我們接下來(lái)演示的時(shí)候也只是展示幾個(gè)大家比較熟悉的指令,這也是我們學(xué)習(xí)新知識(shí)的時(shí)候經(jīng)常使用的方式,先從最簡(jiǎn)單最容易的開始入手,循序漸進(jìn),這也是微精通所提倡的方式。
Redis協(xié)議(RESP)
Redis Serialization Protocol (RESP) 是 Redis 使用的二進(jìn)制協(xié)議,用于客戶端和服務(wù)器之間的通信。我們可以通過(guò)該協(xié)議解析Redis服務(wù)器返回的命令格式,解析我們想要的數(shù)據(jù)。RESP具有簡(jiǎn)潔易解析的特點(diǎn)
-
簡(jiǎn)單字符串協(xié)議:
-
格式:
+OK\r\n - 第一個(gè)字節(jié)是"+”,后跟消息內(nèi)容,以"\r\n"(回車和換行)結(jié)束。
- 示例:
+OK\r\n
-
格式:
-
批量字符串協(xié)議:
-
格式:
$5\r\nhello\r\n - 第一個(gè)字節(jié)是"$",后跟字符串的字節(jié)長(zhǎng)度,然后是實(shí)際的字符串內(nèi)容,最后以"\r\n"結(jié)束。
- 示例:
$5\r\nhello\r\n
-
格式:
-
整數(shù)協(xié)議:
-
格式:
:42\r\n - 第一個(gè)字節(jié)是":",后跟整數(shù)的文本表示,以"\r\n"結(jié)束。
- 示例:
:42\r\n
-
格式:
-
數(shù)組協(xié)議:
-
格式:
*3\r\n:1\r\n:2\r\n:3\r\n - 第一個(gè)字節(jié)是"*",后跟數(shù)組中元素的數(shù)量,然后是數(shù)組中每個(gè)元素的 RESP 表示,以"\r\n"結(jié)束。
- 示例:
*3\r\n:1\r\n:2\r\n:3\r\n
-
格式:
-
錯(cuò)誤協(xié)議:
-
格式:
-Error message\r\n - 第一個(gè)字節(jié)是"-",后跟錯(cuò)誤消息內(nèi)容,以"\r\n"結(jié)束。
- 示例:
-Error message\r\n
-
格式:
需要注意的是字符串協(xié)議里面的長(zhǎng)度不是具體字符的長(zhǎng)度,而是對(duì)應(yīng)的
UTF8對(duì)應(yīng)的字節(jié)數(shù)組的長(zhǎng)度,這一點(diǎn)對(duì)于我們解析返回的數(shù)據(jù)很重要,否則獲取數(shù)據(jù)的時(shí)候會(huì)影響數(shù)據(jù)的完整性。
RESP協(xié)議是Redis高效性能的關(guān)鍵之一,它相對(duì)比較加單,不需要解析各種頭信息等,這使得Redis能夠在處理大規(guī)模數(shù)據(jù)和請(qǐng)求時(shí)表現(xiàn)出色。了解RESP協(xié)議可以幫助您更好地理解Redis客戶端類 RedisClient 的內(nèi)部工作原理。可以理解為它屬于一種應(yīng)用層面的協(xié)議,通過(guò)給定的數(shù)據(jù)格式解析出想要的數(shù)據(jù),這也對(duì)我們?cè)趯?shí)際編程過(guò)程中,解決類似的問(wèn)題,提供了一個(gè)不錯(cuò)的思路。
實(shí)現(xiàn)RedisClient
????上面我們介紹了一些關(guān)于Redis的基礎(chǔ)概念,重點(diǎn)介紹了一下關(guān)于Redis的命令和RESP,接下來(lái)我們就結(jié)合上面的理論,基于C# Socket來(lái)簡(jiǎn)單的模擬一下如何和Redis Server進(jìn)行數(shù)據(jù)交互。主要就是結(jié)合Redis命令和Redis 協(xié)議(RESP)來(lái)簡(jiǎn)單的實(shí)現(xiàn)。
通信架子
首先來(lái)看一下類的結(jié)構(gòu)
public class RedisClient : IDisposable, IAsyncDisposable
{
//定義默認(rèn)端口
private readonly int DefaultPort = 6379;
//定義默認(rèn)地址
private readonly string Host = "localhost";
//心跳間隔,單位為毫秒
private readonly int HeartbeatInterval = 30000;
private bool _isConnected;
//心跳定時(shí)器
private Timer _heartbeatTimer;
private Socket _socket;
public RedisClient(string host = "localhost", int defaultPort = 6379)
{
Host = host;
DefaultPort = defaultPort;
// 初始化心跳定時(shí)器
_heartbeatTimer = new Timer(HeartbeatCallback, null, HeartbeatInterval, HeartbeatInterval);
}
//連接方法
public async Task ConnectAsync(int timeoutMilliseconds = 5000)
{
_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var cts = new CancellationTokenSource(timeoutMilliseconds);
await _socket.ConnectAsync(Host, DefaultPort, cts.Token);
_isConnected = true;
}
//心跳方法
private async void HeartbeatCallback(object state)
{
if (_isConnected)
{
var pingCommand = "PING\r\n";
await SendCommandAsync(pingCommand);
}
}
//釋放邏輯
public void Dispose()
{
DisposeAsync().GetAwaiter().GetResult();
}
public ValueTask DisposeAsync()
{
// 停止心跳定時(shí)器
_heartbeatTimer.Dispose();
if (_socket != null)
{
_socket.Shutdown(SocketShutdown.Both);
_socket.Close();
}
return ValueTask.CompletedTask;
}
}
上面的類定義了實(shí)現(xiàn)的大致通信結(jié)構(gòu),結(jié)構(gòu)中主要涉及到的是通信相關(guān)的功能實(shí)現(xiàn),包含Socket的初始化信息、默認(rèn)的連連接信息、心跳方法、釋放邏輯等。首先,在構(gòu)造函數(shù)中,指定了默認(rèn)的Redis端口(6379)、地址(localhost),并初始化了心跳定時(shí)器。連接方法ConnectAsync通過(guò)Socket建立與Redis服務(wù)器的TCP連接。心跳定時(shí)器HeartbeatCallback定期發(fā)送PING命令,確保與服務(wù)器的連接保持活動(dòng)。最后,Dispose方法用于釋放資源,包括停止心跳定時(shí)器和關(guān)閉Socket連接,實(shí)現(xiàn)了IDisposable和IAsyncDisposable接口。這些功能為RedisClient類提供了基本的連接和資源管理能力。由于我對(duì)Socket編程也不是很熟悉,所以定義的可能不是很完善,有比較熟悉的同學(xué),可以多多指導(dǎo)。
發(fā)送和解析
有了這個(gè)基礎(chǔ)的架子之后,我們可以在里面填寫具體的實(shí)現(xiàn)邏輯了。首先我們來(lái)定義發(fā)送Redis命令和解析RESP的邏輯
//發(fā)送命令
public async Task<string> SendCommandAsync(string command)
{
// 發(fā)送命令的實(shí)現(xiàn)
if (!_isConnected)
{
// 如果連接已斷開,可以進(jìn)行重連
await ConnectAsync();
}
//Redis的命令是以\r\n為結(jié)尾的
var request = Encoding.UTF8.GetBytes(command + "\r\n");
//發(fā)送命令
await _socket.SendAsync(new ArraySegment<byte>(request), SocketFlags.None);
var response = new StringBuilder();
var remainingData = string.Empty;
//初始化響應(yīng)字符串和剩余數(shù)據(jù)
byte[] receiveBuffer = ArrayPool<byte>.Shared.Rent(1024);
try
{
while (true)
{
//讀取返回信息
var bytesRead = await _socket.ReceiveAsync(new ArraySegment<byte>(receiveBuffer), SocketFlags.None);
//將接收到的數(shù)據(jù)添加到響應(yīng)字符串
var responseData = remainingData + Encoding.UTF8.GetString(receiveBuffer, 0, bytesRead);
//提取完整的響應(yīng)并添加到響應(yīng)字符串中
var completeResponses = ExtractCompleteResponses(ref responseData);
foreach (var completeResponse in completeResponses)
{
response.Append(completeResponse);
}
remainingData = responseData;
//結(jié)果為\r\n讀取結(jié)束
if (response.ToString().EndsWith("\r\n"))
{
break;
}
}
}
finally
{
//釋放緩沖區(qū)
ArrayPool<byte>.Shared.Return(receiveBuffer);
}
//返回完整的響應(yīng)字符串
return response.ToString();
}
private List<string> ExtractCompleteResponses(ref string data)
{
var completeResponses = new List<string>();
while (true)
{
var index = data.IndexOf("\r\n");
if (index >= 0)
{
// 提取一個(gè)完整的響應(yīng)
var completeResponse = data.Substring(0, index + 2);
//將完整的響應(yīng)添加到列表中
completeResponses.Add(completeResponse);
data = data.Substring(index + 2);
}
else
{
break;
}
}
return completeResponses;
}
private string ParseResponse(string response)
{
if (response.StartsWith("$"))
{
// 處理 Bulk Strings($)
var lengthStr = response.Substring(1, response.IndexOf('\r') - 1);
if (int.TryParse(lengthStr, out int length))
{
if (length == -1)
{
return null!;
}
string rawRedisData = response.Substring(response.IndexOf('\n') + 1);
byte[] utf8Bytes = Encoding.UTF8.GetBytes(rawRedisData);
string value = Encoding.UTF8.GetString(utf8Bytes, 0, length);
return value;
}
}
else if (response.StartsWith("+"))
{
// 處理 Simple Strings(+)
return response.Substring(1, response.Length - 3);
}
else if (response.StartsWith(":"))
{
// 處理 Integers(:)
var valueStr = response.Substring(1, response.IndexOf('\r') - 1);
if (int.TryParse(valueStr, out int value))
{
return value.ToString();
}
}
// 如果響應(yīng)格式不符合預(yù)期,拋出異常
throw new InvalidOperationException(response);
}
上面邏輯涉及到發(fā)送和接收Redis消息的三個(gè)方法SendCommandAsync、ExtractCompleteResponses、ParseResponse。雖然上面代碼中有注釋,但是咱們分別I簡(jiǎn)單的講解一下這三個(gè)方法
-
SendCommandAsync
該方法主要目的是向 Redis 服務(wù)器發(fā)送命令并異步接收響應(yīng)
- 連接檢查:首先,檢查連接狀態(tài) (_isConnected),如果連接已斷開,則調(diào)用 ConnectAsync 方法進(jìn)行重連。
- 命令轉(zhuǎn)換:將傳入的命令字符串轉(zhuǎn)換為 UTF-8 編碼的字節(jié)數(shù)組,附加回車換行符 ("\r\n")。
- 接收響應(yīng):使用異步循環(huán)接收來(lái)自服務(wù)器的響應(yīng)。在每次接收之后,將接收到的數(shù)據(jù)添加到響應(yīng)字符串中,并提取其中的完整響應(yīng)。
- 緩沖區(qū)管理:為了有效地處理接收到的數(shù)據(jù),使用了一個(gè)緩沖區(qū) (receiveBuffer),并在方法結(jié)束時(shí)通過(guò) ArrayPool
.Shared.Return 進(jìn)行釋放。 - 提取完整響應(yīng):調(diào)用 ExtractCompleteResponses 方法,該方法從響應(yīng)數(shù)據(jù)中提取出一個(gè)或多個(gè)完整的響應(yīng),將其從數(shù)據(jù)中移除,并返回一個(gè)列表。
-
ExtractCompleteResponses
該方法主要用于從接收到的數(shù)據(jù)中提取出一個(gè)或多個(gè)完整的響應(yīng)。
- completeResponses 列表:用于存儲(chǔ)提取出的完整響應(yīng)的列表。
- while 循環(huán):循環(huán)進(jìn)行以下操作,直到數(shù)據(jù)中沒有換行符為止。
- 提取完整響應(yīng):如果找到換行符,就提取從數(shù)據(jù)開頭到換行符位置的子字符串,包括換行符本身,構(gòu)成一個(gè)完整的響應(yīng)。
- 添加到列表:將提取出的完整響應(yīng)添加到 completeResponses 列表中。
-
ParseResponse
該方法主要用于解析從 Redis 服務(wù)器接收到的響應(yīng)字符串。
- 如果響應(yīng)以 $ 開頭,表示這是一個(gè) Bulk String 類型的響應(yīng)。
- 如果響應(yīng)以 + 開頭,表示這是一個(gè) Simple String 類型的響應(yīng)。
- 如果響應(yīng)以 : 開頭,表示這是一個(gè) Integer 類型的響應(yīng)。
簡(jiǎn)單操作方法
上面有了和Redis通信的基本方法,也有了解析RESP協(xié)議的基礎(chǔ)方法,接下來(lái)咱們實(shí)現(xiàn)幾個(gè)簡(jiǎn)單的Redis操作指令來(lái)展示一下Redis客戶端具體是如何工作的,簡(jiǎn)單的幾個(gè)方法如下所示
//切換db操作
public async Task SelectAsync(int dbIndex)
{
var command = $"SELECT {dbIndex}";
await SendCommandAsync(command);
}
//get操作
public async Task<string> GetAsync(string key)
{
var command = $"GET {key}";
return ParseResponse(await SendCommandAsync(command));
}
//set操作
public async Task<bool> SetAsync(string key, string value, TimeSpan? expiry = null)
{
var command = $"SET {key} '{value}'";
//判斷會(huì)否追加過(guò)期時(shí)間
if (expiry.HasValue)
{
command += $" EX {expiry.Value.TotalSeconds}";
}
var response = ParseResponse(await SendCommandAsync(command));
return response == "OK";
}
//支持過(guò)期時(shí)間的setnx操作
public async Task<bool> SetNxAsync(string key, string value, TimeSpan? expiry = null)
{
//因?yàn)槟J(rèn)的setnx方法不支持添加過(guò)期時(shí)間,為了保證操作的原子性,使用了lua
var command = $"EVAL \"if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then if ARGV[2] then redis.call('EXPIRE', KEYS[1], ARGV[2]) end return true else return false end\" 1 {key} '{value}'";
if (expiry.HasValue)
{
command += $" {expiry.Value.TotalSeconds}";
}
var response = ParseResponse(await SendCommandAsync(command));
return response == "1";
}
//添加支持函過(guò)期時(shí)間的list push操作
public async Task<long> ListPushAsync(string key, string value, TimeSpan? expiry = null)
{
var script = @"local len = redis.call('LPUSH', KEYS[1], ARGV[1])
if tonumber(ARGV[2]) > 0 then
redis.call('EXPIRE', KEYS[1], ARGV[2])
end
return len";
var keys = new string[] { key };
var args = new string[] { value, (expiry?.TotalSeconds ?? 0).ToString() };
var response = await ExecuteLuaScriptAsync(script, keys, args);
return long.Parse(response);
}
//list pop操作
public async Task<string> ListPopAsync(string key)
{
var command = $"LPOP {key}";
return ParseResponse(await SendCommandAsync(command));
}
//listrange操作
public async Task<List<string>> ListRangeAsync(string key, int start, int end)
{
var command = $"LRANGE {key} {start} {end}";
var response = await SendCommandAsync(command);
if (response.StartsWith("*0\r\n"))
{
return new List<string>();
}
//由于list range返回了是一個(gè)數(shù)組,所以單獨(dú)處理了一下,這里我使用了正則,解析字符串也可以,方法隨意
var values = new List<string>();
var pattern = @"\$\d+\r\n(.*?)\r\n";
MatchCollection matches = Regex.Matches(response, pattern);
foreach (Match match in matches)
{
values.Add(match.Groups[1].Value);
}
return values;
}
//執(zhí)行l(wèi)ua腳本的方法
public async Task<string> ExecuteLuaScriptAsync(string script, string[]? keys = null, string[]? args = null)
{
//去除lua里的換行
script = Regex.Replace(script, @"[\r\n]", "");
// 構(gòu)建EVAL命令,將Lua腳本、keys和args發(fā)送到Redis服務(wù)器
var command = $"EVAL \"{script}\" { keys?.Length??0 } ";
//拼接key和value參數(shù)
if (keys != null && keys.Length != 0)
{
command += string.Join(" ", keys.Select(key => $"{key}"));
}
if (args != null && args.Length != 0)
{
command += " " + string.Join(" ", args.Select(arg => $"{arg}"));
}
return ParseResponse(await SendCommandAsync(command));
}
//redis發(fā)布操作
public async Task SubscribeAsync(string channel, Action<string, string> handler)
{
await SendCommandAsync($"SUBSCRIBE {channel}");
while (true)
{
var response = await SendCommandAsync(string.Empty);
string pattern = @"\*\d+\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n";
Match match = Regex.Match(response, pattern);
if (match.Success)
{
string ch = match.Groups[2].Value;
string message = match.Groups[3].Value;
handler(ch, message);
}
}
}
//redis訂閱操作
public async Task PublishAsync(string channel, string message)
{
await SendCommandAsync($"PUBLISH {channel} {message}");
}
上面方法中演示了幾個(gè)比較常見的操作,很簡(jiǎn)單,主要是向大家展示Redis命令是如何發(fā)送的,從最簡(jiǎn)單的GET、SET、LIST、發(fā)布訂閱、執(zhí)行LUA操作方面著手,如果對(duì)Redis命令比較熟悉的話,操作起來(lái)還是比較簡(jiǎn)單的,這里給大家講解幾個(gè)比較有代表的方法
- 首先關(guān)于
setnx方法,由于自帶的setnx方法不支持添加過(guò)期時(shí)間,為了保證操作的原子性,使用了lua腳本的方式 - 自帶的
lpush也就是上面ListPushAsync方法中封裝的操作,自帶的也是沒辦法給定過(guò)期時(shí)間的,為了保證操作的原子性,我在這里也是用lua進(jìn)行封裝 - 關(guān)于執(zhí)行
lua腳本的時(shí)候的時(shí)候需要注意lua腳本的格式EVAL script numkeys [key [key ...]] [arg [arg ...]]腳本后面緊跟著的長(zhǎng)度是key的個(gè)數(shù)這個(gè)需要注意 - 最后,自行編寫命令的時(shí)候需要注意
\r\n的處理和引號(hào)的轉(zhuǎn)義問(wèn)題,當(dāng)然研究的越深,遇到的問(wèn)題越多
相信大家也看到了,這里我封裝的都是幾個(gè)簡(jiǎn)單的操作,難度系數(shù)不大,因?yàn)橹饕窍虼蠹已菔?code>Redis客戶端的發(fā)送和接收操作是什么樣的,甚至我都是直接返回的字符串,真實(shí)使用的時(shí)候我們使用都是需要封裝序列化和反序列化操作的。
完整代碼
上面分別對(duì)RedisClient類中的方法進(jìn)行了講解,接下來(lái)我把我封裝的類完整的給大家貼出來(lái),由于封裝的只是幾個(gè)簡(jiǎn)單的方法用于演示,所以也只有一個(gè)類,代碼量也不多,主要是為了方便大家理解,有想試驗(yàn)的同學(xué)可以直接拿走
public class RedisClient : IDisposable, IAsyncDisposable
{
private readonly int DefaultPort = 6379;
private readonly string Host = "localhost";
private readonly int HeartbeatInterval = 30000;
private bool _isConnected;
private Timer _heartbeatTimer;
private Socket _socket;
public RedisClient(string host = "localhost", int defaultPort = 6379)
{
Host = host;
DefaultPort = defaultPort;
_heartbeatTimer = new Timer(HeartbeatCallback, null, HeartbeatInterval, HeartbeatInterval);
}
public async Task ConnectAsync(int timeoutMilliseconds = 5000)
{
_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var cts = new CancellationTokenSource(timeoutMilliseconds);
await _socket.ConnectAsync(Host, DefaultPort, cts.Token);
_isConnected = true;
}
public async Task SelectAsync(int dbIndex)
{
var command = $"SELECT {dbIndex}";
await SendCommandAsync(command);
}
public async Task<string> GetAsync(string key)
{
var command = $"GET {key}";
return ParseResponse(await SendCommandAsync(command));
}
public async Task<bool> SetAsync(string key, string value, TimeSpan? expiry = null)
{
var command = $"SET {key} '{value}'";
if (expiry.HasValue)
{
command += $" EX {expiry.Value.TotalSeconds}";
}
var response = ParseResponse(await SendCommandAsync(command));
return response == "OK";
}
public async Task<bool> SetNxAsync(string key, string value, TimeSpan? expiry = null)
{
var command = $"EVAL \"if redis.call('SETNX', KEYS[1], ARGV[1]) == 1 then if ARGV[2] then redis.call('EXPIRE', KEYS[1], ARGV[2]) end return true else return false end\" 1 {key} '{value}'";
if (expiry.HasValue)
{
command += $" {expiry.Value.TotalSeconds}";
}
var response = ParseResponse(await SendCommandAsync(command));
return response == "1";
}
public async Task<long> ListPushAsync(string key, string value, TimeSpan? expiry = null)
{
var script = @"local len = redis.call('LPUSH', KEYS[1], ARGV[1])
if tonumber(ARGV[2]) > 0 then
redis.call('EXPIRE', KEYS[1], ARGV[2])
end
return len";
var keys = new string[] { key };
var args = new string[] { value, (expiry?.TotalSeconds ?? 0).ToString() };
var response = await ExecuteLuaScriptAsync(script, keys, args);
return long.Parse(response);
}
public async Task<string> ListPopAsync(string key)
{
var command = $"LPOP {key}";
return ParseResponse(await SendCommandAsync(command));
}
public async Task<long> ListLengthAsync(string key)
{
var command = $"LLEN {key}";
return long.Parse(ParseResponse(await SendCommandAsync(command)));
}
public async Task<List<string>> ListRangeAsync(string key, int start, int end)
{
var command = $"LRANGE {key} {start} {end}";
var response = await SendCommandAsync(command);
if (response.StartsWith("*0\r\n"))
{
return new List<string>();
}
var values = new List<string>();
var pattern = @"\$\d+\r\n(.*?)\r\n";
MatchCollection matches = Regex.Matches(response, pattern);
foreach (Match match in matches)
{
values.Add(match.Groups[1].Value);
}
return values;
}
public async Task<string> ExecuteLuaScriptAsync(string script, string[]? keys = null, string[]? args = null)
{
script = Regex.Replace(script, @"[\r\n]", "");
var command = $"EVAL \"{script}\" { keys?.Length??0 } ";
if (keys != null && keys.Length != 0)
{
command += string.Join(" ", keys.Select(key => $"{key}"));
}
if (args != null && args.Length != 0)
{
command += " " + string.Join(" ", args.Select(arg => $"{arg}"));
}
return ParseResponse(await SendCommandAsync(command));
}
public async Task SubscribeAsync(string channel, Action<string, string> handler)
{
await SendCommandAsync($"SUBSCRIBE {channel}");
while (true)
{
var response = await SendCommandAsync(string.Empty);
string pattern = @"\*\d+\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n";
Match match = Regex.Match(response, pattern);
if (match.Success)
{
string ch = match.Groups[2].Value;
string message = match.Groups[3].Value;
handler(ch, message);
}
}
}
public async Task PublishAsync(string channel, string message)
{
await SendCommandAsync($"PUBLISH {channel} {message}");
}
public async Task<string> SendCommandAsync(string command)
{
if (!_isConnected)
{
await ConnectAsync();
}
var request = Encoding.UTF8.GetBytes(command + "\r\n");
await _socket.SendAsync(new ArraySegment<byte>(request), SocketFlags.None);
var response = new StringBuilder();
var remainingData = string.Empty;
byte[] receiveBuffer = ArrayPool<byte>.Shared.Rent(1024);
try
{
while (true)
{
var bytesRead = await _socket.ReceiveAsync(new ArraySegment<byte>(receiveBuffer), SocketFlags.None);
var responseData = remainingData + Encoding.UTF8.GetString(receiveBuffer, 0, bytesRead);
var completeResponses = ExtractCompleteResponses(ref responseData);
foreach (var completeResponse in completeResponses)
{
response.Append(completeResponse);
}
remainingData = responseData;
if (response.ToString().EndsWith("\r\n"))
{
break;
}
}
}
finally
{
ArrayPool<byte>.Shared.Return(receiveBuffer);
}
return response.ToString();
}
private List<string> ExtractCompleteResponses(ref string data)
{
var completeResponses = new List<string>();
while (true)
{
var index = data.IndexOf("\r\n");
if (index >= 0)
{
var completeResponse = data.Substring(0, index + 2);
completeResponses.Add(completeResponse);
data = data.Substring(index + 2);
}
else
{
break;
}
}
return completeResponses;
}
private string ParseResponse(string response)
{
if (response.StartsWith("$"))
{
var lengthStr = response.Substring(1, response.IndexOf('\r') - 1);
if (int.TryParse(lengthStr, out int length))
{
if (length == -1)
{
return null!;
}
string rawRedisData = response.Substring(response.IndexOf('\n') + 1);
byte[] utf8Bytes = Encoding.UTF8.GetBytes(rawRedisData);
string value = Encoding.UTF8.GetString(utf8Bytes, 0, length);
return value;
}
}
else if (response.StartsWith("+"))
{
return response.Substring(1, response.Length - 3);
}
else if (response.StartsWith(":"))
{
var valueStr = response.Substring(1, response.IndexOf('\r') - 1);
if (int.TryParse(valueStr, out int value))
{
return value.ToString();
}
}
throw new InvalidOperationException(response);
}
private async void HeartbeatCallback(object state)
{
if (_isConnected)
{
var pingCommand = "PING\r\n";
await SendCommandAsync(pingCommand);
}
}
public void Dispose()
{
DisposeAsync().GetAwaiter().GetResult();
}
public ValueTask DisposeAsync()
{
_heartbeatTimer.Dispose();
if (_socket != null)
{
_socket.Shutdown(SocketShutdown.Both);
_socket.Close();
}
return ValueTask.CompletedTask;
}
}
簡(jiǎn)單使用RedisClient
上面我們封裝了RedisClient類,也講解了里面實(shí)現(xiàn)的幾個(gè)簡(jiǎn)單的方法,接下來(lái)我們就簡(jiǎn)單的使用一下它,比較簡(jiǎn)單直接上代碼
GET/SET
GET/SET是最基礎(chǔ)和最簡(jiǎn)單的指令,沒啥可說(shuō)的直接上代碼
using RedisClient redisClient = new RedisClient();
await redisClient.ConnectAsync();
//切換db
await redisClient.SelectAsync(3);
bool setResult = await redisClient.SetAsync("key:foo", "are you ok,你好嗎?", TimeSpan.FromSeconds(120));
string getResult = await redisClient.GetAsync("key:foo");
Console.WriteLine("get key:foo:" + getResult);
SETNX
SETNX比較常用,很多時(shí)候用在做分布式鎖的場(chǎng)景,判斷資源存不存在的時(shí)候經(jīng)常使用
//第一次setnx返回true
bool setNxResult = await redisClient.SetNxAsync("order:lock", "123_lock", TimeSpan.FromSeconds(120));
Console.WriteLine("first setnx order:lock:" + setNxResult);
//第一次setnx返回false
setNxResult = await redisClient.SetNxAsync("order:lock", "123_lock", TimeSpan.FromSeconds(120));
Console.WriteLine("second setnx aname:foo:" + setNxResult);
PUB/SUB
這里實(shí)現(xiàn)的SubscribeAsync和PublishAsync需要使用兩個(gè)RedisClient實(shí)例,因?yàn)槲疑厦娣庋b的每個(gè)RedisClient只包含一個(gè)Socket實(shí)例所以ReceiveAsync方法是阻塞的。如果同一個(gè)實(shí)例的話SubscribeAsync的時(shí)候,在使用PublishAsync方法的時(shí)候會(huì)被阻塞,所以演示的時(shí)候使用了兩個(gè)RedisClient實(shí)例
_ = redisClient.SubscribeAsync("order_msg_ch", (ch, msg) => { Console.WriteLine($"接收消息:[{ch}]---[{msg}]"); });
Thread.Sleep(2000);
using RedisClient redisClient2 = new RedisClient();
await redisClient2.ConnectAsync();
for (int i = 0; i < 5; i++)
{
await redisClient2.PublishAsync("order_msg_ch", $"發(fā)送消息{i}");
Thread.Sleep(2000);
}
ExecuteLuaScriptAsync
動(dòng)態(tài)執(zhí)行l(wèi)ua的功能還是比較強(qiáng)大的,在之前的項(xiàng)目中,我也使用類似的功能。我們是模擬搶單/完成的場(chǎng)景,比如業(yè)務(wù)人員需要自行搶單,每個(gè)人最多搶幾單,超過(guò)閾值則搶單失敗,你需要把搶到的完成了才能繼續(xù)搶單,這種操作就需要借助lua進(jìn)行操作
//搶單的lua
string takeOrderLuaScript = @"
local ordersTaken = tonumber(redis.call('GET', KEYS[1]) or '0')
if ordersTaken < tonumber(ARGV[1]) then
redis.call('INCR', KEYS[1])
return 1
else
return 0
end";
//完成你手里的訂單操作
string completeOrderLuaScript = @"
local ordersTaken = tonumber(redis.call('GET', KEYS[1]) or '0')
if ordersTaken > 0 then
redis.call('DECR', KEYS[1])
return 1
else
return 0
end";
//模擬搶單,最多搶兩單
string result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
//完成訂單
string anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
還有一個(gè)功能也是我們之前遇到的,就是使用Redis實(shí)現(xiàn)緩存最新的N條消息,舊的則被拋棄,實(shí)現(xiàn)這個(gè)功能也需要使用Redis的List結(jié)構(gòu)結(jié)合lua的方式
string luaScript = @"
local record_key = KEYS[1]
local max_records = tonumber(ARGV[1])
local new_record = ARGV[2]
local current_count = redis.call('LLEN', record_key)
if current_count >= max_records then
redis.call('LPOP', record_key)
end
redis.call('RPUSH', record_key, new_record)
";
//這里限制保存最新的50條數(shù)據(jù),舊的數(shù)據(jù)則被拋棄
for (int i = 0; i < 60; i++)
{
_ = await redisClient.ExecuteLuaScriptAsync(luaScript, keys: new[] { "msg:list" }, new[] { "50", i.ToString() });
}
List
LIST很多時(shí)候會(huì)把它當(dāng)做分布式隊(duì)列來(lái)使用,它提供的操作也比較靈活,咱們這里只是封裝了幾個(gè)最簡(jiǎn)單的操作,大致的效果如下所示
//lis入隊(duì)操作
var res = await redisClient.ListPushAsync("list:2", "123", TimeSpan.FromHours(1));
res = await redisClient.ListPushAsync("list:2", "1234", TimeSpan.FromHours(1));
res = await redisClient.ListPushAsync("list:2", "12345", TimeSpan.FromHours(1));
//list出隊(duì)操作
var str = await redisClient.ListPopAsync("list:2");
//list長(zhǎng)度
var length = await redisClient.ListLengthAsync("list:2");
//list range操作
var list = await redisClient.ListRangeAsync("article:list", 0, 10);
總結(jié)
????本文我們通過(guò)理解Redis命令和RESP協(xié)議來(lái)構(gòu)建了一個(gè)簡(jiǎn)單RedisClient的實(shí)現(xiàn),方便我們更容易的理解Redis客戶端如何與Redis服務(wù)器進(jìn)行通信,這個(gè)實(shí)現(xiàn)也可以作為學(xué)習(xí)和理解·Redis客戶端·的一個(gè)很好的例子。當(dāng)然我們的這個(gè)RedisClient這是了解和學(xué)習(xí)使用,很多場(chǎng)景我們并沒有展示,實(shí)際的項(xiàng)目我們還是盡量使用開源的Redis SDK, .net中常用的有StackExchange.Redis、FreeRedis、csredis、NewLife.Redis、Service.Stack.Redis,其中我經(jīng)常使用的是StackExchange.Redis和FreeRedis整體來(lái)說(shuō)效果還是不錯(cuò)的。總結(jié)一下我們文章的主要內(nèi)容
- 首先我們講解了
Redis命令的格式 - 其次我們講解了
Redis協(xié)議(RESP)的主要格式以及如何解析 - 然后我們基于上面的理論簡(jiǎn)單的封裝了一個(gè)
RedisClient類來(lái)演示相關(guān)概念 - 最后我們通過(guò)幾個(gè)示例和我用過(guò)的兩個(gè)
lua來(lái)簡(jiǎn)單的演示RedisClient類的使用
????作為新時(shí)代的職場(chǎng)人,我樂(lè)在探究自己感興趣的領(lǐng)域,對(duì)未知的事物充滿好奇,并渴望深入了解。對(duì)于常用的核心技術(shù),我不僅要求自己能夠熟練運(yùn)用,更追求深入理解其實(shí)現(xiàn)原理。面對(duì)新的技術(shù)趨勢(shì),我決不會(huì)視而不見,而是在熟悉更多常用技術(shù)棧的同時(shí),努力深入掌握一些重要的知識(shí)。我堅(jiān)信,學(xué)無(wú)止境,每一步的進(jìn)步都帶來(lái)無(wú)比的喜悅與成就感。
總結(jié)
以上是生活随笔為你收集整理的基于C# Socket实现的简单的Redis客户端的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Service Mesh:微服务架构的救
- 下一篇: 如何通过 wireshark 捕获 C#