日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

ThreadPoolExecutor线程池 + Queue队列

發(fā)布時(shí)間:2023/11/30 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ThreadPoolExecutor线程池 + Queue队列 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1:BlockingQueue繼承關(guān)系

??java.util.concurrent 包里的?BlockingQueue是一個(gè)接口,?繼承Queue接口,Queue接口繼承?Collection

?

??BlockingQueue----->Queue-->Collection

?圖:

?

隊(duì)列的特點(diǎn)是:先進(jìn)先出(FIFO)

?

2:BlockingQueue的方法

BlockingQueue 具有 4 組不同的方法用于插入、移除以及對(duì)隊(duì)列中的元素進(jìn)行檢查。如果請(qǐng)求的操作不能得到立即執(zhí)行的話,每個(gè)方法的表現(xiàn)也不同。這些方法如下:

?

?

?拋出異常特殊值阻塞超時(shí)
插入add(e)offer(e)put(e)offer(e, time, unit)
移除remove()poll()take()poll(time, unit)
檢查element()peek()不可用不可用
?

?

?

四組不同的行為方式解釋:

1(異常)

如果試圖的操作無(wú)法立即執(zhí)行,拋一個(gè)異常。

2(特定值)?

如果試圖的操作無(wú)法立即執(zhí)行,返回一個(gè)特定的值(常常是 true / false)。

3(阻塞)?

如果試圖的操作無(wú)法立即執(zhí)行,該方法調(diào)用將會(huì)發(fā)生阻塞,直到能夠執(zhí)行。

4(超時(shí))?

如果試圖的操作無(wú)法立即執(zhí)行,該方法調(diào)用將會(huì)發(fā)生阻塞,直到能夠執(zhí)行,但等待時(shí)間不會(huì)超過(guò)給定值。返回一個(gè)特定值以告知該操作是否成功(典型的是 true / false)。

??

1.首先是springBoot的項(xiàng)目框架如下:

2.業(yè)務(wù)測(cè)試流程涉及的類(lèi),如下

BusinessThread 類(lèi)


package com.springboot.demo.Threads;

import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;


/**
?* Created by Administrator on 2018/5/9.
?*/
@Component
@Scope("prototype")//spring 多例
public class BusinessThread implements Runnable{

? ? private String acceptStr;

? ? public BusinessThread(String acceptStr) {
? ? ? ? this.acceptStr = acceptStr;
? ? }

? ? public String getAcceptStr() {
? ? ? ? return acceptStr;
? ? }

? ? public void setAcceptStr(String acceptStr) {
? ? ? ? this.acceptStr = acceptStr;
? ? }

? ? @Override
? ? public void run() {
? ? ? ? //業(yè)務(wù)操作
? ? ? ? System.out.println("多線程已經(jīng)處理訂單插入系統(tǒng),訂單號(hào):"+acceptStr);

? ? ? ? //線程阻塞
? ? ? ? /*try {
? ? ? ? ? ? Thread.sleep(1000);
? ? ? ? ? ? System.out.println("多線程已經(jīng)處理訂單插入系統(tǒng),訂單號(hào):"+acceptStr);
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }*/
? ? }
}

TestThreadPoolManager 類(lèi)


package com.springboot.demo.Threads;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.*;

/**
?* Created by Administrator on 2018/5/10.
?*/
@Component
public class TestThreadPoolManager implements BeanFactoryAware {

? ? //用于從IOC里取對(duì)象
? ? private BeanFactory factory; //如果實(shí)現(xiàn)Runnable的類(lèi)是通過(guò)spring的application.xml文件進(jìn)行注入,可通過(guò) factory.getBean()獲取,這里只是提一下

? ? // 線程池維護(hù)線程的最少數(shù)量
? ? private final static int CORE_POOL_SIZE = 2;
? ? // 線程池維護(hù)線程的最大數(shù)量
? ? private final static int MAX_POOL_SIZE = 10;
? ? // 線程池維護(hù)線程所允許的空閑時(shí)間
? ? private final static int KEEP_ALIVE_TIME = 0;
? ? // 線程池所使用的緩沖隊(duì)列大小
? ? private final static int WORK_QUEUE_SIZE = 50;

? ? @Override
? ? public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
? ? ? ? factory = beanFactory;
? ? }

? ? /**
? ? ?* 用于儲(chǔ)存在隊(duì)列中的訂單,防止重復(fù)提交,在真實(shí)場(chǎng)景中,可用redis代替 驗(yàn)證重復(fù)
? ? ?*/
? ? Map<String, Object> cacheMap = new ConcurrentHashMap<>();


? ? /**
? ? ?* 訂單的緩沖隊(duì)列,當(dāng)線程池滿(mǎn)了,則將訂單存入到此緩沖隊(duì)列
? ? ?*/
? ? Queue<Object> msgQueue = new LinkedBlockingQueue<Object>();


