多线程编程学习笔记——使用并发集合(三)
接上文 多線程編程學習筆記——使用并發集合(一)
接上文 多線程編程學習筆記——使用并發集合(二)
?
?
四、?? 使用ConcurrentBag創建一個可擴展的爬蟲
?
本示例在多個獨立的即可生產任務又可消費任務的工作者間如何擴展工作量。
?
?1.程序代碼如下。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; using System.Diagnostics; using System.Threading;namespace ThreadCollectionDemo {class Program{static Dictionary<string, string[]> contextItems = new Dictionary<string, string[]>(); static void Main(string[] args){Console.WriteLine(string.Format("----- ConcurrentBag 操作----"));CreateLinks();Task task = RunBag();task.Wait();Console.Read();}static async Task RunBag(){var taskBag = new ConcurrentBag<CrawlingTask>();string[] urls = new string[] { "http://www.163.com", "http://www.jd.com", "http://www.hexun.com","http://www.tmall.com", "http://www.qq.com" };var crawlers = new Task[5];for (int i = 1; i <= 5; i++){string crawlerName = "Crawler " + i.ToString();taskBag.Add(new CrawlingTask { UrlToCraw = urls[i - 1], ProductName = "root" });crawlers[i - 1] = Task.Run(() => Craw(taskBag,crawlerName));}await Task.WhenAll(crawlers);}static async Task Craw(ConcurrentBag<CrawlingTask> bag, string crawlerName){CrawlingTask task;while (bag.TryTake(out task)){Console.WriteLine(" {0} url 從ConcurrentBag 取出,上一節點{1},名稱{2}", task.UrlToCraw, task.ProductName, crawlerName);IEnumerable<string> urls = await GetLinksFromContent(task);if (urls != null){foreach (var url in urls){var t = new CrawlingTask{UrlToCraw = url,ProductName = crawlerName};bag.Add(t);} }}if (task != null){Console.WriteLine("第{0} 個url 添加到ConcurrentBag,線程名稱{1},爬蟲名稱{2}", task.UrlToCraw, task.ProductName, crawlerName);}elseConsole.WriteLine(" TASK IS NULL ");} static async Task<IEnumerable<string>> GetLinksFromContent(CrawlingTask task){await GetRandomDely();if (contextItems.ContainsKey(task.UrlToCraw))return contextItems[task.UrlToCraw];return null;}static void CreateLinks(){contextItems["http://www.163.com"] = new[]{ "http://www.163.com/a.html","http://www.163.com/b.html" }; contextItems["http://www.jd.com"] = new[]{ "http://www.jd.com/a.html","http://www.jd.com/b.html" };contextItems["http://www.qq.com"] = new[]{ "http://www.qq.com/1.html","http://www.qq.com/2.html", "http://www.qq.com/3.html","http://www.qq.com/4.html"};contextItems["http://www.tmall.com"] = new[]{ "http://www.tmall.com/a.html","http://www.tmall.com/b.html" };contextItems["http://www.hexun.com"] = new[]{ "http://www.hexun.com/a.html","http://www.hexun.com/b.html", "http://www.hexun.com/c.html","http://www.hexun.com/d.html" };}static Task GetRandomDely(){int dely = new Random(DateTime.Now.Millisecond).Next(150, 600);return Task.Delay(dely);} } class CrawlingTask{public string UrlToCraw { get; set; }public string ProductName { get; set; }}}
?
?
?2.程序運行結果,如下圖。
?
?
?
??????? 這個程序模擬了使用多個網絡爬蟲進行網頁索引的場景。剛開始,我們定義了一個包含網頁URL的字典。這個字典模擬了包含其他頁面鏈接的網頁。這個實現非常簡單,并不關心索引已經訪問過的頁面,但正因為它如此簡單,我們才可能關注并行工作負載。
?
?? ?? ? 接著創建了一個并發包,其中包含爬蟲任務。我們創建了四個爬蟲,并且給每個爬蟲都提供了一個不同的網站根URL。然后等等所有爬蟲完成工作。現在每個爬蟲開始檢查提供給它的網站URL。我們通過等待一個隨機事件來模擬網絡IO處理。如果頁面包含的URL越多,爬蟲向包中放入的任務也越多。然后檢查包中是否還有任何需要爬蟲處理的任務,如果沒有說明爬蟲完成了工作 。
?
?? ?? ?? 如果檢查前四個根URL后的第一行輸出,我們將看到爬蟲N放置的任務通過會被同一個爬蟲處理。然而,接下來的行則會不同。這是因為ConcurrentBag內部針對多個線程既可以添加元素又可以刪除元素的場景進行了優化。實現方式是每個線程使用自己的本地隊列的元素,所以使用這個隊列時無需要任何鎖。只有當本地隊列中沒有任何元素時,我們才執行一些鎖定操作并嘗試從其他線程的本地隊列中“偷取”工作。這種行為 有豕于在所有工作 者間分發工作并避免使用鎖。
?
?
?
五、?? 使用BlockingCollention進行異步處理
?
???????? 本示例學習如何使用BlockingCollection來簡化實現 異步處理的工作負載。
?
?1.程序代碼如下。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; using System.Diagnostics; using System.Threading;namespace ThreadCollectionDemo {class Program{ static void Main(string[] args){Console.WriteLine(string.Format("----- BlockingCollection 操作----"));Console.WriteLine(string.Format("----- BlockingCollection 操作 queue----")); Task task = RunBlock();task.Wait();Console.WriteLine(string.Format("----- BlockingCollection 操作 Stack----"));task = RunBlock(new ConcurrentStack<CustomTask>());task.Wait();Console.Read();}static async Task RunBlock(IProducerConsumerCollection<CustomTask> collection = null){string name = "queue ";var taskBlock = new BlockingCollection<CustomTask>();if (collection != null){taskBlock = new BlockingCollection<CustomTask>(collection);name = "stack";} var taskSrc = Task.Run(() => TaskProduct(taskBlock));Task[] process = new Task[4];for (int i = 1; i <= 4; i++){string processId = i.ToString();process[i - 1] = Task.Run(() => TaskProcess(taskBlock, name + processId));}await taskSrc; await Task.WhenAll(process);} static async Task TaskProduct(BlockingCollection<CustomTask> block){for (int i = 0; i < 20; i++){await Task.Delay(50);var workitem = new CustomTask { Id = i };block.Add(workitem);Console.WriteLine(string.Format("把{0} 元素添加到BlockingCollection", workitem.Id));}block.CompleteAdding();}static async Task TaskProcess(BlockingCollection<CustomTask> collection, string name){ await GetRandomDely();foreach (var item in collection){Console.WriteLine(string.Format("--- Task {0} 處理 操作 名稱: {1} ---",item.Id,name));await GetRandomDely();}}static Task GetRandomDely(){int dely = new Random(DateTime.Now.Millisecond).Next(1, 1000);return Task.Delay(dely);}} }?
?
?2.程序運行結果,如下圖。
?
?
?
?
?
?? ? ? ? 先說第一個場景,這里我們使用了BlockingCollection類,它帶來了很多優勢。首先,我們能夠改變任務存儲在阻塞集合中的方式。默認情況下它使用的是ConcurrentQueue容器,但是我們能夠使用任何實現 IProducerConsumerConllection泛型接口的集合。
?
???????? 工作者通過對阻塞集合迭代調用GetConsumingEnumerable方法來獲取 工作項。如果在這個集合中沒有任何元素,迭代器會阻塞工作線程直到有元素被放到集合中。當生產才調用集合的Completedding時迭代周期會結束。這標志著工作完成。
?
????????? 工作量生產者將任務插入到BlockingCollection,然后調用 CompleteAdding方法,這會使用所有工作 者完成工作 ?,F在在程序輸出中我們看到兩個結果序列,演示了并發隊列和堆棧集合的不同之處。
?
?
?
總結
以上是生活随笔為你收集整理的多线程编程学习笔记——使用并发集合(三)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: zabbix6
- 下一篇: spring 监听器简介