日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【NServiceBus】什么是Saga,Saga能做什么

發(fā)布時間:2023/12/4 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【NServiceBus】什么是Saga,Saga能做什么 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

前言

? ? ? ? ? Saga單詞翻譯過來是指尤指古代挪威或冰島講述冒險經(jīng)歷和英雄業(yè)績的長篇故事,對,這里強調(diào)長篇故事。許多系統(tǒng)都存在長時間運行的業(yè)務流程,NServiceBus使用基于事件驅(qū)動的體系結(jié)構(gòu)將容錯性和可伸縮性融入這些業(yè)務處理過程中。
? ? ? ? ? 當然一個單一接口調(diào)用則算不上一個長時間運行的業(yè)務場景,那么如果在給定的用例中有兩個或多個調(diào)用,則應該考慮數(shù)據(jù)一致性的問題,這里有可能第一個接口調(diào)用成功,第二次調(diào)用則可能失敗或者超時,Saga的設計以簡單而健壯的方式處理這樣的業(yè)務用例。

認識Saga

? ? ? ? ?先來通過一段代碼簡單認識一下Saga,在NServiceBus里,使用Saga的話則需要實現(xiàn)抽象類Saga,SqlSaga,這里的T的是Saga業(yè)務實體,封裝數(shù)據(jù),用來在長時間運行過程中封裝業(yè)務數(shù)據(jù)。

public class Saga:Saga<State>,
IAmStartedByMessages<StartOrder>,
IHandleMessages<CompleteOrder>
{
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<State> mapper)
{
mapper.ConfigureMapping<StartOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
mapper.ConfigureMapping<CompleteOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
}

public Task Handle(StartOrder message, IMessageHandlerContext context)
{
return Task.CompletedTask;
}

public Task Handle(CompleteOrder message, IMessageHandlerContext context)
{
MarkAsComplete();
return Task.CompletedTask;
}
}

臨時狀態(tài)

? ? ?長時間運行則意味著有狀態(tài),任何涉及多個網(wǎng)絡調(diào)用的進程都需要一個臨時狀態(tài),這個臨時狀態(tài)可以存儲在內(nèi)存中,序列化在磁盤中,也可以存儲在分布式緩存中。在NServiceBus中我們定義實體,繼承抽象類ContainSagaData即可,默認情況下,所有公開訪問的屬性都會被持久化。

public class State:ContainSagaData
{
public Guid OrderId { get; set; }
}

添加行為

? ? ? 在NServiceBus里,處理消息的有兩種接口:IHandlerMessages、IAmStartedByMessages。

開啟一個Saga

? ? ? ?在前面的代碼片段里,我們看到已經(jīng)實現(xiàn)了接口IAmStartedByMessages,這個接口告訴NServiceBus,如果收到了StartOrder 消息,則創(chuàng)建一個Saga實例(Saga Instance),當然Saga長流程處理的實體至少有一個需要開啟Saga流程。

處理無序消息

? ? ? ?如果你的業(yè)務用例中確實存在無序消息的情況,則還需要業(yè)務流程正常輪轉(zhuǎn),那么則需要多個messaeg都要事先接口IAmStartedByMessages接口,也就是說多個message都可以創(chuàng)建Saga實例。

依賴可恢復性

? ? ? 在處理無序消息和多個消息類型的時候,就存在消息丟失的可能,必須在你的Saga狀態(tài)完成以后,這個Saga實例又收到一條消息,但這時Saga狀態(tài)已經(jīng)是完結(jié)狀態(tài),這條消息則仍然需要處理,這里則實現(xiàn)NServiceBus的IHandleSagaNotFound接口。

public class SagaNotFoundHandler:IHandleSagaNotFound
{
public Task Handle(object message, IMessageProcessingContext context)
{
return context.Reply(new SagaNotFoundMessage());
}
}

public class SagaNotFoundMessage
{

}

結(jié)束Saga

? ? ? 當你的業(yè)務用例不再需要Saga實例時,則調(diào)用MarkComplete()來結(jié)束Saga實例。這個方法在前面的代碼片段中也可以看到,其實本質(zhì)也就是設置Saga.Complete屬性,這是個bool值,你在業(yè)務用例中也可以用此值來判斷Saga流程是否結(jié)束。

