日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > java >内容正文

java

Java并发编程实战~生产者-消费者模式

發(fā)布時(shí)間:2024/7/23 java 48 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java并发编程实战~生产者-消费者模式 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

前面我們?cè)凇禬orker Thread 模式》中講到,Worker Thread 模式類比的是工廠里車間工人的工作模式。但其實(shí)在現(xiàn)實(shí)世界,工廠里還有一種流水線的工作模式,類比到編程領(lǐng)域,就是生產(chǎn)者 - 消費(fèi)者模式。

生產(chǎn)者 - 消費(fèi)者模式在編程領(lǐng)域的應(yīng)用也非常廣泛,前面我們?cè)?jīng)提到,Java 線程池本質(zhì)上就是用生產(chǎn)者 - 消費(fèi)者模式實(shí)現(xiàn)的,所以每當(dāng)使用線程池的時(shí)候,其實(shí)就是在應(yīng)用生產(chǎn)者 - 消費(fèi)者模式。

當(dāng)然,除了在線程池中的應(yīng)用,為了提升性能,并發(fā)編程領(lǐng)域很多地方也都用到了生產(chǎn)者 - 消費(fèi)者模式,例如 Log4j2 中異步 Appender 內(nèi)部也用到了生產(chǎn)者 - 消費(fèi)者模式。所以今天我們就來深入地聊聊生產(chǎn)者 - 消費(fèi)者模式,看看它具體有哪些優(yōu)點(diǎn),以及如何提升系統(tǒng)的性能。

生產(chǎn)者 - 消費(fèi)者模式的優(yōu)點(diǎn)

生產(chǎn)者 - 消費(fèi)者模式的核心是一個(gè)任務(wù)隊(duì)列,生產(chǎn)者線程生產(chǎn)任務(wù),并將任務(wù)添加到任務(wù)隊(duì)列中,而消費(fèi)者線程從任務(wù)隊(duì)列中獲取任務(wù)并執(zhí)行。下面是生產(chǎn)者 - 消費(fèi)者模式的一個(gè)示意圖,你可以結(jié)合它來理解。

從架構(gòu)設(shè)計(jì)的角度來看,生產(chǎn)者 - 消費(fèi)者模式有一個(gè)很重要的優(yōu)點(diǎn),就是解耦。解耦對(duì)于大型系統(tǒng)的設(shè)計(jì)非常重要,而解耦的一個(gè)關(guān)鍵就是組件之間的依賴關(guān)系和通信方式必須受限。在生產(chǎn)者 - 消費(fèi)者模式中,生產(chǎn)者和消費(fèi)者沒有任何依賴關(guān)系,它們彼此之間的通信只能通過任務(wù)隊(duì)列,所以生產(chǎn)者 - 消費(fèi)者模式是一個(gè)不錯(cuò)的解耦方案。

除了架構(gòu)設(shè)計(jì)上的優(yōu)點(diǎn)之外,生產(chǎn)者 - 消費(fèi)者模式還有一個(gè)重要的優(yōu)點(diǎn)就是支持異步,并且能夠平衡生產(chǎn)者和消費(fèi)者的速度差異。在生產(chǎn)者 - 消費(fèi)者模式中,生產(chǎn)者線程只需要將任務(wù)添加到任務(wù)隊(duì)列而無(wú)需等待任務(wù)被消費(fèi)者線程執(zhí)行完,也就是說任務(wù)的生產(chǎn)和消費(fèi)是異步的,這是與傳統(tǒng)的方法之間調(diào)用的本質(zhì)區(qū)別,傳統(tǒng)的方法之間調(diào)用是同步的。

你或許會(huì)有這樣的疑問,異步化處理最簡(jiǎn)單的方式就是創(chuàng)建一個(gè)新的線程去處理,那中間增加一個(gè)“任務(wù)隊(duì)列”究竟有什么用呢?我覺得主要還是用于平衡生產(chǎn)者和消費(fèi)者的速度差異。我們假設(shè)生產(chǎn)者的速率很慢,而消費(fèi)者的速率很高,比如是 1:3,如果生產(chǎn)者有 3 個(gè)線程,采用創(chuàng)建新的線程的方式,那么會(huì)創(chuàng)建 3 個(gè)子線程,而采用生產(chǎn)者 - 消費(fèi)者模式,消費(fèi)線程只需要 1 個(gè)就可以了。Java 語(yǔ)言里,Java 線程和操作系統(tǒng)線程是一一對(duì)應(yīng)的,線程創(chuàng)建得太多,會(huì)增加上下文切換的成本,所以 Java 線程不是越多越好,適量即可。生產(chǎn)者 - 消費(fèi)者模式恰好能支持你用適量的線程

支持批量執(zhí)行以提升性能

