Java:使用Executors创建和管理线程
http://zhangjunhd.blog.51cto.com/113473/70068/
1. 類 Executors 此類中提供的一些方法有: 1.1 public static ExecutorService newCachedThreadPool() 創(chuàng)建一個(gè)可根據(jù)需要?jiǎng)?chuàng)建新線程的線程池,但是在以前構(gòu)造的線程可用時(shí)將重用它們。對于執(zhí)行很多短期異步任務(wù)的程序而言,這些線程池通常可提高程序性能。 1.2 public static ExecutorService newFixedThreadPool(int?nThreads) 創(chuàng)建一個(gè)可重用固定線程數(shù)的線程池,以共享的無界隊(duì)列方式來運(yùn)行這些線程。 1.3 public static ExecutorService newSingleThreadExecutor() 創(chuàng)建一個(gè)使用單個(gè) worker 線程的 Executor,以無界隊(duì)列方式來運(yùn)行該線程。 這三個(gè)方法都可以配合接口ThreadFactory的實(shí)例一起使用。并且返回一個(gè)ExecutorService接口的實(shí)例。 2. 接口 ThreadFactory 根據(jù)需要?jiǎng)?chuàng)建新線程的對象。使用線程工廠就無需再手工編寫對 new Thread 的調(diào)用了,從而允許應(yīng)用程序使用特殊的線程子類、屬性等等。 此接口最簡單的實(shí)現(xiàn)就是:
| class SimpleThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { return new Thread(r); } } |
| package com.zj.concurrency.executors; publicclass MyThread implements Runnable { ??? privateintcount = 1, number; ??? public MyThread(int num) { ?????? number = num; ?????? System.out.println("Create Thread-" + number); ??? } ??? publicvoid run() { ?????? while (true) { ?????????? System.out.println("Thread-" + number + " run " + count+" time(s)"); ?????????? if (++count == 3) ????????????? return; ?????? } ??? } } |
| package com.zj.concurrency.executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; publicclass CachedThreadPool { ??? publicstaticvoid main(String[] args) { ?????? ExecutorService exec = Executors.newCachedThreadPool(); ?????? for (int i = 0; i < 5; i++) ?????????? exec.execute(new MyThread(i)); ?????? exec.shutdown(); ??? } } |
| package com.zj.concurrency.executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; publicclass FixedThreadPool { ??? publicstaticvoid main(String[] args) { ?????? ExecutorService exec = Executors.newFixedThreadPool(2); ?????? for (int i = 0; i < 5; i++) ?????????? exec.execute(new MyThread(i)); ?????? exec.shutdown(); ??? } } |
| package com.zj.concurrency.executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; publicclass SingleThreadExecutor { ??? publicstaticvoid main(String[] args) { ?????? ExecutorService exec = Executors.newSingleThreadExecutor(); ?????? for (int i = 0; i < 5; i++) ?????????? exec.execute(new MyThread(i)); ?????? exec.shutdown(); ??? } } |
| package com.zj.concurrency.executors.factory; import java.util.concurrent.ThreadFactory; publicclass DaemonThreadFactory implements ThreadFactory { ??? public Thread newThread(Runnable r) { ?????? Thread t = new Thread(r); ?????? t.setDaemon(true); ?????? return t; ??? } } |
| package com.zj.concurrency.executors.factory; import java.util.concurrent.ThreadFactory; publicclass MaxPriorityThreadFactory implements ThreadFactory { ??? public Thread newThread(Runnable r) { ?????? Thread t = new Thread(r); ?????? t.setPriority(Thread.MAX_PRIORITY); ?????? return t; ??? } } |
| package com.zj.concurrency.executors.factory; import java.util.concurrent.ThreadFactory; publicclass MinPriorityThreadFactory implements ThreadFactory { ??? public Thread newThread(Runnable r) { ?????? Thread t = new Thread(r); ?????? t.setPriority(Thread.MIN_PRIORITY); ?????? return t; ??? } } |
| package com.zj.concurrency.executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.zj.concurrency.executors.factory.DaemonThreadFactory; import com.zj.concurrency.executors.factory.MaxPriorityThreadFactory; import com.zj.concurrency.executors.factory.MinPriorityThreadFactory; publicclass ExecFromFactory { ??? publicstaticvoid main(String[] args) throws Exception { ?????? ExecutorService defaultExec = Executors.newCachedThreadPool(); ?????? ExecutorService daemonExec = Executors ????????????? .newCachedThreadPool(new DaemonThreadFactory()); ?????? ExecutorService maxPriorityExec = Executors ????????????? .newCachedThreadPool(new MaxPriorityThreadFactory()); ?????? ExecutorService minPriorityExec = Executors ????????????? .newCachedThreadPool(new MinPriorityThreadFactory()); ?????? for (int i = 0; i < 10; i++) ?????????? daemonExec.execute(new MyThread(i)); ?????? for (int i = 10; i < 20; i++) ?????????? if (i == 10) ????????????? maxPriorityExec.execute(new MyThread(i)); ?????????? elseif (i == 11) ????????????? minPriorityExec.execute(new MyThread(i)); ?????????? else ?????? ?????? defaultExec.execute(new MyThread(i)); ??? } } |
?
?
?
==============================================================
tranditional thread pool
http://www.blogjava.net/standlww/archive/2008/10/17/235100.html
Java 線程池的原理與實(shí)現(xiàn)最近在學(xué)習(xí)線程池、內(nèi)存控制等關(guān)于提高程序運(yùn)行性能方面的編程技術(shù),在網(wǎng)上看到有一哥們寫得不錯(cuò),故和大家一起分享。
[分享]Java 線程池的原理與實(shí)現(xiàn)
這幾天主要是狂看源程序,在彌補(bǔ)了一些以前知識空白的同時(shí),也學(xué)會了不少新的知識(比如 NIO),或者稱為新技術(shù)吧。
線程池就是其中之一,一提到線程,我們會想到以前《操作系統(tǒng)》的生產(chǎn)者與消費(fèi)者,信號量,同步控制等等。
一提到池,我們會想到數(shù)據(jù)庫連接池,但是線程池又如何呢?
建議:在閱讀本文前,先理一理同步的知識,特別是syncronized同步關(guān)鍵字的用法。
關(guān)于我對同步的認(rèn)識,要緣于大三年的一本書,書名好像是 Java 實(shí)戰(zhàn),這本書寫得實(shí)在太妙了,真正的從理論到實(shí)踐,從截圖分析到.class字節(jié)碼分析。哇,我想市場上很難買到這么精致的書了。作為一個(gè)Java愛好者,我覺得絕對值得一讀。
我對此書印象最深之一的就是:equal()方法,由淺入深,經(jīng)典!
還有就是同步了,其中提到了我的幾個(gè)編程誤區(qū),以前如何使用同步提高性能等等,通過學(xué)習(xí),使我對同步的認(rèn)識進(jìn)一步加深。
簡單介紹
??? 創(chuàng)建線程有兩種方式:繼承Thread或?qū)崿F(xiàn)Runnable。Thread實(shí)現(xiàn)了Runnable接口,提供了一個(gè)空的run()方法,所以不論是繼承Thread還是實(shí)現(xiàn)Runnable,都要有自己的run()方法。
??? 一個(gè)線程創(chuàng)建后就存在,調(diào)用start()方法就開始運(yùn)行(執(zhí)行run()方法),調(diào)用wait進(jìn)入等待或調(diào)用sleep進(jìn)入休眠期,順利運(yùn)行完畢或休眠被中斷或運(yùn)行過程中出現(xiàn)異常而退出。
wait和sleep比較:
????? sleep方法有:sleep(long millis),sleep(long millis, long nanos),調(diào)用sleep方法后,當(dāng)前線程進(jìn)入休眠期,暫停執(zhí)行,但該線程繼續(xù)擁有監(jiān)視資源的所有權(quán)。到達(dá)休眠時(shí)間后線程將繼續(xù)執(zhí)行,直到完成。若在休眠期另一線程中斷該線程,則該線程退出。
????? wait方法有:wait(),wait(long timeout),wait(long timeout, long nanos),調(diào)用wait方法后,該線程放棄監(jiān)視資源的所有權(quán)進(jìn)入等待狀態(tài);
????? wait():等待有其它的線程調(diào)用notify()或notifyAll()進(jìn)入調(diào)度狀態(tài),與其它線程共同爭奪監(jiān)視。wait()相當(dāng)于wait(0),wait(0, 0)。
????? wait(long timeout):當(dāng)其它線程調(diào)用notify()或notifyAll(),或時(shí)間到達(dá)timeout亳秒,或有其它某線程中斷該線程,則該線程進(jìn)入調(diào)度狀態(tài)。
????? wait(long timeout, long nanos):相當(dāng)于wait(1000000*timeout + nanos),只不過時(shí)間單位為納秒。
線程池:
??? 多線程技術(shù)主要解決處理器單元內(nèi)多個(gè)線程執(zhí)行的問題,它可以顯著減少處理器單元的閑置時(shí)間,增加處理器單元的吞吐能力。
???
??? 假設(shè)一個(gè)服務(wù)器完成一項(xiàng)任務(wù)所需時(shí)間為:T1 創(chuàng)建線程時(shí)間,T2 在線程中執(zhí)行任務(wù)的時(shí)間,T3 銷毀線程時(shí)間。
???
??? 如果:T1 + T3 遠(yuǎn)大于 T2,則可以采用線程池,以提高服務(wù)器性能。
??? ??? ??? ??? 一個(gè)線程池包括以下四個(gè)基本組成部分:
??? ??? ??? ??? 1、線程池管理器(ThreadPool):用于創(chuàng)建并管理線程池,包括 創(chuàng)建線程池,銷毀線程池,添加新任務(wù);
??? ??? ??? ??? 2、工作線程(PoolWorker):線程池中線程,在沒有任務(wù)時(shí)處于等待狀態(tài),可以循環(huán)的執(zhí)行任務(wù);
??? ??? ??? ??? 3、任務(wù)接口(Task):每個(gè)任務(wù)必須實(shí)現(xiàn)的接口,以供工作線程調(diào)度任務(wù)的執(zhí)行,它主要規(guī)定了任務(wù)的入口,任務(wù)執(zhí)行完后的收尾工作,任務(wù)的執(zhí)行狀態(tài)等;
??? ??? ??? ??? 4、任務(wù)隊(duì)列(taskQueue):用于存放沒有處理的任務(wù)。提供一種緩沖機(jī)制。
??? ??? ??? ???
??? 線程池技術(shù)正是關(guān)注如何縮短或調(diào)整T1,T3時(shí)間的技術(shù),從而提高服務(wù)器程序性能的。它把T1,T3分別安排在服務(wù)器程序的啟動(dòng)和結(jié)束的時(shí)間段或者一些空閑的時(shí)間段,這樣在服務(wù)器程序處理客戶請求時(shí),不會有T1,T3的開銷了。
??? 線程池不僅調(diào)整T1,T3產(chǎn)生的時(shí)間段,而且它還顯著減少了創(chuàng)建線程的數(shù)目,看一個(gè)例子:
??? 假設(shè)一個(gè)服務(wù)器一天要處理50000個(gè)請求,并且每個(gè)請求需要一個(gè)單獨(dú)的線程完成。在線程池中,線程數(shù)一般是固定的,所以產(chǎn)生線程總數(shù)不會超過線程池中線程的數(shù)目,而如果服務(wù)器不利用線程池來處理這些請求則線程總數(shù)為50000。一般線程池大小是遠(yuǎn)小于50000。所以利用線程池的服務(wù)器程序不會為了創(chuàng)建50000而在處理請求時(shí)浪費(fèi)時(shí)間,從而提高效率。
/** 線程池類,工作線程作為其內(nèi)部類 **/
package org.ymcn.util;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import org.apache.log4j.Logger;
/**
* 線程池
* 創(chuàng)建線程池,銷毀線程池,添加新任務(wù)
*
* @author obullxl
*/
public final class ThreadPool {
??? private static Logger logger = Logger.getLogger(ThreadPool.class);
??? private static Logger taskLogger = Logger.getLogger("TaskLogger");
??? private static boolean debug = taskLogger.isDebugEnabled();
??? // private static boolean debug = taskLogger.isInfoEnabled();
??? /* 單例 */
??? private static ThreadPool instance = ThreadPool.getInstance();
??? public static final int SYSTEM_BUSY_TASK_COUNT = 150;
??? /* 默認(rèn)池中線程數(shù) */
??? public static int worker_num = 5;
??? /* 已經(jīng)處理的任務(wù)數(shù) */
??? private static int taskCounter = 0;
??? public static boolean systemIsBusy = false;
??? private static List<Task> taskQueue = Collections
??? ??? ??? .synchronizedList(new LinkedList<Task>());
??? /* 池中的所有線程 */
??? public PoolWorker[] workers;
??? private ThreadPool() {
??? ??? workers = new PoolWorker[5];
??? ??? for (int i = 0; i < workers.length; i++) {
??? ??? ??? workers[i] = new PoolWorker(i);
??? ??? }
??? }
??? private ThreadPool(int pool_worker_num) {
??? ??? worker_num = pool_worker_num;
??? ??? workers = new PoolWorker[worker_num];
??? ??? for (int i = 0; i < workers.length; i++) {
??? ??? ??? workers[i] = new PoolWorker(i);
??? ??? }
??? }
??? public static synchronized ThreadPool getInstance() {
??? ??? if (instance == null)
??? ??? ??? return new ThreadPool();
??? ??? return instance;
??? }
??? /**
??? * 增加新的任務(wù)
??? * 每增加一個(gè)新任務(wù),都要喚醒任務(wù)隊(duì)列
??? * @param newTask
??? */
??? public void addTask(Task newTask) {
??? ??? synchronized (taskQueue) {
??? ??? ??? newTask.setTaskId(++taskCounter);
??? ??? ??? newTask.setSubmitTime(new Date());
??? ??? ??? taskQueue.add(newTask);
??? ??? ??? /* 喚醒隊(duì)列, 開始執(zhí)行 */
??? ??? ??? taskQueue.notifyAll();
??? ??? }
??? ??? logger.info("Submit Task<" + newTask.getTaskId() + ">: "
??? ??? ??? ??? + newTask.info());
??? }
??? /**
??? * 批量增加新任務(wù)
??? * @param taskes
??? */
??? public void batchAddTask(Task[] taskes) {
??? ??? if (taskes == null || taskes.length == 0) {
??? ??? ??? return;
??? ??? }
??? ??? synchronized (taskQueue) {
??? ??? ??? for (int i = 0; i < taskes.length; i++) {
??? ??? ??? ??? if (taskes[i] == null) {
??? ??? ??? ??? ??? continue;
??? ??? ??? ??? }
??? ??? ??? ??? taskes[i].setTaskId(++taskCounter);
??? ??? ??? ??? taskes[i].setSubmitTime(new Date());
??? ??? ??? ??? taskQueue.add(taskes[i]);
??? ??? ??? }
??? ??? ??? /* 喚醒隊(duì)列, 開始執(zhí)行 */
??? ??? ??? taskQueue.notifyAll();
??? ??? }
??? ??? for (int i = 0; i < taskes.length; i++) {
??? ??? ??? if (taskes[i] == null) {
??? ??? ??? ??? continue;
??? ??? ??? }
??? ??? ??? logger.info("Submit Task<" + taskes[i].getTaskId() + ">: "
??? ??? ??? ??? ??? + taskes[i].info());
??? ??? }
??? }
??? /**
??? * 線程池信息
??? * @return
??? */
??? public String getInfo() {
??? ??? StringBuffer sb = new StringBuffer();
??? ??? sb.append("\nTask Queue Size:" + taskQueue.size());
??? ??? for (int i = 0; i < workers.length; i++) {
??? ??? ??? sb.append("\nWorker " + i + " is "
??? ??? ??? ??? ??? + ((workers[i].isWaiting()) ? "Waiting." : "Running."));
??? ??? }
??? ??? return sb.toString();
??? }
??? /**
??? * 銷毀線程池
??? */
??? public synchronized void destroy() {
??? ??? for (int i = 0; i < worker_num; i++) {
??? ??? ??? workers[i].stopWorker();
??? ??? ??? workers[i] = null;
??? ??? }
??? ??? taskQueue.clear();
??? }
??? /**
??? * 池中工作線程
??? *
??? * @author obullxl
??? */
??? private class PoolWorker extends Thread {
??? ??? private int index = -1;
??? ??? /* 該工作線程是否有效 */
??? ??? private boolean isRunning = true;
??? ??? /* 該工作線程是否可以執(zhí)行新任務(wù) */
??? ??? private boolean isWaiting = true;
??? ??? public PoolWorker(int index) {
??? ??? ??? this.index = index;
??? ??? ??? start();
??? ??? }
??? ??? public void stopWorker() {
??? ??? ??? this.isRunning = false;
??? ??? }
??? ??? public boolean isWaiting() {
??? ??? ??? return this.isWaiting;
??? ??? }
??? ??? /**
??? ??? * 循環(huán)執(zhí)行任務(wù)
??? ??? * 這也許是線程池的關(guān)鍵所在
??? ??? */
??? ??? public void run() {
??? ??? ??? while (isRunning) {
??? ??? ??? ??? Task r = null;
??? ??? ??? ??? synchronized (taskQueue) {
??? ??? ??? ??? ??? while (taskQueue.isEmpty()) {
??? ??? ??? ??? ??? ??? try {
??? ??? ??? ??? ??? ??? ??? /* 任務(wù)隊(duì)列為空,則等待有新任務(wù)加入從而被喚醒 */
??? ??? ??? ??? ??? ??? ??? taskQueue.wait(20);
??? ??? ??? ??? ??? ??? } catch (InterruptedException ie) {
??? ??? ??? ??? ??? ??? ??? logger.error(ie);
??? ??? ??? ??? ??? ??? }
??? ??? ??? ??? ??? }
??? ??? ??? ??? ??? /* 取出任務(wù)執(zhí)行 */
??? ??? ??? ??? ??? r = (Task) taskQueue.remove(0);
??? ??? ??? ??? }
??? ??? ??? ??? if (r != null) {
??? ??? ??? ??? ??? isWaiting = false;
??? ??? ??? ??? ??? try {
??? ??? ??? ??? ??? ??? if (debug) {
??? ??? ??? ??? ??? ??? ??? r.setBeginExceuteTime(new Date());
??? ??? ??? ??? ??? ??? ??? taskLogger.debug("Worker<" + index
??? ??? ??? ??? ??? ??? ??? ??? ??? + "> start execute Task<" + r.getTaskId() + ">");
??? ??? ??? ??? ??? ??? ??? if (r.getBeginExceuteTime().getTime()
??? ??? ??? ??? ??? ??? ??? ??? ??? - r.getSubmitTime().getTime() > 1000)
??? ??? ??? ??? ??? ??? ??? ??? taskLogger.debug("longer waiting time. "
??? ??? ??? ??? ??? ??? ??? ??? ??? ??? + r.info() + ",<" + index + ">,time:"
??? ??? ??? ??? ??? ??? ??? ??? ??? ??? + (r.getFinishTime().getTime() - r
??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? .getBeginExceuteTime().getTime()));
??? ??? ??? ??? ??? ??? }
??? ??? ??? ??? ??? ??? /* 該任務(wù)是否需要立即執(zhí)行 */
??? ??? ??? ??? ??? ??? if (r.needExecuteImmediate()) {
??? ??? ??? ??? ??? ??? ??? new Thread(r).start();
??? ??? ??? ??? ??? ??? } else {
??? ??? ??? ??? ??? ??? ??? r.run();
??? ??? ??? ??? ??? ??? }
??? ??? ??? ??? ??? ??? if (debug) {
??? ??? ??? ??? ??? ??? ??? r.setFinishTime(new Date());
??? ??? ??? ??? ??? ??? ??? taskLogger.debug("Worker<" + index
??? ??? ??? ??? ??? ??? ??? ??? ??? + "> finish task<" + r.getTaskId() + ">");
??? ??? ??? ??? ??? ??? ??? if (r.getFinishTime().getTime()
??? ??? ??? ??? ??? ??? ??? ??? ??? - r.getBeginExceuteTime().getTime() > 1000)
??? ??? ??? ??? ??? ??? ??? ??? taskLogger.debug("longer execution time. "
??? ??? ??? ??? ??? ??? ??? ??? ??? ??? + r.info() + ",<" + index + ">,time:"
??? ??? ??? ??? ??? ??? ??? ??? ??? ??? + (r.getFinishTime().getTime() - r
??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? .getBeginExceuteTime().getTime()));
??? ??? ??? ??? ??? ??? }
??? ??? ??? ??? ??? } catch (Exception e) {
??? ??? ??? ??? ??? ??? e.printStackTrace();
??? ??? ??? ??? ??? ??? logger.error(e);
??? ??? ??? ??? ??? }
??? ??? ??? ??? ??? isWaiting = true;
??? ??? ??? ??? ??? r = null;
??? ??? ??? ??? }
??? ??? ??? }
??? ??? }
??? }
}
/** 任務(wù)接口類 **/
package org.ymcn.util;
import java.util.Date;
/**
* 所有任務(wù)接口
* 其他任務(wù)必須繼承訪類
*
* @author obullxl
*/
public abstract class Task implements Runnable {
??? // private static Logger logger = Logger.getLogger(Task.class);
??? /* 產(chǎn)生時(shí)間 */
??? private Date generateTime = null;
??? /* 提交執(zhí)行時(shí)間 */
??? private Date submitTime = null;
??? /* 開始執(zhí)行時(shí)間 */
??? private Date beginExceuteTime = null;
??? /* 執(zhí)行完成時(shí)間 */
??? private Date finishTime = null;
??? private long taskId;
??? public Task() {
??? ??? this.generateTime = new Date();
??? }
??? /**
??? * 任務(wù)執(zhí)行入口
??? */
??? public void run() {
??? ??? /**
??? ??? * 相關(guān)執(zhí)行代碼
??? ??? *
??? ??? * beginTransaction();
??? ??? *
??? ??? * 執(zhí)行過程中可能產(chǎn)生新的任務(wù) subtask = taskCore();
??? ??? *
??? ??? * commitTransaction();
??? ??? *
??? ??? * 增加新產(chǎn)生的任務(wù) ThreadPool.getInstance().batchAddTask(taskCore());
??? ??? */
??? }
??? /**
??? * 所有任務(wù)的核心 所以特別的業(yè)務(wù)邏輯執(zhí)行之處
??? *
??? * @throws Exception
??? */
??? public abstract Task[] taskCore() throws Exception;
??? /**
??? * 是否用到數(shù)據(jù)庫
??? *
??? * @return
??? */
??? protected abstract boolean useDb();
??? /**
??? * 是否需要立即執(zhí)行
??? *
??? * @return
??? */
??? protected abstract boolean needExecuteImmediate();
??? /**
??? * 任務(wù)信息
??? *
??? * @return String
??? */
??? public abstract String info();
??? public Date getGenerateTime() {
??? ??? return generateTime;
??? }
??? public Date getBeginExceuteTime() {
??? ??? return beginExceuteTime;
??? }
??? public void setBeginExceuteTime(Date beginExceuteTime) {
??? ??? this.beginExceuteTime = beginExceuteTime;
??? }
??? public Date getFinishTime() {
??? ??? return finishTime;
??? }
??? public void setFinishTime(Date finishTime) {
??? ??? this.finishTime = finishTime;
??? }
??? public Date getSubmitTime() {
??? ??? return submitTime;
??? }
??? public void setSubmitTime(Date submitTime) {
??? ??? this.submitTime = submitTime;
??? }
??? public long getTaskId() {
??? ??? return taskId;
??? }
??? public void setTaskId(long taskId) {
??? ??? this.taskId = taskId;
??? }
}
轉(zhuǎn)自:http://hi.baidu.com/obullxl/blog/item/ee50ad1ba8e8ff1f8718bf66.html
?
總結(jié)
以上是生活随笔為你收集整理的Java:使用Executors创建和管理线程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 棒球比赛规则要点
- 下一篇: CA AutoSys Workload