日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > asp.net >内容正文

asp.net

响应式编程知多少 | Rx.NET 了解下

發布時間:2023/12/4 asp.net 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 响应式编程知多少 | Rx.NET 了解下 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1. 引言

An API for asynchronous programming with observable streams. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.ReactiveX 使用可觀察數據流進行異步編程的API。 ReactiveX結合了觀察者模式、迭代器模式和函數式編程的精華。

關于Reactive(本文統一譯作響應式),有一個The Reactive Manifesto【響應式宣言】:響應式系統(Reactive System)具備以下特質:即時響應性(Responsive)、回彈性(Resilient)、彈性(Elastic)以及消息驅動(Message Driven)。

很顯然開發一個響應式系統,并不簡單。 那本文就來講一講如何基于Rx.NET進行響應式編程,進而開發更加靈活、松耦合、可伸縮的響應式系統。

2. 編程范式

在開始之前呢,我們有必要了解下幾種編程范式:命令式編程、聲明式編程、函數式編程和響應式編程。

命令式編程:命令式編程的主要思想是關注計算機執行的步驟,即一步一步告訴計算機先做什么再做什么。

  • //1. 聲明變量

  • List<int> results = new List<int>();

  • //2. 循環變量

  • foreach(var num in Enumerable.Range(1,10))

  • {

  • ? ?//3. 添加條件

  • ? ?if (num > 5)

  • ? ?{ ?

  • ? ? ? ?//4. 添加處理邏輯

  • ? ? ? ?results.Add(num);

  • ? ? ? ?Console.WriteLine(num);

  • ? ?}

  • }

  • 聲明式編程:聲明式編程是以數據結構的形式來表達程序執行的邏輯。它的主要思想是告訴計算機應該做什么,但不指定具體要怎么做。

  • var nums = from num in Enumerable.Range(1,10) where num > 5 select num

  • 函數式編程:主要思想是把運算過程盡量寫成一系列嵌套的函數調用。

  • Enumerable.Range(1, 10).Where(num => num > 5).ToList().ForEach(Console.WriteLine);

  • 響應式編程:響應式編程是一種面向數據流和變化傳播的編程范式,旨在簡化事件驅動應用的實現。響應式編程專注于如何創建依賴于變更的數據流并對變化做出響應。

  • IObservable<int> nums = Enumerable.Range(1, 10).ToObservable();


  • IDisposable subscription = nums.Where(num => num > 5).Subscribe(Console.WriteLine);


  • subscription.Dispose();

  • 3. Hello Rx.NET

    從一個簡單的Demo開始。 假設我們現在模擬電熱壺燒水,實時輸出當前水溫,一般我們會這樣做:

  • Enumerable.Range(1, 100).ToList().ForEach(Console.WriteLine);

  • // do something else. 阻塞

  • 假設當前程序是智能家居的中控設備,不僅控制電熱壺燒水,還控制其他設備,為了避免阻塞主線程。一般我們會創建一個Thread或Task去做。

  • Task.Run(() => Enumerable.Range(1, 100).ToList().ForEach(Console.WriteLine));

  • // do something else. 非阻塞

  • 假設現在我們不僅要在控制臺輸出而且還要實時通過揚聲器報警。這時我們應該想到委托和事件。

  • class Heater

  • {

  • ? ?private delegate void TemperatureChanged(int temperature);

  • ? ?private event TemperatureChanged TemperatureChangedEvent;

  • ? ?public void BoilWater()

  • ? ?{

  • ? ? ? ?TemperatureChangedEvent += ShowTemperature;

  • ? ? ? ?TemperatureChangedEvent += MakeAlerm;

  • ? ? ? ?Task.Run(

  • ? ? ? ? ? ?() =>

  • ? ? ? ?Enumerable.Range(1, 100).ToList().ForEach((temperature) => TemperatureChangedEvent(temperature))

  • ? ? ? ?);

  • ? ?}

  • ? ?private void ShowTemperature(int temperature)

  • ? ?{

  • ? ? ? ?Console.WriteLine($"當前溫度:{temperature}");

  • ? ?}

  • ? ?private void MakeAlerm(int temperature)

  • ? ?{

  • ? ? ? ?Console.WriteLine($"嘟嘟嘟,當前水溫{temperature}");

  • ? ?}

  • }

  • class Program

  • {

  • ? ?static void Main(string[] args)

  • ? ?{

  • ? ? ? ?Heater heater = new Heater(); ? ? ? ?

  • ? ? ? ?heater.BoilWater();

  • ? ?}

  • }

  • 瞬間代碼量就上去了。但是借助Rx.NET,我們可以簡化成以下代碼:

  • var observable = Enumerable.Range(1, 100).ToObservable(NewTheadScheduler.Default);//申明可觀察序列

  • Subject<int> subject = new Subject<int>();//申明Subject

  • subject.Subscribe((temperature) => Console.WriteLine($"當前溫度:{temperature}"));//訂閱subject

  • subject.Subscribe((temperature) => Console.WriteLine($"嘟嘟嘟,當前水溫:{temperature}"));//訂閱subject

  • observable.Subscribe(subject);//訂閱observable

  • 僅僅通過以下三步:

  • 調用?ToObservable將枚舉序列轉換為可觀察序列。

  • 通過指定?NewTheadScheduler.Default來指定在單獨的線程進行枚舉。

  • 調用?Subscribe方法進行事件注冊。

  • 借助?Subject進行多播傳輸

  • 通過以上我們可以看到Rx.NET大大簡化了事件處理的步驟,而這只是Rx的冰山一角。

    4. Rx.NET 核心

    Reactive Extensions(Rx)是一個為.NET應用提供響應式編程模型的庫,用來構建異步基于事件流的應用,通過安裝 System.ReactiveNuget包進行引用。Rx將事件流抽象為Observable sequences(可觀察序列)表示異步數據流,使用LINQ運算符查詢異步數據流,并使用 Scheduler來控制異步數據流中的并發性。簡單地說:Rx = Observables + LINQ + Schedulers。

    在軟件系統中,事件是一種消息用于指示發生了某些事情。事件由Event Source(事件源)引發并由Event Handler(事件處理程序)使用。 在Rx中,事件源可以由observable表示,事件處理程序可以由observer表示。 但是應用程序使用的數據如何表示呢,例如數據庫中的數據或從Web服務器獲取的數據。而在應用程序中我們一般處理的數據無外乎兩種:靜態數據和動態數據。 但無論使用何種類型的數據,其都可以作為流來觀察。換句話說,數據流本身也是可觀察的。也就意味著,我們也可以用observable來表示數據流。

    講到這里,Rx.NET的核心也就一目了然了:

  • 一切皆為數據流

  • Observable 是對數據流的抽象

  • Observer是對Observable的響應

  • 在Rx中,分別使用 IObservable<T>IObserver<T>接口來表示可觀察序列和觀察者。它們預置在system命名空間下,其定義如下:

  • public interface IObservable<out T>

  • {

  • ? ? ?//Notifies the provider that an observer is to receive notifications.

  • ? ? ?IDisposable Subscribe(IObserver<T> observer);

  • }


  • public interface IObserver<in T>

  • {

  • ? ?//Notifies the observer that the provider has finished sending push-based notifications.

  • ? ?void OnCompleted();


  • ? ?//Notifies the observer that the provider has experienced an error condition.

  • ? ?void OnError(Exception error);


  • ? ?//Provides the observer with new data.

  • ? ?void OnNext(T value);

  • }

  • 5. 創建IObservable

    創建 IObservable<T>主要有以下幾種方式:

    1. 直接實現 IObservable<T>接口

    2. 使用 Observable.Create創建

  • Observable.Create<int>(observer=>{

  • ? ?for (int i = 0; i < 5; i++)

  • ? ?{

  • ? ? ? ?observer.OnNext(i);

  • ? ?}

  • ? ?observer.OnCompleted();

  • ? ?return Disposable.Empty;

  • })

  • 3. 使用 Observable.Deffer進行延遲創建(當有觀察者訂閱時才創建)比如要連接數據庫進行查詢,如果沒有觀察者,那么數據庫連接會一直被占用,這樣會造成資源浪費。使用Deffer可以解決這個問題。

  • Observable.Defer(() =>

  • {

  • ? ?var connection = Connect(user, password);

  • ? ?return connection.ToObservable();

  • });

  • 4. 使用 Observable.Generate創建迭代類型的可觀察序列

  • IObservable<int> observable =

  • ? ?Observable.Generate(

  • ? ? ? ?0, ? ? ? ? ? ? ?//initial state

  • ? ? ? ?i => i < 10, ? ?//condition (false means terminate)

  • ? ? ? ?i => i + 1, ? ? //next iteration step

  • ? ? ? ?i => i * 2); ? ? ?//the value in each iteration

  • 5. 使用 Observable.Range創建指定區間的可觀察序列

  • IObservable<int> observable = Observable.Range (0, 10).Select (i => i * 2);

  • 6. 創建特殊用途的可觀察序列

  • Observable.Return ("Hello World");//創建單個元素的可觀察序列

  • Observable.Never<string> ();//創建一個空的永遠不會結束的可觀察序列

  • Observable.Throw<ApplicationException> (

  • new ApplicationException ("something bad happened"))//創建一個拋出指定異常的可觀察序列

  • Observable.Empty<string> ()//創建一個空的立即結束的可觀察序列

  • 7. 使用 ToObservable轉換 IEnumerate和Task類型

  • Enumerable.Range(1, 10).ToObservable();

  • IObservable<IEnumerable<string>> resultsA = searchEngineA.SearchAsync(term).ToObservable();

  • 8. 使用 Observable.FromEventPattern<T>Observable.FromEvent<TDelegate,TEventArgs>進行事件的轉換

  • public delegate void RoutedEventHandler(object sender,

  • System.Windows.RoutedEventArgs e)

  • IObservable<EventPattern<RoutedEventArgs>> clicks =

  • ? ? ? ? ? ? ? ?Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(

  • ? ? ? ? ? ? ? ? ? ?h => theButton.Click += h,

  • ? ? ? ? ? ? ? ? ? ?h => theButton.Click -= h);

  • clicks.Subscribe(eventPattern => output.Text += "button clicked" + Environment.NewLine);

  • 9. 使用 Observable.Using進行資源釋放

  • IObservable<string> lines =

  • ? ?Observable.Using (

  • ? ? ? ?() => File.OpenText ("TextFile.txt"), // opens the file and returns the stream we work with

  • ? ? ? ?stream =>

  • ? ? ? ?Observable.Generate (

  • ? ? ? ? ? ?stream, //initial state

  • ? ? ? ? ? ?s => !s.EndOfStream, //we continue until we reach the end of the file

  • ? ? ? ? ? ?s => s, //the stream is our state, it holds the position in the file

  • ? ? ? ? ? ?s => s.ReadLine ()) //each iteration will emit the current line (and moves to the next)

  • ? ?);

  • 10. 使用 Observable.Interval創建指定間隔可觀察序列

    11. 使用 Observable.Timer創建可觀察的計時器

    6. RX 操作符

    創建完IObservable后,我們可以對其應用系列Linq操作符,對其進行查詢、過濾、聚合等等。Rx內置了以下系列操作符:下面通過圖示來解釋常用操作符的作用:

    7. 多播傳輸靠:Subject

    基于以上示例,我們了解到,借助Rx可以簡化事件模型的實現,而其實質上就是對觀察者模式的擴展。提到觀察者模式,我們知道一個Subject可以被多個觀察者訂閱,從而完成消息的多播。同樣,在Rx中,也引入了Subject用于多播消息傳輸,不過Rx中的Subject具有雙重身份——即是觀察者也是被觀察者。

  • interface ISubject<in TSource, out TResult> : IObserver<TSource>,IObservable<TResult>

  • {

  • }

  • Rx中默認提供了以下四種實現:

    Subject- 向所有觀察者廣播每個通知

    AsyncSubject- 當可觀察序列完成后有且僅發送一個通知

    ReplaySubject- 緩存指定通知以對后續訂閱的觀察者進行重放

    BehaviorSubject- 推送默認值或最新值給觀察者

    但對于第一種 Subject<T>有一點需要指出,當其有多個觀察者序列時,一旦其中一個停止發送消息,則Subject就停止廣播所有其他序列后續發送的任何消息。

    8. 有溫度的可觀察者序列

    對于Observable,它們是有溫度的,有冷熱之分。它們的區別如下圖所示:

    Cold Observable:有且僅當有觀察者訂閱時才發送通知,且每個觀察者獨享一份完整的觀察者序列。

    Hot Observable:不管有無觀察者訂閱都會發送通知,且所有觀察者共享同一份觀察者序列。

    9. 一切皆在掌控:Scheduler

    在Rx中,使用Scheduler來控制并發。而對于Scheduler我們可以理解為程序調度,通過Scheduler來規定在什么時間什么地點執行什么事情。Rx提供了以下幾種Scheduler:

  • NewThreadScheduler:即在新線程上執行

  • ThreadPoolScheduler:即在線程池中執行

  • TaskPoolScheduler:同ThreadPoolScheduler

  • CurrentThreadScheduler:在當前線程執行

  • ImmediateScheduler:在當前線程立即執行

  • EventLoopScheduler:創建一個后臺線程按序執行所有操作

  • 舉例而言:

  • Observable.Return("Hello",NewThreadScheduler.Default)

  • .Subscribe(str=>Console.WriteLine($"{str} on ThreadId:{Thread.CurrentThread.ManagedThreadId}")

  • );

  • Console.WriteLine($"Current ThreadId:{Thread.CurrentThread.ManagedThreadId}");


  • 以上輸出:

  • Current ThreadId1

  • Hello on ThreadId4

  • 10. 最后

    羅里吧嗦的總算把《Rx.NET In Action》這本書的內容大致梳理了一遍,對Rx也有了一個更深的認識,Rx擴展了觀察者模式用于支持數據和事件序列,內置系列操作符允許我們以聲明式的方式組合這些序列,且無需關注底層的實現進行事件驅動開發:如線程、同步、線程安全、并發數據結構和非阻塞IO。

    但事無巨細,難免疏漏。對響應式編程有興趣的不妨拜讀下此書,相信對你會大有裨益。

    參考資料:

    Rx.NET in Action.pdf

    ReactiveX

    .Net中的反應式編程(Reactive Programming)


    總結

    以上是生活随笔為你收集整理的响应式编程知多少 | Rx.NET 了解下的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。