日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Marble原理之线程中断

發布時間:2024/4/14 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Marble原理之线程中断 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本章節依賴于【Marble使用】,閱讀本章節前請保證已經充分了解Marble。
中斷特性從Marble-Agent 2.0.5開始支持。

線程中斷使用

  • 引入marble-agent jar包
  • <dependency><groupId>com.github.jeff-dong</groupId><artifactId>marble-agent</artifactId><version>最新版</version> </dependency>
  • JOB執行代碼適當位置添加中斷標志, 下面給出示例代碼
  • @Component("job1") public class Job1 extends MarbleJob {private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(Job1.class);@Overridepublic void execute(String param) throws Exception {logger.info("JOB1開始執行 ...");int i = 0;while (true) {i++;//1、用中斷狀態碼進行判斷if (Thread.interrupted()) {logger.info("JOB1-[{}]-[{}]被打斷啦", param, Thread.currentThread().getName());return;}try {Thread.sleep(500);} catch (InterruptedException e) {//2、捕獲終端異常后return結束return;}logger.info("JOB1-[{}]-[{}]-{}-------", param, Thread.currentThread().getName(), i);}} }
  • Marble OFFLINE進行線程中斷
  • 3.1 手動調度線程中斷

    3.2 選擇要中斷的服務器進行終端嘗試

    3.3 查看中斷日志(同步JOB)

    中斷實現及原理

    Java的線程中斷

    Java的線程中斷機制是一種協作機制,線程中斷并不能立即停掉線程執行,相反,可能線程永遠都不會響應。
    java的線程中斷模型只是通過修改線程的中斷標志(interrupt)進行中斷通知,不會有其它額外操作,因此線程是否最終中斷取決于線程的執行邏輯。因此,如果想讓線程按照自己的想法中斷,要代碼中事先進行中斷的“埋點”處理。

    有人可能會想到Thread的stop方法進行中斷,由于此方法可能造成不可預知的結果,已經被拋棄

    Marble進行線程中斷實現

    需求收集
  • 以JOB為維度進行線程中斷;
  • 盡量做到實時響應;
  • 存在集群中多臺機器,要支持指定某臺機器中的線程中斷;
  • 允許多次中斷嘗試;
  • 中斷請求不能依賴于JOB當前狀態。可能已經停止調度的JOB也要手動中斷執行中的線程;
  • 透明和擴展不同JOB的中斷(提供用戶中斷的"后處理"擴展);
  • 需求分析及實現

    【以JOB為維度進行線程中斷】

    Marble的JOB標志為 schedulerName-appId-jobName組成,目前Marble每個JOB調度時間和頻率都是個性化,目前調度完成就銷毀。但要做到任何時間進行執行中的線程中斷就要:

    1.1 存儲JOB的運行線程,隨時準備中斷;
    1.2 在緩存的JOB數量/時間和性能間做權衡,不能過多也不能過少;
    1.3 制定緩存已滿時的拋棄策略,避免緩存被占滿新的線程永遠無法中斷;
    1.4 要同步JOB和異步JOB透明處理(感覺不出差異);

    實現:
    Marble的線程池中定義支持并發的MAP進行JOB維度的線程緩存,此外指定每個JOB下緩存的線程數量。如下:

    public class ThreadPool {...private Multimap<String, Object> threadMultimap = Multimaps.synchronizedMultimap(HashMultimap.<String, Object>create());//multimap的單個key的最大容量private static final int THREADMULTIMAP_SIZE = 50;... }

    Marble-Agent在同步/異步JOB生成新的線程對象時進行放入MAP緩存,如果緩存(50個)已滿采用如下策略進行處理:

  • 嘗試清理當前map中的非活躍線程;
  • 嘗試清理當前map中已經完成的線程(同步線程有效);
  • 如果還未清理出空間,移除最久的線程;
  • public ThreadPool multimapPut(String key, Object value) {if (StringUtils.isNotBlank(key)) {Collection collection = threadMultimap.get(key);if (collection != null && collection.size() >= THREADMULTIMAP_SIZE) {//替換最久的Iterator<Object> it = collection.iterator();//首先進行 非活躍線程清理while (it.hasNext()) {Object tempObj = it.next();if(tempObj instanceof MarbleThread){MarbleThread mt = (MarbleThread)tempObj;//不活躍刪除if(!mt.isThreadAlive()){it.remove();}}else if(tempObj instanceof MarbleThreadFeature){MarbleThreadFeature mf = (MarbleThreadFeature) tempObj;//完成的線程刪除if(mf.isDone()){it.remove();}}}//仍然>最大值,刪除最久未使用if(collection.size() >= THREADMULTIMAP_SIZE){while (it.hasNext()) {it.next();it.remove();break;}}threadMultimap.put(key, value);return this;}}threadMultimap.put(key, value);return this;}

    此外,為了能在JVM關閉時進行線程中斷,添加JVM hook進行中斷調用處理(包括線程池的銷毀)。
    除此之外,還有個小問題,由于線程池使用的是有界的阻塞隊列,此種情況下,線程中斷時可能有的線程存在于阻塞隊列中,單純的中斷無效,對于此類情況,要首先判斷阻塞隊列中是否存在要中斷的線程,存在的話進行隊列的移除操作。

    【盡量做到實時響應】
    只能通過用戶在具體的線程邏輯中進行埋點處理,Marble在框架層面除了及時把用戶的中斷請求送達之外,沒有其它措施。

    【存在集群中多臺機器,要支持指定某臺機器中的線程中斷】
    Marble OFFLINE的中斷頁面支持機器的選擇,用戶進行選擇后,Marble會有針對性的進行機器的中斷RPC發送。

    【允許多次中斷嘗試】
    OFFLINE未對中斷次數進行限制,目前支持多次中斷請求發送。

    【中斷請求不能依賴于JOB當前狀態】
    考慮到用戶對歷史線程的中斷請求,Marble未把中斷操作綁定在JOB狀態上,任何JOB都可以進行終端嘗試。

    【透明擴展不同JOB的中斷】
    Marble目前支持同步和異步JOB,兩類JOB的中斷處理并不一致,比如同步job的中斷是通過FeatureTask的cancel實現,異步JOB是通過Thread的interrupt實現,此外線程被中斷后Marble希望能更進一步提供一個統一的“后處理”操作給用戶自己實現,比如用戶可能需要在線程被中斷后進行一些后續的log記錄等。

    為了代碼層面一致透明,且友好的實現“后處理”的封裝,Marble使用了代理模式,在Thread和FeatureTask上添加了一層“代理類”,由代理進行具體的中斷操作。
    同步JOB代理類:

    /*** @author <a href="dongjianxing@aliyun.com">jeff</a>* @version 2017/4/19 16:31*/ public class MarbleThreadFeature<V> implements RunnableFuture<V> {private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(MarbleThreadFeature.class);private MarbleJob marbleJob;private String param;private FutureTask<Result> futureTask;public MarbleThreadFeature(final MarbleJob marbleJob, final String param) {super();this.marbleJob = marbleJob;this.param = param;futureTask = new FutureTask<>(new Callable<Result>() {@Overridepublic Result call() throws Exception {return marbleJob.executeSync(param);}});}@Overridepublic void run() {futureTask.run();}@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {return futureTask.cancel(mayInterruptIfRunning);}@Overridepublic boolean isCancelled() {return futureTask.isCancelled();}@Overridepublic boolean isDone() {return futureTask.isDone();}@Overridepublic V get() throws InterruptedException, ExecutionException {return (V) futureTask.get();}@Overridepublic V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {return (V) futureTask.get(timeout, unit);}public void stop(String operator) {if (futureTask != null && !futureTask.isCancelled()) {logger.info("Thread-feature[{}] is interrupted", futureTask.getClass().getName());futureTask.cancel(true);}else if(marbleJob != null){boolean removeResult = ((ThreadPoolExecutor) ThreadPool.getFixedInstance().getExecutorService()).getQueue().remove(marbleJob);logger.info("Hanging MarbleJob[{}] is removed from the queue success?{}", marbleJob.getClass().getSimpleName(),removeResult);}//中斷后處理if(marbleJob != null){marbleJob.afterInterruptTreatment();}}}

    異步JOB代理類:

    /*** @author <a href="dongjianxing@aliyun.com">jeff</a>* @version 2017/4/19 16:31*/ public class MarbleThread implements Runnable {private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(MarbleThread.class);private MarbleJob marbleJob;private String param;private Thread runThread;public MarbleThread(MarbleJob marbleJob, String param) {super();this.marbleJob = marbleJob;this.param = param;}@Overridepublic void run() {runThread = Thread.currentThread();try {marbleJob.execute(param);} catch (Exception e) {e.printStackTrace();}}public boolean isThreadAlive() {return (runThread != null && runThread.isAlive());}public String getThreadName() {return runThread != null ? runThread.getName() : "";}public void stop() {//首先嘗試在阻塞隊列中刪除boolean removeResult = ((ThreadPoolExecutor) ThreadPool.getFixedInstance().getExecutorService()).getQueue().remove(this);logger.info("Hanging MarbleJob[{}] is removed from the queue success?{}", this.getClass().getSimpleName(), removeResult);if (runThread != null && !runThread.isInterrupted()) {logger.info("Thread[{}] is interrupted", runThread.getName());runThread.interrupt();}//中斷后處理if (marbleJob != null) {marbleJob.afterInterruptTreatment();}} }

    總結

    以上是生活随笔為你收集整理的Marble原理之线程中断的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。