Orleans解决并发之痛(四):Streams
Orleans 提供了 Stream擴(kuò)展編程模型。此模型提供了一套API,使處理流更簡(jiǎn)單和更健壯。Stream默認(rèn)提供了兩種Provider,不同的流類型可能使用不同的Provider來(lái)處理,Simple Message Stream Provider 和 Azure Queue Stream Provider。Stream Providers兼容現(xiàn)有的隊(duì)列技術(shù),比如: Event Hubs、ServiceBus、Azure Queues、Apache Kafka,不再需要編寫額外的代碼來(lái)配合這些隊(duì)列技術(shù)的使用。
關(guān)于為什么Orleans會(huì)提供Stream擴(kuò)展編程模型?
當(dāng)今已經(jīng)有一系列技術(shù)可以來(lái)構(gòu)建一個(gè)流處理系統(tǒng)。包括持久存儲(chǔ)流數(shù)據(jù)方面,如:Event Hubs、Kafka;數(shù)據(jù)流計(jì)算操作方面,如: Azure Stream Analytics、Apache Storm、Apache Spark Streaming, 而這些技術(shù)并不適合細(xì)粒度的自由格式的流數(shù)據(jù)計(jì)算, 或者支持的并不好,因?yàn)閷?shí)際情況下可能需要對(duì)不同的數(shù)據(jù)流執(zhí)行不同的操作,Orleans Streams目的就是解決這類問(wèn)題,Stream編程模型和發(fā)布訂閱模式挺相似。
上述提到的一些技術(shù)我并沒(méi)有詳細(xì)學(xué)習(xí),后面會(huì)了解并對(duì)比,如果已熟悉的可以先思考并給我普及普及。
Orleans Stream大概實(shí)現(xiàn)的步驟如下:
獲取 StreamProvider
獲取 IAsyncStream<T>
訂閱者訂閱一個(gè)Stream
發(fā)布者向某個(gè)Stream發(fā)布消息
Silo配置文件OrleansConfiguration.xml修改
在Globals節(jié)點(diǎn)中添加:
<StorageProviders><Provider Type="Orleans.Storage.MemoryStorage" Name="PubSubStore" /> </StorageProviders> <StreamProviders><Provider Type="Orleans.Providers.Streams.SimpleMessageStream.SimpleMessageStreamProvider" Name="SMSProvider"/> </StreamProviders>Name為PubSubStore的StorageProvider是必須的,Stream內(nèi)部需要它來(lái)跟蹤所有流訂閱,記錄各個(gè)流的發(fā)布者和訂閱者的關(guān)系,本例中使用MemoryStorage,實(shí)際生產(chǎn)環(huán)境這是不對(duì)的。
Name為SMSProvider的StreamProvider指定了消息的發(fā)布形式,Orleans當(dāng)前提供的兩種StreamProvider:Simple Message Stream Provider 和 Azure Queue Stream Provider 都是可靠的。
Simple Message Stream Provider:不保證可靠的交付,失敗的消息不會(huì)自動(dòng)重新發(fā)送,但可以根據(jù)返回的Task狀態(tài)來(lái)判斷是否重新發(fā)送,事件執(zhí)行順序遵循FIFO原則。
Azure Queue Stream Provider:事件被加入Azure Queue, 如果傳送或處理失敗,事件不會(huì)從隊(duì)列中刪除,并且稍后會(huì)自動(dòng)重新被發(fā)送,因此事件執(zhí)行順序不遵循FIFO原則。
獲取 StreamProvider
var streamProvider = this.GetStreamProvider("SMSProvider");SMSProvider 對(duì)應(yīng)配置文件中Name為SMSProvider的StreamProvider
獲取 IAsyncStream<T>
var streamId = this.GetPrimaryKey(); var stream = streamProvider.GetStream<string>(streamId, "GrainStream");GetStream 需要兩個(gè)參數(shù),通過(guò)兩個(gè)值定位唯一的Stream:
streamId:Guid類型,stream標(biāo)識(shí)
streamNamespace:字符串,stream的命名空間
訂閱一個(gè)Stream
訂閱Stream分為隱式和顯式訂閱。
隱式訂閱
隱式訂閱的訂閱者是唯一的,不存在對(duì)一個(gè)Stream的多次訂閱,也不能取消訂閱。
Interface:
public interface IImplicitSubscriberGrain : IGrainWithGuidKey { }Grain:
[ImplicitStreamSubscription("GrainImplicitStream")] public class ImplicitSubscriberGrain : Grain, IImplicitSubscriberGrain, IAsyncObserver<string> {protected StreamSubscriptionHandle<string> streamHandle;public override async Task OnActivateAsync(){var streamId = this.GetPrimaryKey();var streamProvider = this.GetStreamProvider("SMSProvider");var stream = streamProvider.GetStream<string>(streamId, "GrainImplicitStream");streamHandle = await stream.SubscribeAsync(OnNextAsync);}public override async Task OnDeactivateAsync(){if (streamHandle != null)await streamHandle.UnsubscribeAsync();}public Task OnCompletedAsync(){return Task.CompletedTask;}public Task OnErrorAsync(Exception ex){return Task.CompletedTask;}public Task OnNextAsync(string item, StreamSequenceToken token = null){Console.WriteLine($"Received message:{item}");return Task.CompletedTask;} }在Grain上標(biāo)記 ImplicitStreamSubscription 屬性,變量值為命名空間;
在Grain的OnActivateAsync方法體中調(diào)用SubscribeAsync;
實(shí)現(xiàn)IAsyncObserver接口,當(dāng)發(fā)布者向Stream發(fā)送消息,訂閱者接到消息后將執(zhí)行OnNextAsync;
隱式訂閱模式訂閱者自動(dòng)由發(fā)布者創(chuàng)建;
顯式訂閱
Interface:
public interface IExplicitSubscriberGrain : IGrainWithGuidKey {Task<StreamSubscriptionHandle<string>> SubscribeAsync();Task ReceivedMessageAsync(string data); }Grain:
public class ExplicitSubscriberGrain : Grain, IExplicitSubscriberGrain {private IAsyncStream<string> stream;public async override Task OnActivateAsync(){var streamProvider = this.GetStreamProvider("SMSProvider");stream = streamProvider.GetStream<string>(this.GetPrimaryKey(), "GrainExplicitStream");var subscriptionHandles = await stream.GetAllSubscriptionHandles();if (subscriptionHandles.Count > 0){subscriptionHandles.ToList().ForEach(async x =>{await x.ResumeAsync((payload, token) => this.ReceivedMessageAsync(payload));});}}public async Task<StreamSubscriptionHandle<string>> SubscribeAsync(){return await stream.SubscribeAsync((payload, token) => this.ReceivedMessageAsync(payload));}public Task ReceivedMessageAsync(string data){Console.WriteLine($"Received message:{data}");return Task.CompletedTask;} }訂閱者通過(guò)調(diào)用SubscribeAsync方法完成訂閱,并返回StreamSubscriptionHandle,這個(gè)對(duì)象提供了UnsubscribeAsync方法,方便取消訂閱;
本例子中支持對(duì)同一個(gè)Stream被訂閱多次,被訂閱多次的結(jié)果是當(dāng)向這個(gè)Stream發(fā)送消息的時(shí)候,ReceivedMessageAsync會(huì)執(zhí)行多次。如果不希望對(duì)同一個(gè)Stream定義多次,在SubscribeAsync方法中可以通過(guò)GetAllSubscriptionHandles獲取當(dāng)前訂閱者的個(gè)數(shù),只有為0才執(zhí)行訂閱;
訂閱者是一直存在的,除了被顯示調(diào)用了UnsubscribeAsync方法。在OnActivateAsync中我們加入了ResumeAsync操作, 當(dāng)Grain由未激活狀態(tài)變?yōu)榧せ顮顟B(tài)的時(shí)候,通過(guò)GetAllSubscriptionHandles獲取這個(gè)Stream中存在的訂閱者,通過(guò)ResumeAsync可以把它們重新喚醒。(模擬方式:殺掉Silo,重新啟動(dòng)即可,不過(guò)前提條件是PubSubStore不能使用MemoryStorage,因?yàn)槭褂肕emoryStorage存儲(chǔ)一旦重啟后訂閱者和發(fā)布者的關(guān)系都會(huì)丟失)
發(fā)布消息
Interface:
public interface IPublisherGrain: IGrainWithGuidKey {Task PublishMessageAsync(string data); }Grain:
public class PublisherGrain : Grain, IPublisherGrain {private IAsyncStream<string> stream;public override Task OnActivateAsync(){var streamId = this.GetPrimaryKey();var streamProvider = this.GetStreamProvider("SMSProvider");this.stream = streamProvider.GetStream<string>(streamId, "GrainExplicitStream"); //隱式:GrainImplicitStreamreturn base.OnActivateAsync();}public async Task PublishMessageAsync(string data){Console.WriteLine($"Sending data: {data}");await this.stream.OnNextAsync(data);} }通過(guò)調(diào)用IAsyncStream的OnNextAsync發(fā)布消息即可。這里可以針對(duì)返回的Task狀態(tài)再作一些操作,如果不成功,重新發(fā)送或記錄日志等。
Client發(fā)布消息:
客戶端發(fā)布消息:
while (true) {Console.WriteLine("Press 'exit' to exit...");var input = Console.ReadLine();if (input == "exit") break;var publisherGrain = GrainClient.GrainFactory.GetGrain<IPublisherGrain>(Guid.Empty);publisherGrain.PublishMessageAsync(input); }發(fā)布消息
顯示訂閱下,需要增加另一個(gè)客戶端先完成訂閱:
var subscriberGrain = GrainClient.GrainFactory.GetGrain<IExplicitSubscriberGrain>(Guid.Empty); var streamHandle = subscriberGrain.SubscribeAsync().Result; Console.WriteLine("Press enter to exit..."); Console.ReadLine(); streamHandle.UnsubscribeAsync();顯示訂閱下發(fā)布消息
參考鏈接:
Actor模型
Orleans
案例Demo-OrleansStreams
相關(guān)文章:?
.NET的Actor模型:Orleans
微軟分布式云計(jì)算框架Orleans(1):Hello World
微軟分布式云計(jì)算框架Orleans(2):容災(zāi)與集群(1)
Aaron Stannard談Akka.NET 1.1
使用Akka.net開(kāi)發(fā)第一個(gè)分布式應(yīng)用
Orleans入門例子
Orleans例子再進(jìn)一步
Orleans稍微復(fù)雜的例子—互動(dòng)
Orleans簡(jiǎn)單配置
Orleans配置---持久化
Orleans—一些概念
Orleans的集群構(gòu)建
Oleans集群之Consul再解釋
Orleans解決并發(fā)之痛(一):單線程
Orleans解決并發(fā)之痛(二):Grain狀態(tài)
Orleans解決并發(fā)之痛(三):集群
原文地址:http://www.jianshu.com/p/5f150b5a77e0
.NET社區(qū)新聞,深度好文,微信中搜索dotNET跨平臺(tái)或掃描二維碼關(guān)注
總結(jié)
以上是生活随笔為你收集整理的Orleans解决并发之痛(四):Streams的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 2017(深圳) .NET技术分享交流会
- 下一篇: .NET Core 2.0应用程序大小减