ThreadPoolExecutor线程池 + Queue队列
1:BlockingQueue繼承關(guān)系
??java.util.concurrent 包里的?BlockingQueue是一個(gè)接口,?繼承Queue接口,Queue接口繼承?Collection
?
??BlockingQueue----->Queue-->Collection
?圖:
?
隊(duì)列的特點(diǎn)是:先進(jìn)先出(FIFO)
?
2:BlockingQueue的方法
BlockingQueue 具有 4 組不同的方法用于插入、移除以及對(duì)隊(duì)列中的元素進(jìn)行檢查。如果請(qǐng)求的操作不能得到立即執(zhí)行的話,每個(gè)方法的表現(xiàn)也不同。這些方法如下:
?
?
| ? | 拋出異常 | 特殊值 | 阻塞 | 超時(shí) |
| 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 移除 | remove() | poll() | take() | poll(time, unit) |
| 檢查 | element() | peek() | 不可用 | 不可用 ? |
?
?
四組不同的行為方式解釋:
1(異常)
如果試圖的操作無(wú)法立即執(zhí)行,拋一個(gè)異常。
2(特定值)?
如果試圖的操作無(wú)法立即執(zhí)行,返回一個(gè)特定的值(常常是 true / false)。
3(阻塞)?
如果試圖的操作無(wú)法立即執(zhí)行,該方法調(diào)用將會(huì)發(fā)生阻塞,直到能夠執(zhí)行。
4(超時(shí))?
如果試圖的操作無(wú)法立即執(zhí)行,該方法調(diào)用將會(huì)發(fā)生阻塞,直到能夠執(zhí)行,但等待時(shí)間不會(huì)超過(guò)給定值。返回一個(gè)特定值以告知該操作是否成功(典型的是 true / false)。
??
1.首先是springBoot的項(xiàng)目框架如下:
2.業(yè)務(wù)測(cè)試流程涉及的類(lèi),如下
BusinessThread 類(lèi)
package com.springboot.demo.Threads;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
/**
?* Created by Administrator on 2018/5/9.
?*/
@Component
@Scope("prototype")//spring 多例
public class BusinessThread implements Runnable{
? ? private String acceptStr;
? ? public BusinessThread(String acceptStr) {
? ? ? ? this.acceptStr = acceptStr;
? ? }
? ? public String getAcceptStr() {
? ? ? ? return acceptStr;
? ? }
? ? public void setAcceptStr(String acceptStr) {
? ? ? ? this.acceptStr = acceptStr;
? ? }
? ? @Override
? ? public void run() {
? ? ? ? //業(yè)務(wù)操作
? ? ? ? System.out.println("多線程已經(jīng)處理訂單插入系統(tǒng),訂單號(hào):"+acceptStr);
? ? ? ? //線程阻塞
? ? ? ? /*try {
? ? ? ? ? ? Thread.sleep(1000);
? ? ? ? ? ? System.out.println("多線程已經(jīng)處理訂單插入系統(tǒng),訂單號(hào):"+acceptStr);
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }*/
? ? }
}
TestThreadPoolManager 類(lèi)
package com.springboot.demo.Threads;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.*;
/**
?* Created by Administrator on 2018/5/10.
?*/
@Component
public class TestThreadPoolManager implements BeanFactoryAware {
? ? //用于從IOC里取對(duì)象
? ? private BeanFactory factory; //如果實(shí)現(xiàn)Runnable的類(lèi)是通過(guò)spring的application.xml文件進(jìn)行注入,可通過(guò) factory.getBean()獲取,這里只是提一下
? ? // 線程池維護(hù)線程的最少數(shù)量
? ? private final static int CORE_POOL_SIZE = 2;
? ? // 線程池維護(hù)線程的最大數(shù)量
? ? private final static int MAX_POOL_SIZE = 10;
? ? // 線程池維護(hù)線程所允許的空閑時(shí)間
? ? private final static int KEEP_ALIVE_TIME = 0;
? ? // 線程池所使用的緩沖隊(duì)列大小
? ? private final static int WORK_QUEUE_SIZE = 50;
? ? @Override
? ? public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
? ? ? ? factory = beanFactory;
? ? }
? ? /**
? ? ?* 用于儲(chǔ)存在隊(duì)列中的訂單,防止重復(fù)提交,在真實(shí)場(chǎng)景中,可用redis代替 驗(yàn)證重復(fù)
? ? ?*/
? ? Map<String, Object> cacheMap = new ConcurrentHashMap<>();
? ? /**
? ? ?* 訂單的緩沖隊(duì)列,當(dāng)線程池滿(mǎn)了,則將訂單存入到此緩沖隊(duì)列
? ? ?*/
? ? Queue<Object> msgQueue = new LinkedBlockingQueue<Object>();
? ? /**
? ? ?* 當(dāng)線程池的容量滿(mǎn)了,執(zhí)行下面代碼,將訂單存入到緩沖隊(duì)列
? ? ?*/
? ? final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
? ? ? ? @Override
? ? ? ? public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
? ? ? ? ? ? //訂單加入到緩沖隊(duì)列
? ? ? ? ? ? msgQueue.offer(((BusinessThread) r).getAcceptStr());
? ? ? ? ? ? System.out.println("系統(tǒng)任務(wù)太忙了,把此訂單交給(調(diào)度線程池)逐一處理,訂單號(hào):" + ((BusinessThread) r).getAcceptStr());
? ? ? ? }
? ? };
? ? /**創(chuàng)建線程池*/
? ?final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);
? ? /**將任務(wù)加入訂單線程池*/
? ? public void addOrders(String orderId){
? ? ? ? System.out.println("此訂單準(zhǔn)備添加到線程池,訂單號(hào):" + orderId);
? ? ? ? //驗(yàn)證當(dāng)前進(jìn)入的訂單是否已經(jīng)存在
? ? ? ? if (cacheMap.get(orderId) == null) {
? ? ? ? ? ? cacheMap.put(orderId, new Object());
? ? ? ? ? ? BusinessThread businessThread = new BusinessThread(orderId);
? ? ? ? ? ? threadPool.execute(businessThread);
? ? ? ? }
? ? }
? ? /**
? ? ?* 線程池的定時(shí)任務(wù)----> 稱(chēng)為(調(diào)度線程池)。此線程池支持 定時(shí)以及周期性執(zhí)行任務(wù)的需求。
? ? ?*/
? ? final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
? ? /**
? ? ?* 檢查(調(diào)度線程池),每秒執(zhí)行一次,查看訂單的緩沖隊(duì)列是否有 訂單記錄,則重新加入到線程池
? ? ?*/
? ? final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() {
? ? ? ? @Override
? ? ? ? public void run() {
? ? ? ? ? ? //判斷緩沖隊(duì)列是否存在記錄
? ? ? ? ? ? if(!msgQueue.isEmpty()){
? ? ? ? ? ? ? ? //當(dāng)線程池的隊(duì)列容量少于WORK_QUEUE_SIZE,則開(kāi)始把緩沖隊(duì)列的訂單 加入到 線程池
? ? ? ? ? ? ? ? if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {
? ? ? ? ? ? ? ? ? ? String orderId = (String) msgQueue.poll();
? ? ? ? ? ? ? ? ? ? BusinessThread businessThread = new BusinessThread(orderId);
? ? ? ? ? ? ? ? ? ? threadPool.execute(businessThread);
? ? ? ? ? ? ? ? ? ? System.out.println("(調(diào)度線程池)緩沖隊(duì)列出現(xiàn)訂單業(yè)務(wù),重新添加到線程池,訂單號(hào):"+orderId);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }, 0, 1, TimeUnit.SECONDS);
? ? /**獲取消息緩沖隊(duì)列*/
? ? public Queue<Object> getMsgQueue() {
? ? ? ? return msgQueue;
? ? }
? ? /**終止訂單線程池+調(diào)度線程池*/
? ? public void shutdown() {
? ? ? ? //true表示如果定時(shí)任務(wù)在執(zhí)行,立即中止,false則等待任務(wù)結(jié)束后再停止
? ? ? ? System.out.println("終止訂單線程池+調(diào)度線程池:"+scheduledFuture.cancel(false));
? ? ? ? scheduler.shutdown();
? ? ? ? threadPool.shutdown();
? ? }
}
TestController 類(lèi)
package com.springboot.demo;
import com.springboot.demo.Threads.TestThreadPoolManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.Queue;
import java.util.UUID;
/**
?* Created by Administrator on 2018/5/9.
?*/
@RestController
public class TestController {
? ? @Autowired
? ? TestThreadPoolManager testThreadPoolManager;
? ? /**
? ? ?* 測(cè)試模擬下單請(qǐng)求 入口
? ? ?* @param id
? ? ?* @return
? ? ?*/
? ? @GetMapping("/start/{id}")
? ? public String start(@PathVariable Long id) {
? ? ? ? //模擬的隨機(jī)數(shù)
? ? ? ? String orderNo = System.currentTimeMillis() + UUID.randomUUID().toString();
? ? ? ? testThreadPoolManager.addOrders(orderNo);
? ? ? ? return "Test ThreadPoolExecutor start";
? ? }
? ? /**
? ? ?* 停止服務(wù)
? ? ?* @param id
? ? ?* @return
? ? ?*/
? ? @GetMapping("/end/{id}")
? ? public String end(@PathVariable Long id) {
? ? ? ? testThreadPoolManager.shutdown();
? ? ? ? Queue q = testThreadPoolManager.getMsgQueue();
? ? ? ? System.out.println("關(guān)閉了線程服務(wù),還有未處理的信息條數(shù):" + q.size());
? ? ? ? return "Test ThreadPoolExecutor start";
? ? }
}
??
總結(jié)
以上是生活随笔為你收集整理的ThreadPoolExecutor线程池 + Queue队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Java高并发之BlockingQueu
- 下一篇: mybatis动态更新xml文件后热部署