基于ASP.NET Core SignalR的流式传输
基于ASP.NET Core SignalR的流式傳輸
SignalR概述
SignalR是ASP.NET Core下非常流行的實現Web實時功能的庫。微軟再文檔中列出了適合的應用場景:
適合 SignalR 的候選項:
需要從服務器進行高頻率更新的應用。示例包括游戲、社交網絡、投票、拍賣、地圖和 GPS 應用。
儀表板和監視應用。示例包括公司儀表板、即時銷售更新或旅行警報。
協作應用。協作應用的示例包括白板應用和團隊會議軟件。
需要通知的應用。社交網絡、電子郵件、聊天、游戲、旅行警報和很多其他應用都需使用通知。
其實只要適合使用Ajax的場景都能使用,他比WebSockets更高級,實現了斷線重連,廣播,分組等功能。
流式傳輸
在介紹SingalR流式處理之前,我想先介紹一下流式處理的基本概念,
一提到流式傳輸,很多人往往感到比較棘手,那是因為可能用的比較少,通常我們習慣了準備數據然后一口氣處理數據的編程范式,而不習慣一個個處理數據的范式。
流式處理就像一個水管,一頭進水,一頭出水。
例如C#中流式處理一個文本文件,我們是一次讀取一行并處理他,而不是一口氣讀取文件中的所有行并處理他。
using (StreamReader sr = new StreamReader("TestFile.txt")) {string line;// Read and display lines from the file until the end of// the file is reached.while ((line = sr.ReadLine()) != null) {Console.WriteLine(line); } }流式處理的好處就是數據的一部分準備好了,就可以對他立即進行處理,在內存中每次僅保留需要處理的那部分數據,這會大大優化內存的使用。
流式梳理非常適合處理大型數據集,例如文件讀取,網絡數據下載,IoT的數據傳輸,遍歷并逐步處理數據庫數據。
IEnumerable<T>
有必要再對這個接口重新做一個簡要說明,枚舉(稱呼枚舉或者迭代器,個人認為迭代器更合適)就是一個個列舉。他是一種序列的概念,例如 1 2 3 4 5等,每列舉一個我就可以處理一個。C#中的foreach就是用于處理迭代器的語法糖:
//枚舉每一個Item foreach(var item in GetNumbers()) {//處理每個item }IEnumerable<int> GetNumbers() {int i=0;while(i<10) {yield return i++; } }//作為比對我列舉一個常見的編程一口氣準備好的編程范式。 List<int> GetNumbers() {List<int> numbers = new List<int>(); //數組會分配內存,如果數據量很大,分配的內存會非常高。int i=0;while(i<10){numbers.add(i);}return numbers; }在C# 8.0以前,這個接口都是同步的,也就是說產生序列的方法會阻塞調用序列的方法。在異步async await 大行其道的今天,顯然沒有異步版本的迭代器實現是一個巨大的缺陷。(在這一以前可以用使用ValueTask<T>)
IAsyncEnumerable<T>
C# 8.0引入了這個接口,官網文檔將它稱為異步流。這個接口其實就是為了統一async await IEnumerable<T> . 微軟在背后做了大量的工作,具體實現細節大家可以參考Async streams - C# 8.0 specification proposals | Microsoft Docs
我們來使用這個接口返回一個異步的迭代器:
public async IAsyncEnumerable<Data> GetData([EnumeratorCancellation] CancellationToken cancellationToken) {var data = await _respository.GetDataAsync(cancellationToken);//為了演示,假設從數據庫異步讀取數據。foreach (var item in data) {yield return item; } }SignalR的流式傳輸
SignalR的流式傳輸使用了IAsyncEnumerable<T>接口,我直接引用微軟的說法:
ASP.NET Core SignalR支持從客戶端到服務器以及從服務器到客戶端的流式傳輸。這適用于數據片段隨著時間的推移而發生的情況。流式傳輸時,每個片段一旦變為可用,就會發送到客戶端或服務器,而不是等待所有數據都可用。
當集線器方法返回 IAsyncEnumerable ChannelReader Task<IAsyncEnumerable<T>> Task<ChannelReader<T>> 時它會自動成為流式處理中心方法
public class AsyncEnumerableHub : Hub {public async IAsyncEnumerable<int> Counter(int count,int delay,[EnumeratorCancellation]CancellationToken cancellationToken){for (var i = 0; i < count; i++){// Check the cancellation token regularly so that the server will stop// producing items if the client disconnects.cancellationToken.ThrowIfCancellationRequested();yield return i;// Use the cancellationToken in other APIs that accept cancellation// tokens so the cancellation can flow down to them.await Task.Delay(delay, cancellationToken);}} }代碼很清楚的說明了這一做法。ChannelReader是另一種實現方法,這篇不做講解。
做一個例子來實現一下
當下非常流行數據監控大屏應用。監控交通狀況,股票行情,企業生產數據看板,IOT傳感器實時數據顯示,實時銷售數據分析等等。
我想做一個簡單的數據更新大屏的例子(一切為了簡單,只用內存中的數據來存放數據),原諒我UI能比比較差,這篇不展示UI上的東西,而只展示數據如何以流的方式發送。
第一步 準備一個類用于處理數據更新
這個類使用一個定時器來定期更新數據,并記錄一個數據是否更新的狀態以便于只在數據更新的時候才發送數據。同時使用一個并發字典記錄待更新的數據。
public class DataTicker{private Timer _timer; private volatile bool isUpdated ?= false;//標記數據是否更新private readonly ConcurrentDictionary<string, Data> _datas = new ConcurrentDictionary<string, Data>();public DataTicker(){InitData();StartUpdateData();}public bool IsUpdated{get { return isUpdated; }}public async Task<ICollection<Data>> GetData(){if (IsUpdated) //如果數據已更新才發送數據{await Task.Delay(500); //模擬返回數據的延遲。這里好的做法是異步的方式返回數據。return _datas.Values;}return null;}public void StartUpdateData(){_timer = new Timer(UpdateDate, null, TimeSpan.FromMilliseconds(3000), TimeSpan.FromMilliseconds(3000));}private void InitData(){for(int i=0;i<10;i++){Data data = new Data();data.Id = i;data.Name = i.ToString();data.UpdatedDate = DateTime.Now;_datas.TryAdd(i.ToString(), data);}}public void UpdateDate(object state){var newData = foreach (var item in _datas.Values){item.UpdatedDate = System.DateTime.Now;}this.isUpdated = true;}public void MarkAsRead(){this.isUpdated = false;}}第二部 編寫SignalR中心異步流方法
public class DataUpdateHub : Hub{private DataTicker _ticker;public DataUpdateHub(DataTicker stockTicker){this._ticker = stockTicker;}//這個方法沒有使用cancellationToken,好的做法是從外部傳遞cancellationTokenpublic async IAsyncEnumerable<Data> GetData([EnumeratorCancellation] CancellationToken cancellationToken){while (true){var data = await _ticker.GetData(); //當有數據更新的時候方法會返回數據,否則返回空。if (data != null){foreach (var item in data){yield return item;}_ticker.MarkAsRead(); //發送完畢后,將標記數據標記為已發送。}}}}第三步 Javascript客戶端調用代碼
connection.start().then(function () {connection.stream("GetData") //連接到hub的異步流方法,當有數據從中心流出時候,next方法會被調用。因為本例中心使用wihle(true)方法,數據流會一直發送,所以complete方法將不會被調用。如果中心方法返回有限數據結束后例如使用foreach,則會調用complete方法。.subscribe({next: (item) => {var li = document.createElement("li"); li.textContent = "ID:" + item.id + "Datetime:" + item.updatedDate;document.getElementById("messagesList").appendChild(li);},complete: () => {var li = document.createElement("li");li.textContent = "Stream completed";document.getElementById("messagesList").appendChild(li);},error: (err) => {var li = document.createElement("li");li.textContent = err;document.getElementById("messagesList").appendChild(li);},}); }).catch(function (err) {return console.error(err.toString()); });第四步 配置DataTicker依賴注入
builder.Services.AddSignalR(); builder.Services.AddSingleton<DataTicker>();注意:這里的DataTicker注冊的是單例的,也就是說所有發送給客戶端的數據是共享的。既然是共享的,也就存在并發訪問的問題,這就是為什么使用volatile關鍵字和并發字典的原因。
總結
ASP.NET SignalR可以很方便的使用異步流傳輸數據,這樣客戶端和服務器端是使用流的方式連接在一起的。
本文使用的是從服務器到客戶端的流,當然你可以使用從客戶端到服務器端的流。從.Net和Java的客戶端都可以調用SignalR的中心流方法。具體可以參考微軟官方文檔,使用 ASP.NET Core 中的流式處理SignalR | Microsoft Docs
總結
以上是生活随笔為你收集整理的基于ASP.NET Core SignalR的流式传输的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C#10 和 .NET6 代码跨平台开发
- 下一篇: asp.net ajax控件工具集 Au