日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

如何用java线程池做分批次查询处理 java线程池ThreadPoolExecutor的使用

發(fā)布時(shí)間:2023/12/20 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 如何用java线程池做分批次查询处理 java线程池ThreadPoolExecutor的使用 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

需求是在一個(gè)大數(shù)據(jù)量的表中按條件查詢出數(shù)據(jù)后做相應(yīng)的業(yè)務(wù)。我是使用的java線程池ThreadPoolExecutor,實(shí)現(xiàn)分批次去查詢,查詢到數(shù)據(jù)后,又分多個(gè)線程去做業(yè)務(wù)。

線程池類為 java.util.concurrent.ThreadPoolExecutor,常用構(gòu)造方法為:ThreadPoolExecutor(int corePoolSize,? int maximumPoolSize, ? long keepAliveTime,?? TimeUnit unit,?? BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)

corePoolSize: 線程池維護(hù)線程的最少數(shù)量

maximumPoolSize:線程池維護(hù)線程的最大數(shù)量

keepAliveTime: 線程池維護(hù)線程所允許的空閑時(shí)間

unit: 線程池維護(hù)線程所允許的空閑時(shí)間的單位

workQueue: 線程池所使用的緩沖隊(duì)列

handler: 線程池對(duì)拒絕任務(wù)的處理策略

一個(gè)任務(wù)通過(guò) execute(Runnable)方法被添加到線程池,任務(wù)就是一個(gè) Runnable類型的對(duì)象,任務(wù)的執(zhí)行方法就是Runnable類型對(duì)象的run()方法。

線程池剛創(chuàng)建時(shí),里面沒(méi)有一個(gè)線程。任務(wù)隊(duì)列是作為參數(shù)傳進(jìn)來(lái)的。不過(guò),就算隊(duì)列里面有任務(wù),線程池也不會(huì)馬上執(zhí)行它們。當(dāng)調(diào)用 execute() 方法添加一個(gè)任務(wù)時(shí),線程池會(huì)做如下判斷:
???????? a. 如果正在運(yùn)行的線程數(shù)量小于 corePoolSize,那么馬上創(chuàng)建線程運(yùn)行這個(gè)任務(wù);
   b. 如果正在運(yùn)行的線程數(shù)量大于或等于 corePoolSize,那么將這個(gè)任務(wù)放入隊(duì)列。
   c. 如果這時(shí)候隊(duì)列滿了,而且正在運(yùn)行的線程數(shù)量小于 maximumPoolSize,那么還是要?jiǎng)?chuàng)建線程運(yùn)行這個(gè)任務(wù);
   d. 如果隊(duì)列滿了,而且正在運(yùn)行的線程數(shù)量大于或等于 maximumPoolSize,那么線程池會(huì)拋出異常,告訴調(diào)用者“我不能再接受任務(wù)了”。

當(dāng)一個(gè)線程完成任務(wù)時(shí),它會(huì)從隊(duì)列中取下一個(gè)任務(wù)來(lái)執(zhí)行。
當(dāng)一個(gè)線程無(wú)事可做,超過(guò)一定的時(shí)間(keepAliveTime)時(shí),線程池會(huì)判斷,如果當(dāng)前運(yùn)行的線程數(shù)大于 corePoolSize,那么這個(gè)線程就被停掉。所以線程池的所有任務(wù)完成后,它最終會(huì)收縮到 corePoolSize 的大小。
   這樣的過(guò)程說(shuō)明,并不是先加入任務(wù)就一定會(huì)先執(zhí)行。假設(shè)隊(duì)列大小為 10,corePoolSize 為 3,maximumPoolSize 為 6,那么當(dāng)加入 20 個(gè)任務(wù)時(shí),執(zhí)行的順序就是這樣的:首先執(zhí)行任務(wù) 1、2、3,然后任務(wù) 4~13 被放入隊(duì)列。這時(shí)候隊(duì)列滿了,任務(wù) 14、15、16 會(huì)被馬上執(zhí)行,而任務(wù) 17~20 則會(huì)拋出異常。最終順序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。

下面來(lái)看具體的代碼(代碼中會(huì)有部分代碼以+++表示不方便各位查看的):

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 10, 3, TimeUnit.SECONDS,
??????????????? new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());

