如何用java线程池做分批次查询处理 java线程池ThreadPoolExecutor的使用
需求是在一個(gè)大數(shù)據(jù)量的表中按條件查詢出數(shù)據(jù)后做相應(yīng)的業(yè)務(wù)。我是使用的java線程池ThreadPoolExecutor,實(shí)現(xiàn)分批次去查詢,查詢到數(shù)據(jù)后,又分多個(gè)線程去做業(yè)務(wù)。
線程池類為 java.util.concurrent.ThreadPoolExecutor,常用構(gòu)造方法為:ThreadPoolExecutor(int corePoolSize,? int maximumPoolSize, ? long keepAliveTime,?? TimeUnit unit,?? BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
corePoolSize: 線程池維護(hù)線程的最少數(shù)量
maximumPoolSize:線程池維護(hù)線程的最大數(shù)量
keepAliveTime: 線程池維護(hù)線程所允許的空閑時(shí)間
unit: 線程池維護(hù)線程所允許的空閑時(shí)間的單位
workQueue: 線程池所使用的緩沖隊(duì)列
handler: 線程池對(duì)拒絕任務(wù)的處理策略
一個(gè)任務(wù)通過(guò) execute(Runnable)方法被添加到線程池,任務(wù)就是一個(gè) Runnable類型的對(duì)象,任務(wù)的執(zhí)行方法就是Runnable類型對(duì)象的run()方法。
線程池剛創(chuàng)建時(shí),里面沒(méi)有一個(gè)線程。任務(wù)隊(duì)列是作為參數(shù)傳進(jìn)來(lái)的。不過(guò),就算隊(duì)列里面有任務(wù),線程池也不會(huì)馬上執(zhí)行它們。當(dāng)調(diào)用 execute() 方法添加一個(gè)任務(wù)時(shí),線程池會(huì)做如下判斷:
???????? a. 如果正在運(yùn)行的線程數(shù)量小于 corePoolSize,那么馬上創(chuàng)建線程運(yùn)行這個(gè)任務(wù);
b. 如果正在運(yùn)行的線程數(shù)量大于或等于 corePoolSize,那么將這個(gè)任務(wù)放入隊(duì)列。
c. 如果這時(shí)候隊(duì)列滿了,而且正在運(yùn)行的線程數(shù)量小于 maximumPoolSize,那么還是要?jiǎng)?chuàng)建線程運(yùn)行這個(gè)任務(wù);
d. 如果隊(duì)列滿了,而且正在運(yùn)行的線程數(shù)量大于或等于 maximumPoolSize,那么線程池會(huì)拋出異常,告訴調(diào)用者“我不能再接受任務(wù)了”。
當(dāng)一個(gè)線程完成任務(wù)時(shí),它會(huì)從隊(duì)列中取下一個(gè)任務(wù)來(lái)執(zhí)行。
當(dāng)一個(gè)線程無(wú)事可做,超過(guò)一定的時(shí)間(keepAliveTime)時(shí),線程池會(huì)判斷,如果當(dāng)前運(yùn)行的線程數(shù)大于 corePoolSize,那么這個(gè)線程就被停掉。所以線程池的所有任務(wù)完成后,它最終會(huì)收縮到 corePoolSize 的大小。
這樣的過(guò)程說(shuō)明,并不是先加入任務(wù)就一定會(huì)先執(zhí)行。假設(shè)隊(duì)列大小為 10,corePoolSize 為 3,maximumPoolSize 為 6,那么當(dāng)加入 20 個(gè)任務(wù)時(shí),執(zhí)行的順序就是這樣的:首先執(zhí)行任務(wù) 1、2、3,然后任務(wù) 4~13 被放入隊(duì)列。這時(shí)候隊(duì)列滿了,任務(wù) 14、15、16 會(huì)被馬上執(zhí)行,而任務(wù) 17~20 則會(huì)拋出異常。最終順序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。
下面來(lái)看具體的代碼(代碼中會(huì)有部分代碼以+++表示不方便各位查看的):
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 10, 3, TimeUnit.SECONDS,
??????????????? new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());
??????? int pageSize = 1000; // 每次查詢交易條數(shù)
??????? int handleSize = 200; // 線程一次性處理的交易條數(shù)
??????? int handleCount = 0;
??????? int transCount = +++mapper.getTransCount(batchDate, +);//根據(jù)條件去查詢需要做業(yè)務(wù)的數(shù)據(jù)條數(shù),查詢條數(shù)的sql語(yǔ)句快
??????? logger.info(MessageFormat.format("[+++日期:[{0}], 待+++記錄條數(shù)為:[{1}]", batchDate, transCount));//MessageFormat.format是日志的一個(gè)方法,推薦大家這么使用
??????? List<+++> tranList = null;
??????? while (handleCount < transCount)
??????? {
??????????? tranList = +++mapper.getTransList(batchDate, null, handleCount, pageSize);//int offset, int limit);
??????????? if (tranList == null || tranList.size() == 0)
??????????? {
??????????????? logger.info(MessageFormat.format(++++
??????????????? handleCount += pageSize;
??????????????? continue;
??????????? }
??????????? int splitCount = (tranList.size() / handleSize) + (tranList.size() % handleSize == 0 ? 0 : 1);
??????????? CountDownLatch latch = new CountDownLatch(splitCount);
??????????? for (int i = 0; i < splitCount; i++)
??????????? {
??????????????? int toIndex = (i + 1) * handleSize;
??????????????? if (i == splitCount - 1)
??????????????? {
??????????????????? toIndex = tranList.size();
??????????????? }
??????????????? List<NpcTransaction> subList = tranList.subList(i * handleSize, toIndex);
??????????????? threadPool.execute(new +++Thread(+++Manager, subList, batchDate, latch));//塞入到線程池,執(zhí)行的方法是+++Thread類中的run方法
??????????? }
??????????? handleCount += pageSize;
??????????? try
??????????? {
??????????????? latch.await(5, TimeUnit.MINUTES);
??????????? }
??????????? catch (InterruptedException e)
??????????? {
??????????????? logger.error(getClass().getName() + " doTask fail.", e);
??????????? }
??????? }
下面是threadPool.execute(new +++Thread...中的thread類
public class +++Thread implements Runnable
{
??? private Logger logger =
??? private List<NpcTransaction> trans;
??? private CountDownLatch latch;
??? private String batchDate;
??? private +++Manager
??? public +++Thread(+++Manager +++Manager, List<+++> trans, String batchDate,
??????????? CountDownLatch latch)
??? {
??????? this.+++Manager = +++Manager;
??????? this.trans = trans;
??????? this.batchDate = batchDate;
??????? this.latch = latch;
??? }
??? /**
???? * 重載方法
???? */
??? @Override
??? public void run()
??? {
??????? int saveCount = 0;
??????? try
??????? {
??????????? saveCount = +++Manager.save+++Record(trans);//
??????? }
??????? catch (Exception e)
??????? {
??????????? logger.error(MessageFormat.format("[+++job] 跑批日期:[{0}], +++異常:[{1}]", batchDate, e.getMessage()));
??????????? e.printStackTrace();
??????? }
??????? if (saveCount != trans.size())
??????? {
??????????? logger.error(MessageFormat.format("[+++job] 跑批日期:[{0}], +++異常,++成功條數(shù):[{1}],預(yù)期條數(shù):[{2}]", batchDate,
??????????????????? saveCount, trans.size()));
??????? }
??????? latch.countDown();
??? }
??? public List<++Transaction> getTrans()
??? {
??????? return trans;
??? }
??? public void setTrans(List<++Transaction> trans)
??? {
??????? this.trans = trans;
??? }
??? /**
???? * 獲取 latch
???? *
???? * @return 返回 latch
???? */
??? public CountDownLatch getLatch()
??? {
??????? return latch;
??? }
??? /**
???? * 設(shè)置 latch
???? *
???? * @param 對(duì)latch進(jìn)行賦值
???? */
??? public void setLatch(CountDownLatch latch)
??? {
??????? this.latch = latch;
??? }
??? /**
???? * 獲取 batchDate
???? *
???? * @return 返回 batchDate
???? */
??? public String getBatchDate()
??? {
??????? return batchDate;
??? }
??? /**
???? * 設(shè)置 batchDate
???? *
???? * @param 對(duì)batchDate進(jìn)行賦值
???? */
??? public void setBatchDate(String batchDate)
??? {
??????? this.batchDate = batchDate;
??? }
}
都要寫get,set方法,latch.countDown();這個(gè)最好寫在finally中
關(guān)于CountDownLatch這個(gè),我下面簡(jiǎn)單的說(shuō)一下:
CountDownLatch,一個(gè)同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線程一直等待。
主要方法
?public CountDownLatch(int count);
?public void countDown();
?public void await() throws InterruptedException
?構(gòu)造方法參數(shù)指定了計(jì)數(shù)的次數(shù)
?countDown方法,當(dāng)前線程調(diào)用此方法,則計(jì)數(shù)減一
?awaint方法,調(diào)用此方法會(huì)一直阻塞當(dāng)前線程,直到計(jì)時(shí)器的值為0
此處用計(jì)數(shù)器,因?yàn)橐唤M1000條數(shù)據(jù)通過(guò)5個(gè)線程去執(zhí)行,這組執(zhí)行完再進(jìn)行第二組以及其后組的1000條數(shù)據(jù)操作。
總結(jié)
以上是生活随笔為你收集整理的如何用java线程池做分批次查询处理 java线程池ThreadPoolExecutor的使用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 濮阳第二届创客机器人比赛_咸阳市举行第二
- 下一篇: Django后台管理之商品分类