redis stream持久化_Beetlex.Redis之Stream功能详解
原標(biāo)題:Beetlex.Redis之Stream功能詳解
有一段時間沒有寫文章,techempower的測試規(guī)則評分竟然發(fā)生了變化,只能忘著補(bǔ)充一下占比權(quán)重最多的數(shù)據(jù)更新示例了和深入設(shè)計(jì)一下組件模塊化加載的設(shè)計(jì)。但在不久前有用戶問了一下組件是否支持redis的Stream功能,看了一樣相關(guān)資料后把功能實(shí)現(xiàn)之;接下來就介紹一下如何用Beetlex.Redis來調(diào)用redis的Stream功能。
什么是Stream
是Redis5.0的Stream是一個新的強(qiáng)大的支持多播的可持久化的消息隊(duì)列,它提供了消息添加,多組和多消費(fèi)者一致性讀取和ack確認(rèn)等功能;更詳細(xì)的介紹就不多說了可以通過網(wǎng)絡(luò)找到更多詳細(xì)描述。
創(chuàng)建Stream
組件通過RedisDB對象的GetStream訪求來創(chuàng)建一個Stream訪問對象,對象創(chuàng)建后就可以進(jìn)行一系列的 XACK| XADD| XDEL| XGROUP| XLEN| XRANGE| XREAD| XREADGROUP| XREVRANGE|XTRIM等指令操作。創(chuàng)建代碼如下:
RedisStream < Employee>stream = DB.GetStream < Employee>("employees_stream");
XADD
在介紹這個操作前先說一下Stream里存儲的格式,默認(rèn)Stream消息是K-V的格式,從基礎(chǔ)指令上可以了解到這種結(jié)構(gòu)
XADDmystream* sensor-id1234 temperature19 .8
但這種格式操作起來并不友好,所以組件除了支持這種K-V的方式外,還支持以對象的方式進(jìn)行Stream消息處理。接下來看一下插入對象的調(diào)用
RedisStream stream = DB.GetStream( "employees_stream");
varid = awaitstream.Add(DataHelper.Defalut.Employees[ 0]);
id = awaitstream.Add(DataHelper.Defalut.Employees[ 1]);
id = awaitstream.Add(DataHelper.Defalut.Employees[ 2]);
varlen = awaitstream.Len;
組件支持直接入插對象,其基礎(chǔ)指令就是
XADDemployees_stream* dateemployeejson
組件直接采用一個K-V的方式來存儲對象,對于原則多個K-V的方式組件同樣也支持,只是在構(gòu)建Stream指定類型用Dictionary即可;接下其他就不多說了直接上指令用例了。
XLEN
RedisStream stream = DB.GetStream( "employees_stream");
varlen = awaitstream.Len;
XDEL
RedisStream stream = DB.GetStream( "employees_stream");
varitems = awaitstream.Read( null, null, "0-0");
awaitstream.Del(( fromitem initems selectitem.ID).ToArray);
XRANGE
RedisStream stream = DB.GetStream( "employees_stream");
varitems = awaitstream.Range;
items = awaitstream.RangeAll;
XREVRANGE
RedisStream stream = DB.GetStream( "employees_stream");
varitems = awaitstream.RevRange;
items = awaitstream.RevRangeAll;
XREAD
RedisStream stream = DB.GetStream( "employees_stream");
varitems = awaitstream.Read( 0, null, "0-0");
items = awaitstream.Read;
Stream的消費(fèi)組
前面介紹的指令感覺列表結(jié)構(gòu)都能滿足,其實(shí)Stream重要的功能是在組消費(fèi)這一塊,Redis可以針對Stream創(chuàng)建多個消費(fèi)組和消費(fèi)者,而消息會做一致性消費(fèi)處理。
XGROUP
RedisStream stream = DB.GetStream( "employees_stream");
vargroup= awaitstream.GetGroup( "henry");
XREAD
RedisStream stream = DB.GetStream( "employees_stream");
vargroup= awaitstream.GetGroup( "g1");
varitems = awaitgroup.Read( "henry", "0");
實(shí)際XRead提供了是否等待和起始讀已取參數(shù)
publicasyncValueTask>> ReadWait( stringconsumer, inttimeout= 0
publicValueTask>> Read( stringconsumer, stringstart = null)
publicasyncValueTask>> Read( stringconsumer, int? block, int? count, stringstart = null)
一般情況下可以通過readwait來不停地消息新的消息
while( true)
{
items = awaitgroup.ReadWait( "henry");
//處理消息
foreach( varitem initems)
{
awaititem.Ack;
}
}
XACK
RedisStream stream = DB.GetStream( "employees_stream");
vargroup= awaitstream.GetGroup( "g1");
varitems = awaitgroup.Read( "henry", "0");
foreach( varitem initems)
awaititem.Ack;
以上是BeetleX.Redis組件提供操作Stream的基礎(chǔ)指令,實(shí)際上Stream還有一些和運(yùn)維相關(guān)的指令,只是這些在實(shí)際業(yè)務(wù)上用不上所以就沒有去實(shí)現(xiàn)了。 返回搜狐,查看更多
責(zé)任編輯:
總結(jié)
以上是生活随笔為你收集整理的redis stream持久化_Beetlex.Redis之Stream功能详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: cdn贝免费套餐_阿里云香港服务器带宽太
- 下一篇: 搜索重复代码_通过MappedByteB