? ? /**
? ? ?* 當(dāng)線程池的容量滿(mǎn)了,執(zhí)行下面代碼,將訂單存入到緩沖隊(duì)列
? ? ?*/
? ? final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
? ? ? ? @Override
? ? ? ? public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
? ? ? ? ? ? //訂單加入到緩沖隊(duì)列
? ? ? ? ? ? msgQueue.offer(((BusinessThread) r).getAcceptStr());
? ? ? ? ? ? System.out.println("系統(tǒng)任務(wù)太忙了,把此訂單交給(調(diào)度線程池)逐一處理,訂單號(hào):" + ((BusinessThread) r).getAcceptStr());
? ? ? ? }
? ? };


? ? /**創(chuàng)建線程池*/
? ?final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);


? ? /**將任務(wù)加入訂單線程池*/
? ? public void addOrders(String orderId){
? ? ? ? System.out.println("此訂單準(zhǔn)備添加到線程池,訂單號(hào):" + orderId);
? ? ? ? //驗(yàn)證當(dāng)前進(jìn)入的訂單是否已經(jīng)存在
? ? ? ? if (cacheMap.get(orderId) == null) {
? ? ? ? ? ? cacheMap.put(orderId, new Object());
? ? ? ? ? ? BusinessThread businessThread = new BusinessThread(orderId);
? ? ? ? ? ? threadPool.execute(businessThread);
? ? ? ? }
? ? }

? ? /**
? ? ?* 線程池的定時(shí)任務(wù)----> 稱(chēng)為(調(diào)度線程池)。此線程池支持 定時(shí)以及周期性執(zhí)行任務(wù)的需求。
? ? ?*/
? ? final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);


? ? /**
? ? ?* 檢查(調(diào)度線程池),每秒執(zhí)行一次,查看訂單的緩沖隊(duì)列是否有 訂單記錄,則重新加入到線程池
? ? ?*/
? ? final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() {
? ? ? ? @Override
? ? ? ? public void run() {
? ? ? ? ? ? //判斷緩沖隊(duì)列是否存在記錄
? ? ? ? ? ? if(!msgQueue.isEmpty()){
? ? ? ? ? ? ? ? //當(dāng)線程池的隊(duì)列容量少于WORK_QUEUE_SIZE,則開(kāi)始把緩沖隊(duì)列的訂單 加入到 線程池
? ? ? ? ? ? ? ? if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {
? ? ? ? ? ? ? ? ? ? String orderId = (String) msgQueue.poll();
? ? ? ? ? ? ? ? ? ? BusinessThread businessThread = new BusinessThread(orderId);
? ? ? ? ? ? ? ? ? ? threadPool.execute(businessThread);
? ? ? ? ? ? ? ? ? ? System.out.println("(調(diào)度線程池)緩沖隊(duì)列出現(xiàn)訂單業(yè)務(wù),重新添加到線程池,訂單號(hào):"+orderId);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }, 0, 1, TimeUnit.SECONDS);


? ? /**獲取消息緩沖隊(duì)列*/
? ? public Queue<Object> getMsgQueue() {
? ? ? ? return msgQueue;
? ? }

? ? /**終止訂單線程池+調(diào)度線程池*/
? ? public void shutdown() {
? ? ? ? //true表示如果定時(shí)任務(wù)在執(zhí)行,立即中止,false則等待任務(wù)結(jié)束后再停止
? ? ? ? System.out.println("終止訂單線程池+調(diào)度線程池:"+scheduledFuture.cancel(false));
? ? ? ? scheduler.shutdown();
? ? ? ? threadPool.shutdown();

? ? }
}
TestController 類(lèi)


package com.springboot.demo;

import com.springboot.demo.Threads.TestThreadPoolManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.Queue;
import java.util.UUID;

/**
?* Created by Administrator on 2018/5/9.
?*/
@RestController
public class TestController {

? ? @Autowired
? ? TestThreadPoolManager testThreadPoolManager;

? ? /**
? ? ?* 測(cè)試模擬下單請(qǐng)求 入口
? ? ?* @param id
? ? ?* @return
? ? ?*/
? ? @GetMapping("/start/{id}")
? ? public String start(@PathVariable Long id) {
? ? ? ? //模擬的隨機(jī)數(shù)
? ? ? ? String orderNo = System.currentTimeMillis() + UUID.randomUUID().toString();

? ? ? ? testThreadPoolManager.addOrders(orderNo);

? ? ? ? return "Test ThreadPoolExecutor start";
? ? }

? ? /**
? ? ?* 停止服務(wù)
? ? ?* @param id
? ? ?* @return
? ? ?*/
? ? @GetMapping("/end/{id}")
? ? public String end(@PathVariable Long id) {

? ? ? ? testThreadPoolManager.shutdown();

? ? ? ? Queue q = testThreadPoolManager.getMsgQueue();
? ? ? ? System.out.println("關(guān)閉了線程服務(wù),還有未處理的信息條數(shù):" + q.size());
? ? ? ? return "Test ThreadPoolExecutor start";
? ? }
}

??

總結(jié)

以上是生活随笔為你收集整理的ThreadPoolExecutor线程池 + Queue队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。