namespace NServiceBus
{
using System;
using System.Threading.Tasks;
using Extensibility;

public abstract class Saga
{
/// <summary>
/// The saga's typed data.
/// </summary>
public IContainSagaData Entity { get; set; }


public bool Completed { get; private set; }

internal protected abstract void ConfigureHowToFindSaga(IConfigureHowToFindSagaWithMessage sagaMessageFindingConfiguration);


protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, DateTime at) where TTimeoutMessageType : new()
{
return RequestTimeout(context, at, new TTimeoutMessageType());
}


protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, DateTime at, TTimeoutMessageType timeoutMessage)
{
if (at.Kind == DateTimeKind.Unspecified)
{
throw new InvalidOperationException("Kind property of DateTime 'at' must be specified.");
}

VerifySagaCanHandleTimeout(timeoutMessage);

var options = new SendOptions();

options.DoNotDeliverBefore(at);
options.RouteToThisEndpoint();

SetTimeoutHeaders(options);

return context.Send(timeoutMessage, options);
}


protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within) where TTimeoutMessageType : new()
{
return RequestTimeout(context, within, new TTimeoutMessageType());
}


protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage)
{
VerifySagaCanHandleTimeout(timeoutMessage);

var sendOptions = new SendOptions();

sendOptions.DelayDeliveryWith(within);
sendOptions.RouteToThisEndpoint();

SetTimeoutHeaders(sendOptions);

return context.Send(timeoutMessage, sendOptions);
}


protected Task ReplyToOriginator(IMessageHandlerContext context, object message)
{
if (string.IsNullOrEmpty(Entity.Originator))
{
throw new Exception("Entity.Originator cannot be null. Perhaps the sender is a SendOnly endpoint.");
}

var options = new ReplyOptions();

options.SetDestination(Entity.Originator);
context.Extensions.Set(new AttachCorrelationIdBehavior.State { CustomCorrelationId = Entity.OriginalMessageId });


options.Context.Set(new PopulateAutoCorrelationHeadersForRepliesBehavior.State
{
SagaTypeToUse = null,
SagaIdToUse = null
});

return context.Reply(message, options);
}

//這個方法結(jié)束saga流程,標記Completed屬性
protected void MarkAsComplete()
{
Completed = true;
}

void VerifySagaCanHandleTimeout<TTimeoutMessageType>(TTimeoutMessageType timeoutMessage)
{
var canHandleTimeoutMessage = this is IHandleTimeouts<TTimeoutMessageType>;
if (!canHandleTimeoutMessage)
{
var message = $"The type '{GetType().Name}' cannot request timeouts for '{timeoutMessage}' because it does not implement 'IHandleTimeouts<{typeof(TTimeoutMessageType).FullName}>'";
throw new Exception(message);
}
}

void SetTimeoutHeaders(ExtendableOptions options)
{
options.SetHeader(Headers.SagaId, Entity.Id.ToString());
options.SetHeader(Headers.IsSagaTimeoutMessage, bool.TrueString);
options.SetHeader(Headers.SagaType, GetType().AssemblyQualifiedName);
}
}
}

? ??

Saga持久化

? ? ? 本機開發(fā)環(huán)境我們使用LearningPersistence,但是投產(chǎn)的話則需要使用數(shù)據(jù)庫持久化,這里我們基于MySQL,SQL持久化需要引入NServiceBus.Persistence.Sql。SQL Persistence會生成幾種關(guān)系型數(shù)據(jù)庫的sql scripts,然后會根據(jù)你的斷言配置選擇所需數(shù)據(jù)庫,比如SQL Server、MySQL、PostgreSQL、Oracle。
? ? ?持久化Saga自動創(chuàng)建所需表結(jié)構(gòu),你只需手動配置即可,配置后編譯成功后項目執(zhí)行目錄下會生成sql腳本,文件夾名稱是NServiceBus.Persistence.Sql,下面會有Saga子目錄。


/* TableNameVariable */

set @tableNameQuoted = concat('`', @tablePrefix, 'Saga`');
set @tableNameNonQuoted = concat(@tablePrefix, 'Saga');


/* Initialize */

drop procedure if exists sqlpersistence_raiseerror;
create procedure sqlpersistence_raiseerror(message varchar(256))
begin
signal sqlstate
'ERROR'
set
message_text = message,
mysql_errno = '45000';
end;

/* CreateTable */

