程序员过关斩将--自定义线程池来实现文档转码
背景
我司在很久之前,一位很久之前的同事寫過一個文檔轉圖片的服務,具體業務如下:
1. 用戶在客戶端上傳文檔,可以是ppt,word,pdf 等格式,用戶上傳完成可以在客戶端預覽上傳的文檔,預覽的時候采用的是圖片形式(不要和我說用別的方式預覽,現在已經來不及了)
2. 當用戶把文檔上傳到云端之后(阿里云),把文檔相關的信息記錄在數據庫,然后等待轉碼完成
3. 服務器有一個轉碼服務(其實就是一個windows service)不停的在輪訓待轉碼的數據,如果有待轉碼的數據,則從數據庫取出來,然后根據文檔的網絡地址下載到本地進行轉碼(轉成多張圖片)
4. 當文檔轉碼完畢,把轉碼出來的圖片上傳到云端,并把云端圖片的信息記錄到數據庫
5. 客戶端有預覽需求的時候,根據數據庫來判斷有沒有轉碼成功,如果成功,則獲取數據來顯示。
文檔預覽的整體過程如以上所說,老的轉碼服務現在什么問題呢?
1. 由于一個文檔同時只能被一個線程進行轉碼操作,所以老的服務采用了把待轉碼數據劃分管道的思想,一共有六個管道,映射到數據庫大體就是 Id=》管道ID 這個樣子。
2. 一個控制臺程序,根據配置文件信息,讀取某一個管道待轉碼的文檔,然后單線程進行轉碼操作
3. 一共有六個管道,所以服務器上起了六個cmd的黑窗口......
4. 有的時候個別文檔由于格式問題或者其他問題 轉碼過程中會卡住,具體的表現為:停止了轉碼操作。
5. 如果程序卡住了,需要運維人員重新啟動轉碼cmd窗口(這種維護比較蛋疼)
后來機緣巧合,這個程序的維護落到的菜菜頭上,維護了一周左右,大約重啟了10多次,終于忍受不了了,重新搞一個吧。仔細分析過后,刨除實際文檔轉碼的核心操作之外,整個轉碼流程其實還有很多注意點
1. 需要保證轉碼服務不被卡住,如果和以前一樣就沒有必要重新設計了
2. 盡量避免開多個進程的方式,其實在這個業務場景下,多個進程和多個線程作用是一致的。
3. 每個文檔只能被轉碼一次,如果一個文檔被轉碼多次,不僅浪費了服務器資源,而且還有可能會有數據不一致的情況發生
4. 轉碼失敗的文檔需要有一定次數的重試,因為一次失敗不代表第二次失敗,所以一定要給失敗的文檔再次被操作的機會
5. 因為程序不停的把文檔轉碼成本地圖片,所以需要保證這些文件在轉碼完成在服務器上刪除,不然的話,時間長了會生成很多無用的文件
說了這么多,其實需要注意的點還是很多的。以整個的轉碼流程來說,本質上是一個任務池的生產和消費問題,任務池中的任務就是待轉碼的文檔,生產者不停的把待轉碼文檔丟進任務池,消費者不停的把任務池中文檔轉碼完成。
線程池
這很顯然和線程池很類似,菜菜之前就寫過一個線程池的文章,有興趣的同學可以去翻翻歷史。今天我們就以這個線程池來解決這個轉碼問題。線程池的本質是初始化一定數目的線程,不停的執行任務。
?//線程池定義?public?class?LXThreadPool:IDisposable{bool?PoolEnable?=?true;?//線程池是否可用?List<Thread>?ThreadContainer?=?null;?//線程的容器ConcurrentQueue<ActionData>?JobContainer?=?null;?//任務的容器int?_maxJobNumber;?//線程池最大job容量ConcurrentDictionary<string,?DateTime>?JobIdList?=?new?ConcurrentDictionary<string,?DateTime>();?//job的副本,用于排除某個job?是否在運行中public?LXThreadPool(int?threadNumber,int?maxJobNumber=1000){if(threadNumber<=0?||?maxJobNumber?<=?0){throw?new?Exception("線程池初始化失敗");}_maxJobNumber?=?maxJobNumber;ThreadContainer?=?new?List<Thread>(threadNumber);JobContainer?=?new?ConcurrentQueue<ActionData>();for?(int?i?=?0;?i?<?threadNumber;?i++){var?t?=?new?Thread(RunJob);t.Name?=?$"轉碼線程{i}";ThreadContainer.Add(t);t.Start();}//清除超時任務的線程var?tTimeOutJob?=?new?Thread(CheckTimeOutJob);tTimeOutJob.Name?=?$"清理超時任務線程";tTimeOutJob.Start();}//往線程池添加一個線程,返回線程池的新線程數public?int?AddThread(int?number=1){if(!PoolEnable?||?ThreadContainer==null?||?!ThreadContainer.Any()?||?JobContainer==null||?!JobContainer.Any()){return?0;}while?(number?<=?0){var?t?=?new?Thread(RunJob);ThreadContainer.Add(t);t.Start();number?-=?number;}return?ThreadContainer?.Count????0;}//向線程池添加一個任務,返回0:添加任務失敗?? 1:成功public?int?AddTask(Action<object>?job,?object?obj,string?actionId,?Action<Exception>?errorCallBack?=?null){if?(JobContainer?!=?null){if(JobContainer.Count>=?_maxJobNumber){return?0;}//首先排除10分鐘還沒轉完的var?timeoOutJobList?=?JobIdList.Where(s?=>?s.Value.AddMinutes(10)?<?DateTime.Now);if(timeoOutJobList!=null&&?timeoOutJobList.Any()){foreach?(var?timeoutJob?in?timeoOutJobList){JobIdList.TryRemove(timeoutJob.Key,out?DateTime?v);}}if?(!JobIdList.Any(s?=>?s.Key?==?actionId)){if(JobIdList.TryAdd(actionId,?DateTime.Now)){JobContainer.Enqueue(new?ActionData?{?Job?=?job,?Data?=?obj,?ActionId?=?actionId,?ErrorCallBack?=?errorCallBack?});return?1;}else{return?101;}}else{return?100;}????????????}return?0;}??private?void?RunJob(){while?(JobContainer?!=?null??&&?PoolEnable){//任務列表取任務ActionData?job?=?null;JobContainer?.TryDequeue(out?job);if?(job?==?null){//如果沒有任務則休眠Thread.Sleep(20);continue;}try{//執行任務job.Job.Invoke(job.Data);}catch?(Exception?error){//異常回調if?(job?!=?null&&?job.ErrorCallBack!=null){job?.ErrorCallBack(error);}}finally{if?(!JobIdList.TryRemove(job.ActionId,out?DateTime?v)){}}}}//終止線程池public?void?Dispose(){PoolEnable?=?false;JobContainer?=?null;if?(ThreadContainer?!=?null){foreach?(var?t?in?ThreadContainer){//強制線程退出并不好,會有異常t.Join();}ThreadContainer?=?null;}}//清理超時的任務private?void?CheckTimeOutJob(){//首先排除10分鐘還沒轉完的var?timeoOutJobList?=?JobIdList.Where(s?=>?s.Value.AddMinutes(10)?<?DateTime.Now);if?(timeoOutJobList?!=?null?&&?timeoOutJobList.Any()){foreach?(var?timeoutJob?in?timeoOutJobList){JobIdList.TryRemove(timeoutJob.Key,?out?DateTime?v);}}System.Threading.Thread.Sleep(60000);}}public?class?ActionData{//任務的id,用于排重public?string?ActionId?{?get;?set;?}//執行任務的參數public?object?Data?{?get;?set;?}//執行的任務public?Action<object>?Job?{?get;?set;?}//發生異常時候的回調方法public?Action<Exception>?ErrorCallBack?{?get;?set;?}}以上就是一個線程池的具體實現,和具體的業務無關,完全可以用于任何適用于線程池的場景,其中有一個注意點,我新加了任務的標示,主要用于排除重復的任務被投放多次(只排除正在運行中的任務)。當然代碼不是最優的,有需要的同學可以自己去優化
使用線程池
接下來,我們利用以上的線程池來完成我們的文檔轉碼任務,首先我們啟動的時候初始化一個線程池,并啟動一個獨立線程來不停的往線程池來輸送任務,順便起了一個監控線程去監視發送任務的線程
string?lastResId?=?null;string?lastErrorResId?=?null;Dictionary<string,?int>?ResErrNumber?=?new?Dictionary<string,?int>();?//轉碼失敗的資源重試次數int?MaxErrNumber?=?5;//最多轉碼錯誤的資源10次Thread?tPutJoj?=?null;LXThreadPool?pool?=?new?LXThreadPool(4,100);public?void?OnStart(){//初始化一個線程發送轉碼任務tPutJoj?=?new?Thread(PutJob);tPutJoj.IsBackground?=?true;tPutJoj.Start();//初始化?監控線程var?tMonitor?=?new?Thread(MonitorPutJob);tMonitor.IsBackground?=?true;tMonitor.Start();}//監視發放job的線程private?void?MonitorPutJob(){while?(true){if(tPutJoj?==?null||?!tPutJoj.IsAlive){Log.Error($"發送轉碼任務線程停止==========");tPutJoj?=?new?Thread(PutJob);tPutJoj.Start();Log.Error($"發送轉碼任務線程重新初始化并啟動==========");}System.Threading.Thread.Sleep(5000);}}private?void?PutJob(){???????????while?(true){try{//先搜索等待轉碼的var?fileList?=?DocResourceRegisterProxy.GetFileList(new?int[]?{?(int)FileToImgStateEnum.Wait?},?30,?lastResId);Log.Error($"拉取待轉碼記錄===總數:lastResId:{lastResId},結果:{fileList?.Count()????0}");if?(fileList?==?null?||?!fileList.Any()){lastResId?=?null;Log.Error($"待轉碼數量為0,開始拉取轉碼失敗記錄,重新轉碼==========");//如果無待轉,則把出錯的?嘗試fileList?=?DocResourceRegisterProxy.GetFileList(new?int[]?{?(int)FileToImgStateEnum.Error,?(int)FileToImgStateEnum.TimeOut,?(int)FileToImgStateEnum.Fail?},?1,?lastErrorResId);if?(fileList?==?null?||?!fileList.Any()){lastErrorResId?=?null;}else{// Log.Error($"開始轉碼失敗記錄:{JsonConvert.SerializeObject(fileList)}");List<DocResourceRegister>?errFilter?=?new?List<DocResourceRegister>();foreach?(var?errRes?in?fileList){if?(ResErrNumber.TryGetValue(errRes.res_id,?out?int?number)){if?(number?>?MaxErrNumber){Log.Error($"資源:{errRes.res_id}?轉了{MaxErrNumber}次不成功,放棄===========");continue;}else{errFilter.Add(errRes);ResErrNumber[errRes.res_id]?=?number?+?1;}}else{ResErrNumber.Add(errRes.res_id,?1);errFilter.Add(errRes);}}fileList?=?errFilter;if?(fileList.Any()){lastErrorResId?=?fileList.Select(s?=>?s.res_id).Max();}}}else{lastResId?=?fileList.Select(s?=>?s.res_id).Max();}if?(fileList?!=?null?&&?fileList.Any()){foreach?(var?file?in?fileList){//如果?任務投放線程池失敗,則等待一面繼續投放int?poolRet?=?0;while?(poolRet?<=?0){poolRet?=?pool.AddTask(s?=>?{AliFileService.ConvertToImg(file.res_id?+?$".{file.res_ext}",?FileToImgFac.Instance(file.res_ext));},?file,?file.res_id);if?(poolRet?<=?0?||?poolRet?>?1){Log.Error($"發放轉碼任務失敗==========線程池返回結果:{poolRet}");System.Threading.Thread.Sleep(1000);}}}}//每一秒去數據庫取一次數據System.Threading.Thread.Sleep(3000);}catch{continue;}}}以上就是發放任務,線程池執行任務的所有代碼,由于具體的轉碼代碼涉及到隱私,這里不在提供,如果有需要可以私下找菜菜索要,雖然我深知還有更優的方式,但是我覺得線程池這樣的思想可能會對部分人有幫助,其中任務超時的核心代碼如下(采用了polly插件):
var?policy=?Policy.Timeout(TimeSpan.FromSeconds(this.TimeOut),?onTimeout:?(context,?timespan,?task)?=>{ret.State=Enum.FileToImgStateEnum.TimeOut;???????????????????});policy.Execute(s=>{.....});把你的更優方案寫在留言區吧,2020年大家越來越好
●程序員修神之路--打通Docker鏡像發布容器運行流程
●程序員修神之路--容器技術為什么會這么流行(記得去抽獎)
●程序員修神之路--kubernetes是微服務發展的必然產物
●程序員過關斬將--要想獲取我的用戶信息,就得按照規矩來
●程序員過關斬將--更加優雅的Token認證方式JWT
●程序員過關斬將--cookie和session的關系其實很簡單
●程序員修神之路--用NOSql給高并發系統加速
●程序員修神之路--高并發系統設計負載均衡架構
●程序員修神之路--做好分庫分表其實很難之一(繼續送書)
●程序員修神之路--做好分庫分表其實很難之二(送書繼續)
●程序員過關斬將--你為什么還在用存儲過程?
●程序員過關斬將--小小的分頁引發的加班血案
●程序員修神之路--問世間異步為何物?
●程序員修神之路--提高網站的吞吐量????
總結
以上是生活随笔為你收集整理的程序员过关斩将--自定义线程池来实现文档转码的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: dotNetCore操作Redis(含C
- 下一篇: Xamarin.Forms弹出对话框插件