Java并发编程实战~生产者-消费者模式
前面我們在《Worker Thread 模式》中講到,Worker Thread 模式類比的是工廠里車間工人的工作模式。但其實在現實世界,工廠里還有一種流水線的工作模式,類比到編程領域,就是生產者 - 消費者模式。
生產者 - 消費者模式在編程領域的應用也非常廣泛,前面我們曾經提到,Java 線程池本質上就是用生產者 - 消費者模式實現的,所以每當使用線程池的時候,其實就是在應用生產者 - 消費者模式。
當然,除了在線程池中的應用,為了提升性能,并發編程領域很多地方也都用到了生產者 - 消費者模式,例如 Log4j2 中異步 Appender 內部也用到了生產者 - 消費者模式。所以今天我們就來深入地聊聊生產者 - 消費者模式,看看它具體有哪些優點,以及如何提升系統的性能。
生產者 - 消費者模式的優點
生產者 - 消費者模式的核心是一個任務隊列,生產者線程生產任務,并將任務添加到任務隊列中,而消費者線程從任務隊列中獲取任務并執行。下面是生產者 - 消費者模式的一個示意圖,你可以結合它來理解。
從架構設計的角度來看,生產者 - 消費者模式有一個很重要的優點,就是解耦。解耦對于大型系統的設計非常重要,而解耦的一個關鍵就是組件之間的依賴關系和通信方式必須受限。在生產者 - 消費者模式中,生產者和消費者沒有任何依賴關系,它們彼此之間的通信只能通過任務隊列,所以生產者 - 消費者模式是一個不錯的解耦方案。
除了架構設計上的優點之外,生產者 - 消費者模式還有一個重要的優點就是支持異步,并且能夠平衡生產者和消費者的速度差異。在生產者 - 消費者模式中,生產者線程只需要將任務添加到任務隊列而無需等待任務被消費者線程執行完,也就是說任務的生產和消費是異步的,這是與傳統的方法之間調用的本質區別,傳統的方法之間調用是同步的。
你或許會有這樣的疑問,異步化處理最簡單的方式就是創建一個新的線程去處理,那中間增加一個“任務隊列”究竟有什么用呢?我覺得主要還是用于平衡生產者和消費者的速度差異。我們假設生產者的速率很慢,而消費者的速率很高,比如是 1:3,如果生產者有 3 個線程,采用創建新的線程的方式,那么會創建 3 個子線程,而采用生產者 - 消費者模式,消費線程只需要 1 個就可以了。Java 語言里,Java 線程和操作系統線程是一一對應的,線程創建得太多,會增加上下文切換的成本,所以 Java 線程不是越多越好,適量即可。生產者 - 消費者模式恰好能支持你用適量的線程。
支持批量執行以提升性能
前面我們在《Thread-Per-Message 模式》中講過輕量級的線程,如果使用輕量級線程,就沒有必要平衡生產者和消費者的速度差異了,因為輕量級線程本身就是廉價的,那是否意味著生產者 - 消費者模式在性能優化方面就無用武之地了呢?當然不是,有一類并發場景應用生產者 - 消費者模式就有奇效,那就是批量執行任務。
例如,我們要在數據庫里 INSERT 1000 條數據,有兩種方案:第一種方案是用 1000 個線程并發執行,每個線程 INSERT 一條數據;第二種方案是用 1 個線程,執行一個批量的 SQL,一次性把 1000 條數據 INSERT 進去。這兩種方案,顯然是第二種方案效率更高,其實這樣的應用場景就是我們上面提到的批量執行場景。
在《兩階段終止模式》文章中,我們提到一個監控系統動態采集的案例,其實最終回傳的監控數據還是要存入數據庫的(如下圖)。但被監控系統往往有很多,如果每一條回傳數據都直接 INSERT 到數據庫,那么這個方案就是上面提到的第一種方案:每個線程 INSERT 一條數據。很顯然,更好的方案是批量執行 SQL,那如何實現呢?這就要用到生產者 - 消費者模式了。
利用生產者 - 消費者模式實現批量執行 SQL 非常簡單:將原來直接 INSERT 數據到數據庫的線程作為生產者線程,生產者線程只需將數據添加到任務隊列,然后消費者線程負責將任務從任務隊列中批量取出并批量執行。
在下面的示例代碼中,我們創建了 5 個消費者線程負責批量執行 SQL,這 5 個消費者線程以 while(true){} 循環方式批量地獲取任務并批量地執行。需要注意的是,從任務隊列中獲取批量任務的方法 pollTasks() 中,首先是以阻塞方式獲取任務隊列中的一條任務,而后則是以非阻塞的方式獲取任務;之所以首先采用阻塞方式,是因為如果任務隊列中沒有任務,這樣的方式能夠避免無謂的循環。
//任務隊列 BlockingQueue<Task> bq=new LinkedBlockingQueue<>(2000);//啟動5個消費者線程 //執行批量任務 public void start() {ExecutorService es=executors.newFixedThreadPool(5);for (int i=0; i<5; i++) {es.execute(()->{try {while (true) {//獲取批量任務List<Task> ts = pollTasks();//執行批量任務execTasks(ts);}} catch (Exception e) {e.printStackTrace();}});} }//從任務隊列中獲取批量任務 public List<Task> pollTasks() throws InterruptedException {List<Task> ts = new LinkedList<>();//阻塞式獲取一條任務Task t = bq.take();while (t != null) {ts.add(t);//非阻塞式獲取一條任務t = bq.poll();}return ts; }//批量執行任務 public execTasks(List<Task> ts) {//省略具體代碼無數 }利用生產者 - 消費者模式還可以輕松地支持一種分階段提交的應用場景。我們知道寫文件如果同步刷盤性能會很慢,所以對于不是很重要的數據,我們往往采用異步刷盤的方式。我曾經參與過一個項目,其中的日志組件是自己實現的,采用的就是異步刷盤方式,刷盤的時機是:
- ERROR 級別的日志需要立即刷盤;
- 數據積累到 500 條需要立即刷盤;
- 存在未刷盤數據,且 5 秒鐘內未曾刷盤,需要立即刷盤。
這個日志組件的異步刷盤操作本質上其實就是一種分階段提交。下面我們具體看看用生產者 - 消費者模式如何實現。在下面的示例代碼中,可以通過調用 info()和error() 方法寫入日志,這兩個方法都是創建了一個日志任務 LogMsg,并添加到阻塞隊列中,調用 info()和error() 方法的線程是生產者;而真正將日志寫入文件的是消費者線程,在 Logger 這個類中,我們只創建了 1 個消費者線程,在這個消費者線程中,會根據刷盤規則執行刷盤操作,邏輯很簡單,這里就不贅述了。
class Logger {//任務隊列 final BlockingQueue<LogMsg> bq = new BlockingQueue<>();//flush批量 static final int batchSize=500;//只需要一個線程寫日志ExecutorService es = Executors.newFixedThreadPool(1);//啟動寫日志線程public void start(){File file = File.createTempFile("foo", ".log");final FileWriter writer = new FileWriter(file);this.es.execute(()->{try {//未刷盤日志數量int curIdx = 0;long preFT = System.currentTimeMillis();while (true) {LogMsg log = bq.poll(5, TimeUnit.SECONDS);//寫日志if (log != null) {writer.write(log.toString());++curIdx;}//如果不存在未刷盤數據,則無需刷盤if (curIdx <= 0) {continue;}//根據規則刷盤if (log!=null && log.level==LEVEL.ERROR ||curIdx == batchSize ||System.currentTimeMillis()-preFT>5000){writer.flush();curIdx = 0;preFT=System.currentTimeMillis();}}}catch(Exception e){e.printStackTrace();} finally {try {writer.flush();writer.close();}catch(IOException e){e.printStackTrace();}}}); }//寫INFO級別日志void info(String msg) {bq.put(new LogMsg(LEVEL.INFO, msg));}//寫ERROR級別日志void error(String msg) {bq.put(new LogMsg(LEVEL.ERROR, msg));} } //日志級別 enum LEVEL {INFO, ERROR } class LogMsg {LEVEL level;String msg;//省略構造函數實現LogMsg(LEVEL lvl, String msg){}//省略toString()實現String toString(){} }總結
Java 語言提供的線程池本身就是一種生產者 - 消費者模式的實現,但是線程池中的線程每次只能從任務隊列中消費一個任務來執行,對于大部分并發場景這種策略都沒有問題。但是有些場景還是需要自己來實現,例如需要批量執行以及分階段提交的場景。
生產者 - 消費者模式在分布式計算中的應用也非常廣泛。在分布式場景下,你可以借助分布式消息隊列(MQ)來實現生產者 - 消費者模式。MQ 一般都會支持兩種消息模型,一種是點對點模型,一種是發布訂閱模型。這兩種模型的區別在于,點對點模型里一個消息只會被一個消費者消費,和 Java 的線程池非常類似(Java 線程池的任務也只會被一個線程執行);而發布訂閱模型里一個消息會被多個消費者消費,本質上是一種消息的廣播,在多線程編程領域,你可以結合觀察者模式實現廣播功能。
總結
以上是生活随笔為你收集整理的Java并发编程实战~生产者-消费者模式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一个简单的 iBatis 实现——完整示
- 下一篇: Java加密与解密的艺术~RSA实现