前面我們?cè)凇禩hread-Per-Message 模式》中講過輕量級(jí)的線程,如果使用輕量級(jí)線程,就沒有必要平衡生產(chǎn)者和消費(fèi)者的速度差異了,因?yàn)檩p量級(jí)線程本身就是廉價(jià)的,那是否意味著生產(chǎn)者 - 消費(fèi)者模式在性能優(yōu)化方面就無(wú)用武之地了呢?當(dāng)然不是,有一類并發(fā)場(chǎng)景應(yīng)用生產(chǎn)者 - 消費(fèi)者模式就有奇效,那就是批量執(zhí)行任務(wù)。

例如,我們要在數(shù)據(jù)庫(kù)里 INSERT 1000 條數(shù)據(jù),有兩種方案:第一種方案是用 1000 個(gè)線程并發(fā)執(zhí)行,每個(gè)線程 INSERT 一條數(shù)據(jù);第二種方案是用 1 個(gè)線程,執(zhí)行一個(gè)批量的 SQL,一次性把 1000 條數(shù)據(jù) INSERT 進(jìn)去。這兩種方案,顯然是第二種方案效率更高,其實(shí)這樣的應(yīng)用場(chǎng)景就是我們上面提到的批量執(zhí)行場(chǎng)景。

在《兩階段終止模式》文章中,我們提到一個(gè)監(jiān)控系統(tǒng)動(dòng)態(tài)采集的案例,其實(shí)最終回傳的監(jiān)控?cái)?shù)據(jù)還是要存入數(shù)據(jù)庫(kù)的(如下圖)。但被監(jiān)控系統(tǒng)往往有很多,如果每一條回傳數(shù)據(jù)都直接 INSERT 到數(shù)據(jù)庫(kù),那么這個(gè)方案就是上面提到的第一種方案:每個(gè)線程 INSERT 一條數(shù)據(jù)。很顯然,更好的方案是批量執(zhí)行 SQL,那如何實(shí)現(xiàn)呢?這就要用到生產(chǎn)者 - 消費(fèi)者模式了。

利用生產(chǎn)者 - 消費(fèi)者模式實(shí)現(xiàn)批量執(zhí)行 SQL 非常簡(jiǎn)單:將原來直接 INSERT 數(shù)據(jù)到數(shù)據(jù)庫(kù)的線程作為生產(chǎn)者線程,生產(chǎn)者線程只需將數(shù)據(jù)添加到任務(wù)隊(duì)列,然后消費(fèi)者線程負(fù)責(zé)將任務(wù)從任務(wù)隊(duì)列中批量取出并批量執(zhí)行。

在下面的示例代碼中,我們創(chuàng)建了 5 個(gè)消費(fèi)者線程負(fù)責(zé)批量執(zhí)行 SQL,這 5 個(gè)消費(fèi)者線程以 while(true){} 循環(huán)方式批量地獲取任務(wù)并批量地執(zhí)行。需要注意的是,從任務(wù)隊(duì)列中獲取批量任務(wù)的方法 pollTasks() 中,首先是以阻塞方式獲取任務(wù)隊(duì)列中的一條任務(wù),而后則是以非阻塞的方式獲取任務(wù);之所以首先采用阻塞方式,是因?yàn)槿绻蝿?wù)隊(duì)列中沒有任務(wù),這樣的方式能夠避免無(wú)謂的循環(huán)。

//任務(wù)隊(duì)列 BlockingQueue<Task> bq=new LinkedBlockingQueue<>(2000);//啟動(dòng)5個(gè)消費(fèi)者線程 //執(zhí)行批量任務(wù) public void start() {ExecutorService es=executors.newFixedThreadPool(5);for (int i=0; i<5; i++) {es.execute(()->{try {while (true) {//獲取批量任務(wù)List<Task> ts = pollTasks();//執(zhí)行批量任務(wù)execTasks(ts);}} catch (Exception e) {e.printStackTrace();}});} }//從任務(wù)隊(duì)列中獲取批量任務(wù) public List<Task> pollTasks() throws InterruptedException {List<Task> ts = new LinkedList<>();//阻塞式獲取一條任務(wù)Task t = bq.take();while (t != null) {ts.add(t);//非阻塞式獲取一條任務(wù)t = bq.poll();}return ts; }//批量執(zhí)行任務(wù) public execTasks(List<Task> ts) {//省略具體代碼無(wú)數(shù) }

利用生產(chǎn)者 - 消費(fèi)者模式還可以輕松地支持一種分階段提交的應(yīng)用場(chǎng)景。我們知道寫文件如果同步刷盤性能會(huì)很慢,所以對(duì)于不是很重要的數(shù)據(jù),我們往往采用異步刷盤的方式。我曾經(jīng)參與過一個(gè)項(xiàng)目,其中的日志組件是自己實(shí)現(xiàn)的,采用的就是異步刷盤方式,刷盤的時(shí)機(jī)是:

  • ERROR 級(jí)別的日志需要立即刷盤;
  • 數(shù)據(jù)積累到 500 條需要立即刷盤;
  • 存在未刷盤數(shù)據(jù),且 5 秒鐘內(nèi)未曾刷盤,需要立即刷盤。

