日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

redis stream持久化_Beetlex.Redis之Stream功能详解

發(fā)布時間:2025/4/17 数据库 53 豆豆
生活随笔 收集整理的這篇文章主要介紹了 redis stream持久化_Beetlex.Redis之Stream功能详解 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

原標(biāo)題:Beetlex.Redis之Stream功能詳解

有一段時間沒有寫文章,techempower的測試規(guī)則評分竟然發(fā)生了變化,只能忘著補(bǔ)充一下占比權(quán)重最多的數(shù)據(jù)更新示例了和深入設(shè)計(jì)一下組件模塊化加載的設(shè)計(jì)。但在不久前有用戶問了一下組件是否支持redis的Stream功能,看了一樣相關(guān)資料后把功能實(shí)現(xiàn)之;接下來就介紹一下如何用Beetlex.Redis來調(diào)用redis的Stream功能。

什么是Stream

是Redis5.0的Stream是一個新的強(qiáng)大的支持多播的可持久化的消息隊(duì)列,它提供了消息添加,多組和多消費(fèi)者一致性讀取和ack確認(rèn)等功能;更詳細(xì)的介紹就不多說了可以通過網(wǎng)絡(luò)找到更多詳細(xì)描述。

創(chuàng)建Stream

組件通過RedisDB對象的GetStream訪求來創(chuàng)建一個Stream訪問對象,對象創(chuàng)建后就可以進(jìn)行一系列的 XACK| XADD| XDEL| XGROUP| XLEN| XRANGE| XREAD| XREADGROUP| XREVRANGE|XTRIM等指令操作。創(chuàng)建代碼如下:

RedisStream < Employee>stream = DB.GetStream < Employee>("employees_stream");

XADD

在介紹這個操作前先說一下Stream里存儲的格式,默認(rèn)Stream消息是K-V的格式,從基礎(chǔ)指令上可以了解到這種結(jié)構(gòu)

XADDmystream* sensor-id1234 temperature19 .8

但這種格式操作起來并不友好,所以組件除了支持這種K-V的方式外,還支持以對象的方式進(jìn)行Stream消息處理。接下來看一下插入對象的調(diào)用

RedisStream stream = DB.GetStream( "employees_stream");

varid = awaitstream.Add(DataHelper.Defalut.Employees[ 0]);

id = awaitstream.Add(DataHelper.Defalut.Employees[ 1]);

id = awaitstream.Add(DataHelper.Defalut.Employees[ 2]);

varlen = awaitstream.Len;

組件支持直接入插對象,其基礎(chǔ)指令就是

XADDemployees_stream* dateemployeejson

組件直接采用一個K-V的方式來存儲對象,對于原則多個K-V的方式組件同樣也支持,只是在構(gòu)建Stream指定類型用Dictionary即可;接下其他就不多說了直接上指令用例了。

XLEN

RedisStream stream = DB.GetStream( "employees_stream");

varlen = awaitstream.Len;

XDEL

RedisStream stream = DB.GetStream( "employees_stream");

varitems = awaitstream.Read( null, null, "0-0");

awaitstream.Del(( fromitem initems selectitem.ID).ToArray);

XRANGE

RedisStream stream = DB.GetStream( "employees_stream");

varitems = awaitstream.Range;

items = awaitstream.RangeAll;

XREVRANGE

RedisStream stream = DB.GetStream( "employees_stream");

varitems = awaitstream.RevRange;

items = awaitstream.RevRangeAll;

XREAD

RedisStream stream = DB.GetStream( "employees_stream");

varitems = awaitstream.Read( 0, null, "0-0");

items = awaitstream.Read;

Stream的消費(fèi)組

前面介紹的指令感覺列表結(jié)構(gòu)都能滿足,其實(shí)Stream重要的功能是在組消費(fèi)這一塊,Redis可以針對Stream創(chuàng)建多個消費(fèi)組和消費(fèi)者,而消息會做一致性消費(fèi)處理。

XGROUP

RedisStream stream = DB.GetStream( "employees_stream");

vargroup= awaitstream.GetGroup( "henry");

XREAD

RedisStream stream = DB.GetStream( "employees_stream");

vargroup= awaitstream.GetGroup( "g1");

varitems = awaitgroup.Read( "henry", "0");

實(shí)際XRead提供了是否等待和起始讀已取參數(shù)

publicasyncValueTask>> ReadWait( stringconsumer, inttimeout= 0

publicValueTask>> Read( stringconsumer, stringstart = null)

publicasyncValueTask>> Read( stringconsumer, int? block, int? count, stringstart = null)

一般情況下可以通過readwait來不停地消息新的消息

while( true)

{

items = awaitgroup.ReadWait( "henry");

//處理消息

foreach( varitem initems)

{

awaititem.Ack;

}

}

XACK

RedisStream stream = DB.GetStream( "employees_stream");

vargroup= awaitstream.GetGroup( "g1");

varitems = awaitgroup.Read( "henry", "0");

foreach( varitem initems)

awaititem.Ack;

以上是BeetleX.Redis組件提供操作Stream的基礎(chǔ)指令,實(shí)際上Stream還有一些和運(yùn)維相關(guān)的指令,只是這些在實(shí)際業(yè)務(wù)上用不上所以就沒有去實(shí)現(xiàn)了。 返回搜狐,查看更多

責(zé)任編輯:

總結(jié)

以上是生活随笔為你收集整理的redis stream持久化_Beetlex.Redis之Stream功能详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。