日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Orleans解决并发之痛(四):Streams

發(fā)布時(shí)間:2023/12/4 编程问答 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Orleans解决并发之痛(四):Streams 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Orleans 提供了 Stream擴(kuò)展編程模型。此模型提供了一套API,使處理流更簡(jiǎn)單和更健壯。Stream默認(rèn)提供了兩種Provider,不同的流類型可能使用不同的Provider來(lái)處理,Simple Message Stream Provider 和 Azure Queue Stream Provider。Stream Providers兼容現(xiàn)有的隊(duì)列技術(shù),比如: Event Hubs、ServiceBus、Azure Queues、Apache Kafka,不再需要編寫額外的代碼來(lái)配合這些隊(duì)列技術(shù)的使用。

關(guān)于為什么Orleans會(huì)提供Stream擴(kuò)展編程模型?

當(dāng)今已經(jīng)有一系列技術(shù)可以來(lái)構(gòu)建一個(gè)流處理系統(tǒng)。包括持久存儲(chǔ)流數(shù)據(jù)方面,如:Event Hubs、Kafka;數(shù)據(jù)流計(jì)算操作方面,如: Azure Stream Analytics、Apache Storm、Apache Spark Streaming, 而這些技術(shù)并不適合細(xì)粒度的自由格式的流數(shù)據(jù)計(jì)算, 或者支持的并不好,因?yàn)閷?shí)際情況下可能需要對(duì)不同的數(shù)據(jù)流執(zhí)行不同的操作,Orleans Streams目的就是解決這類問(wèn)題,Stream編程模型和發(fā)布訂閱模式挺相似。

上述提到的一些技術(shù)我并沒(méi)有詳細(xì)學(xué)習(xí),后面會(huì)了解并對(duì)比,如果已熟悉的可以先思考并給我普及普及。