這個(gè)日志組件的異步刷盤操作本質(zhì)上其實(shí)就是一種分階段提交。下面我們具體看看用生產(chǎn)者 - 消費(fèi)者模式如何實(shí)現(xiàn)。在下面的示例代碼中,可以通過調(diào)用 info()和error() 方法寫入日志,這兩個(gè)方法都是創(chuàng)建了一個(gè)日志任務(wù) LogMsg,并添加到阻塞隊(duì)列中,調(diào)用 info()和error() 方法的線程是生產(chǎn)者;而真正將日志寫入文件的是消費(fèi)者線程,在 Logger 這個(gè)類中,我們只創(chuàng)建了 1 個(gè)消費(fèi)者線程,在這個(gè)消費(fèi)者線程中,會(huì)根據(jù)刷盤規(guī)則執(zhí)行刷盤操作,邏輯很簡(jiǎn)單,這里就不贅述了。

class Logger {//任務(wù)隊(duì)列 final BlockingQueue<LogMsg> bq = new BlockingQueue<>();//flush批量 static final int batchSize=500;//只需要一個(gè)線程寫日志ExecutorService es = Executors.newFixedThreadPool(1);//啟動(dòng)寫日志線程public void start(){File file = File.createTempFile("foo", ".log");final FileWriter writer = new FileWriter(file);this.es.execute(()->{try {//未刷盤日志數(shù)量int curIdx = 0;long preFT = System.currentTimeMillis();while (true) {LogMsg log = bq.poll(5, TimeUnit.SECONDS);//寫日志if (log != null) {writer.write(log.toString());++curIdx;}//如果不存在未刷盤數(shù)據(jù),則無(wú)需刷盤if (curIdx <= 0) {continue;}//根據(jù)規(guī)則刷盤if (log!=null && log.level==LEVEL.ERROR ||curIdx == batchSize ||System.currentTimeMillis()-preFT>5000){writer.flush();curIdx = 0;preFT=System.currentTimeMillis();}}}catch(Exception e){e.printStackTrace();} finally {try {writer.flush();writer.close();}catch(IOException e){e.printStackTrace();}}}); }//寫INFO級(jí)別日志void info(String msg) {bq.put(new LogMsg(LEVEL.INFO, msg));}//寫ERROR級(jí)別日志void error(String msg) {bq.put(new LogMsg(LEVEL.ERROR, msg));} } //日志級(jí)別 enum LEVEL {INFO, ERROR } class LogMsg {LEVEL level;String msg;//省略構(gòu)造函數(shù)實(shí)現(xiàn)LogMsg(LEVEL lvl, String msg){}//省略toString()實(shí)現(xiàn)String toString(){} }

總結(jié)

Java 語(yǔ)言提供的線程池本身就是一種生產(chǎn)者 - 消費(fèi)者模式的實(shí)現(xiàn),但是線程池中的線程每次只能從任務(wù)隊(duì)列中消費(fèi)一個(gè)任務(wù)來執(zhí)行,對(duì)于大部分并發(fā)場(chǎng)景這種策略都沒有問題。但是有些場(chǎng)景還是需要自己來實(shí)現(xiàn),例如需要批量執(zhí)行以及分階段提交的場(chǎng)景。

生產(chǎn)者 - 消費(fèi)者模式在分布式計(jì)算中的應(yīng)用也非常廣泛。在分布式場(chǎng)景下,你可以借助分布式消息隊(duì)列(MQ)來實(shí)現(xiàn)生產(chǎn)者 - 消費(fèi)者模式。MQ 一般都會(huì)支持兩種消息模型,一種是點(diǎn)對(duì)點(diǎn)模型,一種是發(fā)布訂閱模型。這兩種模型的區(qū)別在于,點(diǎn)對(duì)點(diǎn)模型里一個(gè)消息只會(huì)被一個(gè)消費(fèi)者消費(fèi),和 Java 的線程池非常類似(Java 線程池的任務(wù)也只會(huì)被一個(gè)線程執(zhí)行);而發(fā)布訂閱模型里一個(gè)消息會(huì)被多個(gè)消費(fèi)者消費(fèi),本質(zhì)上是一種消息的廣播,在多線程編程領(lǐng)域,你可以結(jié)合觀察者模式實(shí)現(xiàn)廣播功能。

總結(jié)

以上是生活随笔為你收集整理的Java并发编程实战~生产者-消费者模式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。