set @createTable = concat('
create table if not exists ', @tableNameQuoted, '(
Id varchar(38) not null,
Metadata json not null,
Data json not null,
PersistenceVersion varchar(23) not null,
SagaTypeVersion varchar(23) not null,
Concurrency int not null,
primary key (Id)
) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;

/* AddProperty OrderId */

select count(*)
into @exist
from information_schema.columns
where table_schema = database() and
column_name = 'Correlation_OrderId' and
table_name = @tableNameNonQuoted;

set @query = IF(
@exist <= 0,
concat('alter table ', @tableNameQuoted, ' add column Correlation_OrderId varchar(38) character set ascii'), 'select \'Column Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* VerifyColumnType Guid */

set @column_type_OrderId = (
select concat(column_type,' character set ', character_set_name)
from information_schema.columns
where
table_schema = database() and
table_name = @tableNameNonQuoted and
column_name = 'Correlation_OrderId'
);

set @query = IF(
@column_type_OrderId <> 'varchar(38) character set ascii',
'call sqlpersistence_raiseerror(concat(\'Incorrect data type for Correlation_OrderId. Expected varchar(38) character set ascii got \', @column_type_OrderId, \'.\'));',
'select \'Column Type OK\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* WriteCreateIndex OrderId */

select count(*)
into @exist
from information_schema.statistics
where
table_schema = database() and
index_name = 'Index_Correlation_OrderId' and
table_name = @tableNameNonQuoted;

set @query = IF(
@exist <= 0,
concat('create unique index Index_Correlation_OrderId on ', @tableNameQuoted, '(Correlation_OrderId)'), 'select \'Index Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* PurgeObsoleteIndex */

select concat('drop index ', index_name, ' on ', @tableNameQuoted, ';')
from information_schema.statistics
where
table_schema = database() and
table_name = @tableNameNonQuoted and
index_name like 'Index_Correlation_%' and
index_name <> 'Index_Correlation_OrderId' and
table_schema = database()
into @dropIndexQuery;
select if (
@dropIndexQuery is not null,
@dropIndexQuery,
'select ''no index to delete'';')
into @dropIndexQuery;

prepare script from @dropIndexQuery;
execute script;
deallocate prepare script;

/* PurgeObsoleteProperties */

select concat('alter table ', table_name, ' drop column ', column_name, ';')
from information_schema.columns
where
table_schema = database() and
table_name = @tableNameNonQuoted and
column_name like 'Correlation_%' and
column_name <> 'Correlation_OrderId'
into @dropPropertiesQuery;

select if (
@dropPropertiesQuery is not null,
@dropPropertiesQuery,
'select ''no property to delete'';')
into @dropPropertiesQuery;

prepare script from @dropPropertiesQuery;
execute script;
deallocate prepare script;

/* CompleteSagaScript */

生成的表結(jié)構(gòu):

持久化配置

? ? ? Saga持久化需要依賴NServiceBus.Persistence.Sql。引入后需要實現(xiàn)SqlSaga抽象類,抽象類需要重寫ConfigureMapping,配置Saga工作流程業(yè)務主鍵。

public class Saga:SqlSaga<State>,
IAmStartedByMessages<StartOrder>
{
protected override void ConfigureMapping(IMessagePropertyMapper mapper)
{
mapper.ConfigureMapping<StartOrder>(message=>message.OrderId);
}

protected override string CorrelationPropertyName => nameof(StartOrder.OrderId);

public Task Handle(StartOrder message, IMessageHandlerContext context)
{
Console.WriteLine($"Receive message with OrderId:{message.OrderId}");

MarkAsComplete();
return Task.CompletedTask;
}
}

static async Task MainAsync()
{
Console.Title = "Client-UI";

var configuration = new EndpointConfiguration("Client-UI");
//這個方法開啟自動建表、自動創(chuàng)建RabbitMQ隊列
configuration.EnableInstallers();
configuration.UseSerialization<NewtonsoftSerializer>();
configuration.UseTransport<LearningTransport>();

string connectionString = "server=127.0.0.1;uid=root;pwd=000000;database=nservicebus;port=3306;AllowUserVariables=True;AutoEnlist=false";
var persistence = configuration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MySql>();
//配置mysql連接串
persistence.ConnectionBuilder(()=>new MySqlConnection(connectionString));

var instance = await Endpoint.Start(configuration).ConfigureAwait(false);

var command = new StartOrder()
{
OrderId = Guid.NewGuid()
};

await instance.SendLocal(command).ConfigureAwait(false);

Console.ReadKey();

await instance.Stop().ConfigureAwait(false);
}

? ? ?

Saga Timeouts

? ? ?在消息驅(qū)動類型的環(huán)境中,雖然傳遞的無連接特性可以防止在線等待過程中消耗資源,但是畢竟等待時間需要有一個上線。在NServiceBus里已經(jīng)提供了Timeout方法,我們只需訂閱即可,可以在你的Handle方法中根據(jù)需要訂閱Timeout,可參考如下代碼:

public class Saga:Saga<State>,
IAmStartedByMessages<StartOrder>,
IHandleMessages<CompleteOrder>,
IHandleTimeouts<TimeOutMessage>
{

public Task Handle(StartOrder message, IMessageHandlerContext context)
{
var model=new TimeOutMessage();

//訂閱超時消息
return RequestTimeout(context,TimeSpan.FromMinutes(10));
}

public Task Handle(CompleteOrder message, IMessageHandlerContext context)
{
MarkAsComplete();
return Task.CompletedTask;
}

protected override string CorrelationPropertyName => nameof(StartOrder.OrderId);


public Task Timeout(TimeOutMessage state, IMessageHandlerContext context)
{
//處理超時消息
}

protected override void ConfigureHowToFindSaga(SagaPropertyMapper<State> mapper)
{
mapper.ConfigureMapping<StartOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
mapper.ConfigureMapping<CompleteOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
}
}//從Timeout的源碼看,這個方法是通過設置SendOptions,然后再把當前這個消息發(fā)送給自己來實現(xiàn)
protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage)
{
VerifySagaCanHandleTimeout(timeoutMessage);
var sendOptions = new SendOptions();
sendOptions.DelayDeliveryWith(within);
sendOptions.RouteToThisEndpoint();
SetTimeoutHeaders(sendOptions);

return context.Send(timeoutMessage, sendOptions);
}

總結(jié)

? ? ? ?NServiceBus因為是商業(yè)產(chǎn)品,對分布式消息系統(tǒng)所涉及到的東西都做了實現(xiàn),包括分布式事務(Outbox)、DTC都有,還有心跳檢測,監(jiān)控都有,全而大,目前我們用到的也只是NServiceBus里很小的一部分功能。

總結(jié)

以上是生活随笔為你收集整理的【NServiceBus】什么是Saga,Saga能做什么的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 97成人在线观看 | 天天干天天弄 | 日韩精品一区二区三区国语自制 | 国产欧美日韩另类 | 99久久精品免费看国产四区 | 久久激情影院 | 亚洲欧美中日韩 | 久久久888 | 91一区二区国产 | 久久av片| 国产精选一区二区 | 中文字幕人成乱码熟女香港 | 国产h在线 | 97影院| 青青草综合 | 在线播放成人av | 黄色av免费网站 | 超碰人人草人人干 | 日本成人社区 | 末路1997全集免费观看完整版 | 三级做爰在线观看视频 | 日韩精品在线免费 | 成年男女免费视频网站 | 男男车车的车车网站w98免费 | 在线观看国产免费视频 | 精品久久久久久亚洲 | 日韩精品人妻无码一本 | 精品人妻一区二区三区蜜桃 | 不卡视频在线观看免费 | 久久日韩| 国产精品亚洲一区二区三区在线观看 | 欧美乱妇狂野欧美视频 | 伊人青草 | jizz性欧美15| mm1313亚洲国产精品美女 | 红桃视频91 | 欧美精品久久久久久久多人混战 | 欧美日韩一区二区三区在线电影 | 国产一区二区三区精品视频 | 污污视频在线免费观看 | 91日韩欧美 | 欧美三级日本三级 | 日本免费在线一区 | 亚洲免费av网站 | 色乱码一区二区三区在线男奴 | 久草视频免费播放 | 中文字幕欧美视频 | 老湿机69福利区午夜x片 | www.-级毛片线天内射视视 | 欧美在线一区二区 | 国产精品mm| 国产女同91疯狂高潮互磨 | 免费av福利| 国产一区二区三区视频网站 | 巨胸大乳www视频免费观看 | 欧美极度另类 | 国产亚洲系列 | 色播五月综合 | www.欧美 | 国产女主播一区二区 | 春意影院福利社 | 久久人妖 | 国产艳情片 | 国产一区二区在线视频观看 | 波多野结衣乳巨码无在线 | 午夜三级在线观看 | 天堂资源在线观看 | 日韩国产在线一区 | 国产三级国产精品 | 欧美成人aa | 在线观看亚洲精品 | 欧美首页| 一级黄色片在线看 | 日本东京热一区二区 | jizz在线看 | 亚洲国产精品18久久久久久 | 成人国产精品一区二区 | 国产精品毛片一区视频播 | 高清av在线 | 日本xxxxxxxxx69| 九九激情视频 | 美女伦理水蜜桃4 | 亚洲一区二区影视 | 欧美成人精品一区二区男人小说 | 国产精品第56页 | 精品久久一区二区三区 | 一级黄色免费毛片 | av高潮| 最近更新中文字幕 | 成人一区二区三区在线 | 国产精品高潮呻吟久久aⅴ码 | 亚洲国产精品毛片av不卡在线 | 欧美videos另类极品 | 国产精品污污 | 吻胸摸激情床激烈视频大胸 | 久久三 | 日韩一级视频 | 精品麻豆 | 亚洲精品偷拍 |