??????? int pageSize = 1000; // 每次查詢交易條數(shù)
??????? int handleSize = 200; // 線程一次性處理的交易條數(shù)
??????? int handleCount = 0;
??????? int transCount = +++mapper.getTransCount(batchDate, +);//根據(jù)條件去查詢需要做業(yè)務(wù)的數(shù)據(jù)條數(shù),查詢條數(shù)的sql語(yǔ)句快
??????? logger.info(MessageFormat.format("[+++日期:[{0}], 待+++記錄條數(shù)為:[{1}]", batchDate, transCount));//MessageFormat.format是日志的一個(gè)方法,推薦大家這么使用
??????? List<+++> tranList = null;
??????? while (handleCount < transCount)
??????? {
??????????? tranList = +++mapper.getTransList(batchDate, null, handleCount, pageSize);//int offset, int limit);
??????????? if (tranList == null || tranList.size() == 0)
??????????? {
??????????????? logger.info(MessageFormat.format(++++
??????????????? handleCount += pageSize;
??????????????? continue;
??????????? }

??????????? int splitCount = (tranList.size() / handleSize) + (tranList.size() % handleSize == 0 ? 0 : 1);
??????????? CountDownLatch latch = new CountDownLatch(splitCount);
??????????? for (int i = 0; i < splitCount; i++)
??????????? {
??????????????? int toIndex = (i + 1) * handleSize;
??????????????? if (i == splitCount - 1)
??????????????? {
??????????????????? toIndex = tranList.size();
??????????????? }
??????????????? List<NpcTransaction> subList = tranList.subList(i * handleSize, toIndex);
??????????????? threadPool.execute(new +++Thread(+++Manager, subList, batchDate, latch));//塞入到線程池,執(zhí)行的方法是+++Thread類中的run方法
??????????? }

??????????? handleCount += pageSize;

??????????? try
??????????? {
??????????????? latch.await(5, TimeUnit.MINUTES);
??????????? }
??????????? catch (InterruptedException e)
??????????? {
??????????????? logger.error(getClass().getName() + " doTask fail.", e);
??????????? }
??????? }

下面是threadPool.execute(new +++Thread...中的thread類

public class +++Thread implements Runnable
{

??? private Logger logger =

??? private List<NpcTransaction> trans;

??? private CountDownLatch latch;

??? private String batchDate;

??? private +++Manager

??? public +++Thread(+++Manager +++Manager, List<+++> trans, String batchDate,
??????????? CountDownLatch latch)
??? {
??????? this.+++Manager = +++Manager;
??????? this.trans = trans;
??????? this.batchDate = batchDate;
??????? this.latch = latch;
??? }

??? /**
???? * 重載方法
???? */
??? @Override
??? public void run()
??? {

??????? int saveCount = 0;
??????? try
??????? {
??????????? saveCount = +++Manager.save+++Record(trans);//
??????? }
??????? catch (Exception e)
??????? {
??????????? logger.error(MessageFormat.format("[+++job] 跑批日期:[{0}], +++異常:[{1}]", batchDate, e.getMessage()));
??????????? e.printStackTrace();
??????? }
??????? if (saveCount != trans.size())
??????? {
??????????? logger.error(MessageFormat.format("[+++job] 跑批日期:[{0}], +++異常,++成功條數(shù):[{1}],預(yù)期條數(shù):[{2}]", batchDate,
??????????????????? saveCount, trans.size()));
??????? }
??????? latch.countDown();

??? }

??? public List<++Transaction> getTrans()
??? {
??????? return trans;
??? }

??? public void setTrans(List<++Transaction> trans)
??? {
??????? this.trans = trans;
??? }

??? /**
???? * 獲取 latch
???? *
???? * @return 返回 latch
???? */
??? public CountDownLatch getLatch()
??? {

??????? return latch;
??? }

??? /**
???? * 設(shè)置 latch
???? *
???? * @param 對(duì)latch進(jìn)行賦值
???? */
??? public void setLatch(CountDownLatch latch)
??? {

??????? this.latch = latch;
??? }

??? /**
???? * 獲取 batchDate
???? *
???? * @return 返回 batchDate
???? */
??? public String getBatchDate()
??? {

??????? return batchDate;
??? }

??? /**
???? * 設(shè)置 batchDate
???? *
???? * @param 對(duì)batchDate進(jìn)行賦值
???? */
??? public void setBatchDate(String batchDate)
??? {

??????? this.batchDate = batchDate;
??? }

}

都要寫get,set方法,latch.countDown();這個(gè)最好寫在finally中

關(guān)于CountDownLatch這個(gè),我下面簡(jiǎn)單的說(shuō)一下:

CountDownLatch,一個(gè)同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線程一直等待。
主要方法
?public CountDownLatch(int count);
?public void countDown();
?public void await() throws InterruptedException
?構(gòu)造方法參數(shù)指定了計(jì)數(shù)的次數(shù)
?countDown方法,當(dāng)前線程調(diào)用此方法,則計(jì)數(shù)減一
?awaint方法,調(diào)用此方法會(huì)一直阻塞當(dāng)前線程,直到計(jì)時(shí)器的值為0

此處用計(jì)數(shù)器,因?yàn)橐唤M1000條數(shù)據(jù)通過(guò)5個(gè)線程去執(zhí)行,這組執(zhí)行完再進(jìn)行第二組以及其后組的1000條數(shù)據(jù)操作。




總結(jié)

以上是生活随笔為你收集整理的如何用java线程池做分批次查询处理 java线程池ThreadPoolExecutor的使用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。