Orleans Stream大概實(shí)現(xiàn)的步驟如下:

  • 獲取 StreamProvider

  • 獲取 IAsyncStream<T>

  • 訂閱者訂閱一個(gè)Stream

  • 發(fā)布者向某個(gè)Stream發(fā)布消息

  • Silo配置文件OrleansConfiguration.xml修改

    在Globals節(jié)點(diǎn)中添加:

    <StorageProviders><Provider Type="Orleans.Storage.MemoryStorage" Name="PubSubStore" /> </StorageProviders> <StreamProviders><Provider Type="Orleans.Providers.Streams.SimpleMessageStream.SimpleMessageStreamProvider" Name="SMSProvider"/> </StreamProviders>

    Name為PubSubStore的StorageProvider是必須的,Stream內(nèi)部需要它來(lái)跟蹤所有流訂閱,記錄各個(gè)流的發(fā)布者和訂閱者的關(guān)系,本例中使用MemoryStorage,實(shí)際生產(chǎn)環(huán)境這是不對(duì)的。

    Name為SMSProvider的StreamProvider指定了消息的發(fā)布形式,Orleans當(dāng)前提供的兩種StreamProvider:Simple Message Stream ProviderAzure Queue Stream Provider 都是可靠的。

    Simple Message Stream Provider:不保證可靠的交付,失敗的消息不會(huì)自動(dòng)重新發(fā)送,但可以根據(jù)返回的Task狀態(tài)來(lái)判斷是否重新發(fā)送,事件執(zhí)行順序遵循FIFO原則。

    Azure Queue Stream Provider:事件被加入Azure Queue, 如果傳送或處理失敗,事件不會(huì)從隊(duì)列中刪除,并且稍后會(huì)自動(dòng)重新被發(fā)送,因此事件執(zhí)行順序不遵循FIFO原則。

    獲取 StreamProvider

    var streamProvider = this.GetStreamProvider("SMSProvider");

    SMSProvider 對(duì)應(yīng)配置文件中Name為SMSProvider的StreamProvider

    獲取 IAsyncStream<T>

    var streamId = this.GetPrimaryKey(); var stream = streamProvider.GetStream<string>(streamId, "GrainStream");

    GetStream 需要兩個(gè)參數(shù),通過(guò)兩個(gè)值定位唯一的Stream:
    streamId:Guid類型,stream標(biāo)識(shí)
    streamNamespace:字符串,stream的命名空間

    訂閱一個(gè)Stream

    訂閱Stream分為隱式和顯式訂閱。

    隱式訂閱

    隱式訂閱的訂閱者是唯一的,不存在對(duì)一個(gè)Stream的多次訂閱,也不能取消訂閱。

    Interface:

    public interface IImplicitSubscriberGrain : IGrainWithGuidKey { }

    Grain:

    [ImplicitStreamSubscription("GrainImplicitStream")] public class ImplicitSubscriberGrain : Grain, IImplicitSubscriberGrain, IAsyncObserver<string> {protected StreamSubscriptionHandle<string> streamHandle;public override async Task OnActivateAsync(){var streamId = this.GetPrimaryKey();var streamProvider = this.GetStreamProvider("SMSProvider");var stream = streamProvider.GetStream<string>(streamId, "GrainImplicitStream");streamHandle = await stream.SubscribeAsync(OnNextAsync);}public override async Task OnDeactivateAsync(){if (streamHandle != null)await streamHandle.UnsubscribeAsync();}public Task OnCompletedAsync(){return Task.CompletedTask;}public Task OnErrorAsync(Exception ex){return Task.CompletedTask;}public Task OnNextAsync(string item, StreamSequenceToken token = null){Console.WriteLine($"Received message:{item}");return Task.CompletedTask;} }
  • 在Grain上標(biāo)記 ImplicitStreamSubscription 屬性,變量值為命名空間;

  • 在Grain的OnActivateAsync方法體中調(diào)用SubscribeAsync;

  • 實(shí)現(xiàn)IAsyncObserver接口,當(dāng)發(fā)布者向Stream發(fā)送消息,訂閱者接到消息后將執(zhí)行OnNextAsync;

  • 隱式訂閱模式訂閱者自動(dòng)由發(fā)布者創(chuàng)建;

  • 顯式訂閱

    Interface:

    public interface IExplicitSubscriberGrain : IGrainWithGuidKey {Task<StreamSubscriptionHandle<string>> SubscribeAsync();Task ReceivedMessageAsync(string data); }

    Grain:

    public class ExplicitSubscriberGrain : Grain, IExplicitSubscriberGrain {private IAsyncStream<string> stream;public async override Task OnActivateAsync(){var streamProvider = this.GetStreamProvider("SMSProvider");stream = streamProvider.GetStream<string>(this.GetPrimaryKey(), "GrainExplicitStream");var subscriptionHandles = await stream.GetAllSubscriptionHandles();if (subscriptionHandles.Count > 0){subscriptionHandles.ToList().ForEach(async x =>{await x.ResumeAsync((payload, token) => this.ReceivedMessageAsync(payload));});}}public async Task<StreamSubscriptionHandle<string>> SubscribeAsync(){return await stream.SubscribeAsync((payload, token) => this.ReceivedMessageAsync(payload));}public Task ReceivedMessageAsync(string data){Console.WriteLine($"Received message:{data}");return Task.CompletedTask;} }
  • 訂閱者通過(guò)調(diào)用SubscribeAsync方法完成訂閱,并返回StreamSubscriptionHandle,這個(gè)對(duì)象提供了UnsubscribeAsync方法,方便取消訂閱;

  • 本例子中支持對(duì)同一個(gè)Stream被訂閱多次,被訂閱多次的結(jié)果是當(dāng)向這個(gè)Stream發(fā)送消息的時(shí)候,ReceivedMessageAsync會(huì)執(zhí)行多次。如果不希望對(duì)同一個(gè)Stream定義多次,在SubscribeAsync方法中可以通過(guò)GetAllSubscriptionHandles獲取當(dāng)前訂閱者的個(gè)數(shù),只有為0才執(zhí)行訂閱;

  • 訂閱者是一直存在的,除了被顯示調(diào)用了UnsubscribeAsync方法。在OnActivateAsync中我們加入了ResumeAsync操作, 當(dāng)Grain由未激活狀態(tài)變?yōu)榧せ顮顟B(tài)的時(shí)候,通過(guò)GetAllSubscriptionHandles獲取這個(gè)Stream中存在的訂閱者,通過(guò)ResumeAsync可以把它們重新喚醒。(模擬方式:殺掉Silo,重新啟動(dòng)即可,不過(guò)前提條件是PubSubStore不能使用MemoryStorage,因?yàn)槭褂肕emoryStorage存儲(chǔ)一旦重啟后訂閱者和發(fā)布者的關(guān)系都會(huì)丟失)

  • 發(fā)布消息

    Interface:

    public interface IPublisherGrain: IGrainWithGuidKey {Task PublishMessageAsync(string data); }

    Grain:

    public class PublisherGrain : Grain, IPublisherGrain {private IAsyncStream<string> stream;public override Task OnActivateAsync(){var streamId = this.GetPrimaryKey();var streamProvider = this.GetStreamProvider("SMSProvider");this.stream = streamProvider.GetStream<string>(streamId, "GrainExplicitStream"); //隱式:GrainImplicitStreamreturn base.OnActivateAsync();}public async Task PublishMessageAsync(string data){Console.WriteLine($"Sending data: {data}");await this.stream.OnNextAsync(data);} }

    通過(guò)調(diào)用IAsyncStream的OnNextAsync發(fā)布消息即可。這里可以針對(duì)返回的Task狀態(tài)再作一些操作,如果不成功,重新發(fā)送或記錄日志等。

    Client發(fā)布消息:

    客戶端發(fā)布消息:
    while (true) {Console.WriteLine("Press 'exit' to exit...");var input = Console.ReadLine();if (input == "exit") break;var publisherGrain = GrainClient.GrainFactory.GetGrain<IPublisherGrain>(Guid.Empty);publisherGrain.PublishMessageAsync(input); }


    發(fā)布消息

    顯示訂閱下,需要增加另一個(gè)客戶端先完成訂閱:
    var subscriberGrain = GrainClient.GrainFactory.GetGrain<IExplicitSubscriberGrain>(Guid.Empty); var streamHandle = subscriberGrain.SubscribeAsync().Result; Console.WriteLine("Press enter to exit..."); Console.ReadLine(); streamHandle.UnsubscribeAsync();


    顯示訂閱下發(fā)布消息

    參考鏈接:

    • Actor模型

    • Orleans

    • 案例Demo-OrleansStreams

    相關(guān)文章:?

    • .NET的Actor模型:Orleans

    • 微軟分布式云計(jì)算框架Orleans(1):Hello World

    • 微軟分布式云計(jì)算框架Orleans(2):容災(zāi)與集群(1)

    • Aaron Stannard談Akka.NET 1.1

    • 使用Akka.net開(kāi)發(fā)第一個(gè)分布式應(yīng)用

    • Orleans入門例子

    • Orleans例子再進(jìn)一步

    • Orleans稍微復(fù)雜的例子—互動(dòng)

    • Orleans簡(jiǎn)單配置

    • Orleans配置---持久化

    • Orleans—一些概念

    • Orleans的集群構(gòu)建

    • Oleans集群之Consul再解釋

    • Orleans解決并發(fā)之痛(一):單線程

    • Orleans解決并發(fā)之痛(二):Grain狀態(tài)

    • Orleans解決并發(fā)之痛(三):集群

    原文地址:http://www.jianshu.com/p/5f150b5a77e0


    .NET社區(qū)新聞,深度好文,微信中搜索dotNET跨平臺(tái)或掃描二維碼關(guān)注

    總結(jié)

    以上是生活随笔為你收集整理的Orleans解决并发之痛(四):Streams的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。