日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

threadlocal使用_多方位点评ThreadLocal,细看各大开源软件实现

發布時間:2025/3/15 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 threadlocal使用_多方位点评ThreadLocal,细看各大开源软件实现 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

ThreadLocal類可以幫助創建只能由同一個線程訪問和寫入的變量。因此,即使是兩個線程在執行相同的代碼,代碼引用相同的ThreadLocal變量,兩個線程也只能看到自己線程的ThreadLocal變量。ThreadLocal類提供了一個簡單地方法維護線程安全。

ThreadLocal源碼

ThreadLocal類可以創建同一個線程訪問和寫入的變量,變量存儲在ThreadLocal.ThreadLocalMap類中,ThreadLocalMap使用數組存儲ThreadLocal變量和變量值。因為變量是和線程綁定的,所以ThreadLocalMap的引用在Thread類中,每個線程只能訪問它私有的ThreadLocalMap變量,從而保證ThreadLocal變量和線程的綁定關系。

public?class?Thread?implements?Runnable?{
????/*?ThreadLocal?values?pertaining?to?this?thread.?This?map?is?maintained
?????*?by?the?ThreadLocal?class.?*/
????ThreadLocal.ThreadLocalMap?threadLocals?=?null;

????/*
?????*?InheritableThreadLocal?values?pertaining?to?this?thread.?This?map?is
?????*?maintained?by?the?InheritableThreadLocal?class.
?????*/
????ThreadLocal.ThreadLocalMap?inheritableThreadLocals?=?null;

????public?static?native?Thread?currentThread();
}

ThreadLocalMap的維護是由ThreadLocal類實現的,ThreadLocal類負責向ThreadLocalMap中寫入讀取和移除變量。

public?class?ThreadLocal<T>?{

????public?T?get()?{
????????Thread?t?=?Thread.currentThread();
????????ThreadLocalMap?map?=?getMap(t);
????????if?(map?!=?null)?{
????????????ThreadLocalMap.Entry?e?=?map.getEntry(this);
????????????if?(e?!=?null)?{
????????????????@SuppressWarnings("unchecked")
????????????????T?result?=?(T)e.value;
????????????????return?result;
????????????}
????????}
????????return?setInitialValue();
????}

????private?T?setInitialValue()?{
????????T?value?=?initialValue();
????????Thread?t?=?Thread.currentThread();
????????ThreadLocalMap?map?=?getMap(t);
????????if?(map?!=?null)
????????????map.set(this,?value);
????????else
????????????createMap(t,?value);
????????return?value;
????}

????protected?T?initialValue()?{
????????return?null;
????}

????public?void?set(T?value)?{
????????Thread?t?=?Thread.currentThread();
????????ThreadLocalMap?map?=?getMap(t);
????????if?(map?!=?null)
????????????map.set(this,?value);
????????else
????????????createMap(t,?value);
????}

????public?void?remove()?{
????????ThreadLocalMap?m?=?getMap(Thread.currentThread());
????????if?(m?!=?null)
????????????m.remove(this);
????}

????ThreadLocalMap?getMap(Thread?t)?{
????????return?t.threadLocals;
????}

????void?createMap(Thread?t,?T?firstValue)?{
????????t.threadLocals?=?new?ThreadLocalMap(this,?firstValue);
????}

????static?ThreadLocalMap?createInheritedMap(ThreadLocalMap?parentMap)?{
????????return?new?ThreadLocalMap(parentMap);
????}
}

可以看到ThreadLocal在讀取寫入或移除變量時需要先獲取ThreadLocalMap,步驟是:

Thread?t?=?Thread.currentThread();
ThreadLocalMap?map?=?t.threadLocals;

繼而通過ThreadLocalMap實現讀取寫入或移除變量的操作。

ThreadLocalMap是一個hash map的實現,基于數組,解決hash碰撞的方式是開放地址法而不是HashMap的拉鏈法,這里沒有使用JDK集合框架的相關的Map實現。數組中存儲實體是一個Entry的內部類,它繼承了WeakReference接口,使用了弱引用的特性,確保不會因為Entry的原因導致ThreadLocal對象無法被回收。但是注意table[]數組對于Entry的引用是強引用,Entry對象對于value對象的引用也是強引用。

ThreadLocal的潛在內存泄漏問題

ThreadLocal如果使用不當會出現內存泄漏的問題。因為ThreadLocal保證了變量是和線程綁定的,所以官方建議在使用ThreadLocal的時候將ThreadLocal實例設置為static:

?{@code?ThreadLocal}?instances?are?typically?private?static?fields?in?classes?that?wish?to?associate?state?with?a?thread?(e.g.,?a?user?ID?or?Transaction?ID).
?

當ThreadLocal實例設置為靜態變量后,實例就不會隨著對象的gc而一起被gc,導致ThreadLocal內的對象一致被引用從而無法釋放。

內存泄漏的含義表示Value對象無法回收,而不是ThreadLocal對象無法回收。

這里的內存泄漏有三個條件:

  • 線程常駐,不會隨著任務結束而銷毀,從而導致ThreadLocalMap對象一直存在。
  • ThreadLocal實例被聲明為靜態,從而導致棧上一直有顯式的ThreadLocal引用存在。
  • ThreadLocal實例在使用完畢后未調用#remove方法即結束任務。
  • 下圖是棧和堆中實例和引用關系。從圖中可知如果線程常駐會導致ThreadLocalMap對象無法被回收,從而內部的table: Entry[]變量無法被回收。

    圖中有兩個引用是指向堆中的ThreadLocal實例,一個是棧上的ThreadLocal引用,即代碼中創建的ThreadLocal對象,另一個是ThreadLocalMap持有的Entry對ThreadLocal對象的弱引用。

    在ThreadLocalMap的內部類Entry中,使用弱應用持有ThreadLocal實例的引用,當垃圾回收壓力大時會回收ThreadLocal實例,但是如果代碼持有的ThreadLocal引用被聲明為靜態的,那么就不會隨著任務的執行結束而自動釋放,即會一直有個棧上的ThreadLocal引用存在,從而導致ThreadLocal無法被回收,而Entry對象也一直持有ThreadLocal的引用,導致無法觸發stale機制,從而Value對象無法回收。

    static?class?Entry?extends?WeakReference<ThreadLocal>>?{
    ????/**?The?value?associated?with?this?ThreadLocal.?*/
    ????Object?value;
    ????
    ????Entry(ThreadLocal>?k,?Object?v)?{
    ????????super(k);
    ????????value?=?v;
    ????}
    }

    stale機制是Entry使用ThreadLocal對象作為key,如果ThreadLocal對象被回收,那么Entry對象的key即為null,Entry對象不能再被外界訪問,從而可以被清除釋放。因為Entry對象持有的是弱引用,所以當棧上釋放引用的時候ThreadLocal就有了被回收的可能性,當某段時間后ThreadLocal對象被垃圾回收,Entry對象對ThreadLocal引用就變成了null,從而ThreadLocalMap就知道這個Entry對象永遠無法在被訪問,即將Entry對象清除。

    解決辦法就是代碼在執行完畢后手動調用#remove方法。當調用完畢后棧上依然持有ThreadLocal的引用,但是Entry對象會移除對ThreadLocal的引用,并主動設置value為null,然后將table數組內對Entry的引用設置為null,這樣就保證value對象和Entry對象都會被回收。

    這里還有個問題:即stale對象會在ThreadLocalMap的存活時間。如果棧上釋放ThreadLocal對象后,ThreadLocal對象的垃圾回收需要遵從弱引用的規則,其次當ThreadLocal對象已經被回收后ThreadLocalMap對象清除Entry對象也不是立馬能夠完成的。只有在ThreadLocalMap在進行數組擴容時才會檢測所有的對象并清除其中的stale對象,其余時候只在添加新的Entry實例時掃描幾輪table數組清除部分stale對象。

    ExecutorService中使用ThreadLocal

    如果打算同時使用線程池,且提交給線程池的任務代碼中使用了ThreadLocal,如果使用ThreadLocal不當便會出現難以排查的并發問題。

    線程池線程運行時傳值

    ThreadLocal類保證了變量只有同一個線程才能訪問和寫入,那么如果執行任務的線程創建了一個線程或者將任務提交到線程池,那么任務的執行就會涉及到多個線程,ThreadLocal變量需要在涉及的多個線程間共享即支持訪問和寫入。

    對于新創建線程的問題,JDK提供了InheritableThreadLocal類。InheritableThreadLocal支持在任務線程創建新的線程時將任務線程的ThreadLocal變量通過淺拷貝共享給子線程,這樣子線程在執行任務的時候可以訪問寫入任務線程的ThreadLocal變量。

    但是對于線程池中的線程,因為工作線程可能會常駐,任務提交到線程池時被放入工作隊列,工作線程從隊列中彈出任務進行執行,導致InheritableThreadLocal無法生效。

    解決方案是在創建任務時獲取ThreadLocal的變量,保存在任務對象中,當任務執行時將變量設置到執行線程的ThreadLocal變量中。

    public?class?CustomAttachmentDemo?{

    ????private?static?ThreadLocal?holder?=?new?ThreadLocal<>();public?static?void?main(String[]?args)?throws?Exception?{
    ????????holder.set("value-set-in-parent");
    ????????Runnable?task?=?()?->?System.out.println("[child?thread]?get?"?+?holder.get());
    ????????Thread?thread?=?new?Thread(new?AttachmentdRunnable(task));
    ????????thread.start();
    ????????thread.join();
    ????????System.out.println("[parent?thread]?get?"?+?holder.get());
    ????}private?static?class?AttachmentdRunnable?implements?Runnable?{private?String?attachments;private?Runnable?task;public?AttachmentdRunnable(Runnable?task)?{this.task?=?task;this.attachments?=?holder.get();
    ????????}@Overridepublic?void?run()?{
    ????????????holder.set(attachments);
    ????????????task.run();
    ????????}
    ????}
    }

    工作線程的多任務間的ThreadLocal變量

    因為線程池復用線程,線程池內的線程可能會常駐不會銷毀,任務中從ThreadLocal中訪問的變量可能是之前的任務寫入的,并非當前任務寫入的變量,從而導致不同的任務的ThreadLocal變量可以被多個任務讀取和寫入導致錯誤。

    解決方案是在每個任務執行完畢后可以使用try/finally調用#remove()方法清除線程持有的線程本地變量。

    @Override
    public?void?run()?{
    ????try?{
    ????????holder.set(attachments);
    ????????task.run();
    ????}?finally?{
    ????????holder.remove();
    ????}
    }

    恢復工作線程ThreadLocal變量

    工作線程可能在執行任務前有自己的ThreadLocal變量,在任務執行完畢后需要恢復工作線程的ThreadLocal變量以執行下一個任務,從而保證每個任務在執行的時候上下文環境一致。

    transmittalbe-thread-local解決方案

    阿里開源的ttl提供了任務提交給線程池時的ThreadLocal值傳遞到任務執行時。ttl不僅實現工作線程執行時傳值到ThreadLocal,還包括任務執行完畢后恢復工作線程池狀態的功能。使用ttl有三種方式:

  • 修飾Runnable或Callable
  • 修飾線程池
  • 使用Agent修飾線程池。
  • 其中修飾線程池的方式就是在調用execute,invoke等方法時調用TtlRunnable#get(Runnable)或TtlCallable#get(Callable)方法修飾Runnable和Callable。而Agent修飾線程池就是將對線程池的顯式修飾改為Agent增強。所以ttl的核心邏輯是對于Runnable和Callable的增強。

    ttl有多種使用場景,在github上有一個使用案例文章匯總。

    修飾Runnable或Callable

    ttl提供的Runnable的修飾類TtlRunnable如下,這里精簡了很多的無關代碼,只顯示了最核心的代碼。

    public?final?class?TtlRunnable?implements?Runnable?{
    ????private?final?AtomicReference?capturedRef;private?final?Runnable?runnable;private?final?boolean?releaseTtlValueReferenceAfterRun;private?TtlRunnable(@NonNull?Runnable?runnable,?boolean?releaseTtlValueReferenceAfterRun)?{this.capturedRef?=?new?AtomicReference(capture());this.runnable?=?runnable;this.releaseTtlValueReferenceAfterRun?=?releaseTtlValueReferenceAfterRun;
    ????}/**
    ?????*?wrap?method?{@link?Runnable#run()}.
    ?????*/@Overridepublic?void?run()?{final?Object?captured?=?capturedRef.get();if?(captured?==?null?||?releaseTtlValueReferenceAfterRun?&&?!capturedRef.compareAndSet(captured,?null))?{throw?new?IllegalStateException("TTL?value?reference?is?released?after?run!");
    ????????}final?Object?backup?=?replay(captured);try?{
    ????????????runnable.run();
    ????????}?finally?{
    ????????????restore(backup);
    ????????}
    ????}
    }

    可以看到處理邏輯分為三步:

  • 從父線程中捕獲ThreadLocal變量
  • 將捕獲的變量回放到工作線程的ThreadLocal變量中
  • 執行結束后恢復工作線程的ThreadLocal環境
  • 而完成這三步的方法capture(), replay(Object), restore(Object)方法都是TransmittableThreadLocal.Transmitter提供的方法。

    在TransmittableThreadLocal類中使用靜態變量private static final InheritableThreadLocal, ?>> holder持有invoke線程的ThreadLocal變量,類似一個ThreadLocal注冊表。只所以使用WeakHashMap的考慮和ThreadLocalMap中Entry使用WeakReference的道理一致,是為了防止業務代碼沒有顯式調用ThreadLocal#remove()方法時能夠回收Entry對象。

    public?class?TransmittableThreadLocal<T>?extends?InheritableThreadLocal<T>?implements?TtlCopier<T>?{
    ????private?static?final?Logger?logger?=?Logger.getLogger(TransmittableThreadLocal.class.getName());

    ????private?final?boolean?disableIgnoreNullValueSemantics;

    ????public?TransmittableThreadLocal()?{
    ????????this(false);
    ????}

    ????public?TransmittableThreadLocal(boolean?disableIgnoreNullValueSemantics)?{
    ????????this.disableIgnoreNullValueSemantics?=?disableIgnoreNullValueSemantics;
    ????}

    ????public?T?copy(T?parentValue)?{
    ????????return?parentValue;
    ????}

    ????public?final?T?get()?{
    ????????T?value?=?super.get();
    ????????if?(disableIgnoreNullValueSemantics?||?null?!=?value)?addThisToHolder();
    ????????return?value;
    ????}

    ????public?final?void?set(T?value)?{
    ????????if?(!disableIgnoreNullValueSemantics?&&?null?==?value)?{
    ????????????//?may?set?null?to?remove?value
    ????????????remove();
    ????????}?else?{
    ????????????super.set(value);
    ????????????addThisToHolder();
    ????????}
    ????}

    ????public?final?void?remove()?{
    ????????removeThisFromHolder();
    ????????super.remove();
    ????}

    ????private?void?superRemove()?{
    ????????super.remove();
    ????}

    ????//?Note?about?the?holder:
    ????//?1.?holder?self?is?a?InheritableThreadLocal(a?*ThreadLocal*).
    ????//?2.?The?type?of?value?in?the?holder?is?WeakHashMap,??>.
    ????//????2.1?but?the?WeakHashMap?is?used?as?a?*Set*:
    ????//????????the?value?of?WeakHashMap?is?*always*?null,?and?never?used.
    ????//????2.2?WeakHashMap?support?*null*?value.
    ????private?static?final?InheritableThreadLocal,??>>?holder?=new?InheritableThreadLocal,??>>()?{@Overrideprotected?WeakHashMap,??>?initialValue()?{return?new?WeakHashMap,?Object>();
    ????????????????}@Overrideprotected?WeakHashMap,??>?childValue(WeakHashMap,??>?parentValue)?{return?new?WeakHashMap,?Object>(parentValue);
    ????????????????}
    ????????????};private?void?addThisToHolder()?{if?(!holder.get().containsKey(this))?{
    ????????????holder.get().put((TransmittableThreadLocal)?this,?null);?//?WeakHashMap?supports?null?value.
    ????????}
    ????}private?void?removeThisFromHolder()?{
    ????????holder.get().remove(this);
    ????}
    }

    可以看到TransmittableThreadLocal類的get(), set(), remove()方法都增加了對ThreadLocal注冊表的操作,確保創建移除TransmittableThreadLocal的時候能夠自動將TransmittableThreadLocal實例注冊到注冊表中。

    TransmittableThreadLocal.Transmitter的源碼如下,里面省略了很多的代碼,其中Transmitter不僅支持TransmittableThreadLocal的捕獲重放和回放,還提供了對ThreadLocal和InheritableThreadLocal的增強功能,通過對ThreadLocal和InheritableThreadLocal的注冊,實現和TransmittableThreadLocal類似的功能。其中ThreadLocal和InheritableThreadLocal會被存儲到private static volatile WeakHashMap, TtlCopier> threadLocalHolder變量中,Transmitter對它的處理邏輯和TransmittableThreadLocal#holder的處理邏輯一致,其中Transmitter一般TtlValues針對的是holder注冊表的數據,ThreadLocalValues針對的是Transmitter注冊的數據。

    public?static?class?Transmitter?{
    ????public?static?Object?capture()?{
    ????????return?new?Snapshot(captureTtlValues(),?captureThreadLocalValues());
    ????}

    ????private?static?HashMap,?Object>?captureTtlValues()?{
    ????????HashMap,?Object>?ttl2Value?=?new?HashMap,?Object>();for?(TransmittableThreadLocal?threadLocal?:?holder.get().keySet())?{
    ????????????ttl2Value.put(threadLocal,?threadLocal.copyValue());
    ????????}return?ttl2Value;
    ????}public?static?Object?replay(Object?captured)?{final?Snapshot?capturedSnapshot?=?(Snapshot)?captured;return?new?Snapshot(replayTtlValues(capturedSnapshot.ttl2Value),?replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
    ????}private?static?HashMap,?Object>?replayTtlValues(HashMap,?Object>?captured)?{
    ????????HashMap,?Object>?backup?=?new?HashMap,?Object>();for?(final?Iterator>?iterator?=?holder.get().keySet().iterator();?iterator.hasNext();?)?{
    ????????????TransmittableThreadLocal?threadLocal?=?iterator.next();//?backup
    ????????????backup.put(threadLocal,?threadLocal.get());//?clear?the?TTL?values?that?is?not?in?captured//?avoid?the?extra?TTL?values?after?replay?when?run?taskif?(!captured.containsKey(threadLocal))?{
    ????????????????iterator.remove();
    ????????????????threadLocal.superRemove();
    ????????????}
    ????????}//?set?TTL?values?to?captured
    ????????setTtlValuesTo(captured);//?call?beforeExecute?callback
    ????????doExecuteCallback(true);return?backup;
    ????}private?static?void?setTtlValuesTo(@NonNull?HashMap,?Object>?ttlValues)?{for?(Map.Entry,?Object>?entry?:?ttlValues.entrySet())?{
    ????????????TransmittableThreadLocal?threadLocal?=?entry.getKey();
    ????????????threadLocal.set(entry.getValue());
    ????????}
    ????}public?static?void?restore(@NonNull?Object?backup)?{final?Snapshot?backupSnapshot?=?(Snapshot)?backup;
    ????????restoreTtlValues(backupSnapshot.ttl2Value);
    ????????restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
    ????}private?static?void?restoreTtlValues(@NonNull?HashMap,?Object>?backup)?{//?call?afterExecute?callback
    ????????doExecuteCallback(false);for?(final?Iterator>?iterator?=?holder.get().keySet().iterator();?iterator.hasNext();?)?{
    ????????????TransmittableThreadLocal?threadLocal?=?iterator.next();//?clear?the?TTL?values?that?is?not?in?backup//?avoid?the?extra?TTL?values?after?restoreif?(!backup.containsKey(threadLocal))?{
    ????????????????iterator.remove();
    ????????????????threadLocal.superRemove();
    ????????????}
    ????????}//?restore?TTL?values
    ????????setTtlValuesTo(backup);
    ????}private?static?class?Snapshot?{final?HashMap,?Object>?ttl2Value;final?HashMap,?Object>?threadLocal2Value;private?Snapshot(HashMap,?Object>?ttl2Value,?HashMap,?Object>?threadLocal2Value)?{this.ttl2Value?=?ttl2Value;this.threadLocal2Value?=?threadLocal2Value;
    ????????}
    ????}private?static?volatile?WeakHashMap,?TtlCopier>?threadLocalHolder?=?new?WeakHashMap,?TtlCopier>();private?Transmitter()?{throw?new?InstantiationError("Must?not?instantiate?this?class");
    ????}
    }

    仔細觀察TtlRunnable的實現,可以看出#capture()方法是在構造器調用的,處理的線程是invoke線程,結合holder變量的類型是InheritableThreadLocal,這里捕獲的都是invoke線程的ThreadLocal變量,轉換成一個Map。

    #replay(Object), #restore(Object)方法是在#run()方法內,此時已經是工作線程在處理了。可以看出replay方法和restore方法是互為鏡像的方法,replay首先記錄子線程的數據進行備份稍后作為方法的返回結果,然后將入參傳入的需要回放的數據設置到工作線程的ThreadLocal中。restore方法省略了備份,將入參傳入的需要恢復的數據設置到工作線程的ThreadLocal中。

    Netty關于ThreadLocal的優化FastThreadLocal

    因為ThreadLocalMap其實是基于尋址法實現的hash map,在最差的情況下O(1)的時間復雜度會退化為O(N)的線形查找。Netty針對這點進行了優化,提供了FastThreadLocal實現,主要改動是對于InternalThreadLocalMap的實現。它的存儲也是基于數組實現,但是數組的索引是無法復用的,每個新創建的FastThreadLocal都會新增加一個唯一的索引,對于InternalThreadLocalMap中數組的一個下標。這個索引是只增不減的,從而保證了每次查詢都是O(1)的時間復雜度,但是當FastThreadLocal被回收后,InternalThreadLocalMap中底層數組索引對應的下標也不會在使用,這是一種典型的空間換時間的做法。這個優化點也被dubbo移植到項目中。

    dubbo關于RpcContext的實現

    ?

    上下文信息是一次 RPC 調用過程中附帶的環境信息,如方法名、參數類型、真實參數、本端/對端地址等。這些數據僅屬于一次調用,作用于 Consumer 到 Provider 調用的整個流程。

    ?

    RpcContext的底層實現借助了ThreadLocal。其中ThreadLocal的實現移植了Netty項目中FastThreadLocal的實現。

    下面是RpcContext的部分變量,它內部持有了很多RPC調用過程中的環境信息,在網絡傳輸時RpcContext中的信息作為Message的一部分序列化傳輸到另一端。在另一端經過反序列化后經過路由分配到相應地Invoker實現時被Filter從Message中讀取重新設置到RpcContext中,但是這個時候任務已經開始執行了,開始執行真正地業務邏輯而不是框架代碼。

    /**
    ?*?use?internal?thread?local?to?improve?performance
    ?*/
    private?static?final?InternalThreadLocal?LOCAL?=?new?InternalThreadLocal()?{@Overrideprotected?RpcContext?initialValue()?{return?new?RpcContext();
    ????}
    };/**
    ?*?用于服務端傳遞
    ?*/private?static?final?InternalThreadLocal?SERVER_LOCAL?=?new?InternalThreadLocal()?{@Overrideprotected?RpcContext?initialValue()?{return?new?RpcContext();
    ????}
    };private?final?Map?attachments?=?new?HashMap();private?URL?url;private?String?methodName;private?Class>[]?parameterTypes;private?Object[]?arguments;private?InetSocketAddress?localAddress;private?InetSocketAddress?remoteAddress;

    dubbo規避了任務提交到線程池時的ThreadLocal的傳值問題,如果在業務代碼中需要提交線程池等操作就需要開發者自己維護ThreadLocal的傳值問題。

    dubbo的Filter使用了SPI機制,通過@Activate注解可知消費者端接收Response Message,消費者端接收Request Message,對應的RpcContext支持如下:

    @Activate(group?=?Constants.CONSUMER,?order?=?-10000)
    public?class?ConsumerContextFilter?implements?Filter?{

    ????@Override
    ????public?Result?invoke(Invoker>?invoker,?Invocation?invocation)?throws?RpcException?{
    ????????RpcContext.getContext()
    ????????????????.setInvoker(invoker)
    ????????????????.setInvocation(invocation)
    ????????????????.setLocalAddress(NetUtils.getLocalHost(),?0)
    ????????????????.setRemoteAddress(invoker.getUrl().getHost(),
    ????????????????????????invoker.getUrl().getPort());
    ????????if?(invocation?instanceof?RpcInvocation)?{
    ????????????((RpcInvocation)?invocation).setInvoker(invoker);
    ????????}
    ????????try?{
    ????????????RpcResult?result?=?(RpcResult)?invoker.invoke(invocation);
    ????????????RpcContext.getServerContext().setAttachments(result.getAttachments());
    ????????????return?result;
    ????????}?finally?{
    ????????????RpcContext.getContext().clearAttachments();
    ????????}
    ????}

    }
    @Activate(group?=?Constants.PROVIDER,?order?=?-10000)
    public?class?ContextFilter?implements?Filter?{

    ????@Override
    ????public?Result?invoke(Invoker>?invoker,?Invocation?invocation)?throws?RpcException?{
    ????????Map?attachments?=?invocation.getAttachments();if?(attachments?!=?null)?{
    ????????????attachments?=?new?HashMap(attachments);
    ????????????attachments.remove(Constants.PATH_KEY);
    ????????????attachments.remove(Constants.GROUP_KEY);
    ????????????attachments.remove(Constants.VERSION_KEY);
    ????????????attachments.remove(Constants.DUBBO_VERSION_KEY);
    ????????????attachments.remove(Constants.TOKEN_KEY);
    ????????????attachments.remove(Constants.TIMEOUT_KEY);
    ????????????attachments.remove(Constants.ASYNC_KEY);//?Remove?async?property?to?avoid?being?passed?to?the?following?invoke?chain.
    ????????}
    ????????RpcContext.getContext()
    ????????????????.setInvoker(invoker)
    ????????????????.setInvocation(invocation)//????????????????.setAttachments(attachments)??//?merged?from?dubbox
    ????????????????.setLocalAddress(invoker.getUrl().getHost(),
    ????????????????????????invoker.getUrl().getPort());//?mreged?from?dubbox//?we?may?already?added?some?attachments?into?RpcContext?before?this?filter?(e.g.?in?rest?protocol)if?(attachments?!=?null)?{if?(RpcContext.getContext().getAttachments()?!=?null)?{
    ????????????????RpcContext.getContext().getAttachments().putAll(attachments);
    ????????????}?else?{
    ????????????????RpcContext.getContext().setAttachments(attachments);
    ????????????}
    ????????}if?(invocation?instanceof?RpcInvocation)?{
    ????????????((RpcInvocation)?invocation).setInvoker(invoker);
    ????????}try?{
    ????????????RpcResult?result?=?(RpcResult)?invoker.invoke(invocation);//?pass?attachments?to?result
    ????????????result.addAttachments(RpcContext.getServerContext().getAttachments());return?result;
    ????????}?finally?{
    ????????????RpcContext.removeContext();
    ????????????RpcContext.getServerContext().clearAttachments();
    ????????}
    ????}
    }

    lucene和elasticsearch解決方案

    因為ThreadLocalMap的value的移除如果不是顯式地調用ThreadLocal#remove()方法,那么就需要弱引用的垃圾回收機制和stale對象的移除機制,所以可能ThreadLocal中value的移除可能會花費非常多地時間,然而這不是一個內存泄漏,因為它最后能夠被回收。但是當弱引用被GC機制回收之后,如果沒有ThreadLocalMap#set()方法調用,那么永遠不會觸發stale的Entry對象回收,那么Entry對象和value對象也都不能被回收,極端情況下可能造成OOM問題。那么只需要顯示地調用一次ThreadLocal#remove()方法即可解決這個問題。

    Lucene的CloseableThreadLocal

    lucene提供了CloseableThreadLocal方法繼承了Closeable方法,在#close()方法內執行了#remove()方法。

  • 提供了Closeable方法,可以在代碼中使用try/with語法進行自動關閉。
  • 所有的#set()的value值都是用WeakReference包了一層,從而可以觸發value的弱引用垃圾回收。
  • ThreadLocal是每次set的時候執行檢測,CloseableThreadLocal是訪問一定次數后就檢測所有的數據。
  • CloseableThreadLocal的檢測機制是通過一個CloseableThreadLocal#hardRefs的參數記錄線程和value的對應,當線程掛了的時候就可以移除hardRefs中的數據。
  • hardRefs是一個WeakHashMap,可以在一定條件下觸發垃圾回收釋放內存。
  • elasticsearch的ThreadContext

    ThreadContext提供了和ttl類似的功能。elasticsearch因為任務主要是transport任務,在請求和響應的處理中需要透明處理headers的傳值,當收到網絡請求時,所有的headers通過反序列化后通過ThreadContext#readHeaders(StreamInput)方法直接添加進ThreadContext,當請求處理完畢后同樣會恢復之前的context以處理下一個請求。

    ThreadContext同樣借助ThreadLocal實現功能,它使用的是Lucene優化后的CloseableThreadLocal類。elasticsearch的ContextThreadLocal指定了ThreadLocal存儲的變量類型為ThreadContextStruct,這是個headers的bean包裝類。

    private?static?class?ContextThreadLocal?extends?CloseableThreadLocal<ThreadContextStruct>?{
    ????
    ????private?final?AtomicBoolean?closed?=?new?AtomicBoolean(false);

    ????@Override
    ????public?void?set(ThreadContextStruct?object)?{
    ????????try?{
    ????????????if?(object?==?DEFAULT_CONTEXT)?{
    ????????????????super.set(null);
    ????????????}?else?{
    ????????????????super.set(object);
    ????????????}
    ????????}?catch?(NullPointerException?ex)?{
    ????????????/*?This?is?odd?but?CloseableThreadLocal?throws?a?NPE?if?it?was?closed?but?still?accessed.
    ???????????????to?get?a?real?exception?we?call?ensureOpen()?to?tell?the?user?we?are?already?closed.*/
    ????????????ensureOpen();
    ????????????throw?ex;
    ????????}
    ????}

    ????@Override
    ????public?ThreadContextStruct?get()?{
    ????????try?{
    ????????????ThreadContextStruct?threadContextStruct?=?super.get();
    ????????????if?(threadContextStruct?!=?null)?{
    ????????????????return?threadContextStruct;
    ????????????}
    ????????????return?DEFAULT_CONTEXT;
    ????????}?catch?(NullPointerException?ex)?{
    ????????????/*?This?is?odd?but?CloseableThreadLocal?throws?a?NPE?if?it?was?closed?but?still?accessed.
    ???????????????to?get?a?real?exception?we?call?ensureOpen()?to?tell?the?user?we?are?already?closed.*/
    ????????????ensureOpen();
    ????????????throw?ex;
    ????????}
    ????}

    ????private?void?ensureOpen()?{
    ????????if?(closed.get())?{
    ????????????throw?new?IllegalStateException("threadcontext?is?already?closed");
    ????????}
    ????}

    ????public?void?close()?{
    ????????if?(closed.compareAndSet(false,?true))?{
    ????????????super.close();
    ????????}
    ????}
    }

    ThreadContextStruct提供了從StreamInput讀取headers的構造函數,以及一系列添加request/response headers的方法,ThreadContextStruct是headers讀取寫入的一個管理工具類。

    private?static?final?class?ThreadContextStruct?{
    ????private?final?Map?requestHeaders;private?final?Map?transientHeaders;private?final?Map>?responseHeaders;private?ThreadContextStruct(StreamInput?in)?throws?IOException?{//...
    ????}
    }

    elasticsearch捕獲恢復ThreadLocal的方式很巧妙,具體的實現類是ThreadContext.StoredContext。它是一個函數式接口,#restore()方法默認是調用#close()方法。

    @FunctionalInterface
    public?interface?StoredContext?extends?AutoCloseable?{
    ????@Override
    ????void?close();

    ????default?void?restore()?{
    ????????close();
    ????}
    }

    ThreadContext提供了#newStoredContext()和#stashContext()方法分別用于捕獲回放和清除恢復線程,二者的實現類似,但是注意到它們的執行線程是不一致的。#newStoredContext()方法是在調用線程中運行的,而#stashContext()是在工作線程中運行的。

    public?StoredContext?newStoredContext(boolean?preserveResponseHeaders)?{
    ????final?ThreadContextStruct?context?=?threadLocal.get();
    ????return?()??->?{
    ????????if?(preserveResponseHeaders?&&?threadLocal.get()?!=?context)?{
    ????????????threadLocal.set(context.putResponseHeaders(threadLocal.get().responseHeaders));
    ????????}?else?{
    ????????????threadLocal.set(context);
    ????????}
    ????};
    }

    public?StoredContext?stashContext()?{
    ????final?ThreadContextStruct?context?=?threadLocal.get();
    ????threadLocal.set(null);
    ????return?()?->?threadLocal.set(context);
    }

    而elasticsearch提供了兩種對Runnable的修飾類,分別是ContextPreservingRunnable和ContextPreservingAbstractRunnable,二者都在構造器中都捕獲了調用線程的ThreadContext中threadLocal變量中持有的ThreadContextStruct對象并封裝為ThreadContext.StoredContext,然后在執行run方法前先備份工作線程的ThreadLocal變量,在執行業務邏輯后恢復工作線程的環境,只不過ContextPreservingRunnable使用的是try/with方式,而ContextPreservingAbstractRunnable是在#onAfter回調中。

    private?class?ContextPreservingRunnable?implements?WrappedRunnable?{
    ????private?final?Runnable?in;
    ????private?final?ThreadContext.StoredContext?ctx;

    ????private?ContextPreservingRunnable(Runnable?in)?{
    ????????ctx?=?newStoredContext(false);
    ????????this.in?=?in;
    ????}

    ????@Override
    ????public?void?run()?{
    ????????boolean?whileRunning?=?false;
    ????????try?(ThreadContext.StoredContext?ignore?=?stashContext()){
    ????????????ctx.restore();
    ????????????in.run();
    ????????}?
    ????}
    }

    private?class?ContextPreservingAbstractRunnable?extends?AbstractRunnable?implements?WrappedRunnable?{
    ????private?final?AbstractRunnable?in;
    ????private?final?ThreadContext.StoredContext?creatorsContext;

    ????private?ThreadContext.StoredContext?threadsOriginalContext?=?null;

    ????private?ContextPreservingAbstractRunnable(AbstractRunnable?in)?{
    ????????creatorsContext?=?newStoredContext(false);
    ????????this.in?=?in;
    ????}

    ????@Override
    ????public?void?onAfter()?{
    ????????try?{
    ????????????in.onAfter();
    ????????}?finally?{
    ????????????if?(threadsOriginalContext?!=?null)?{
    ????????????????threadsOriginalContext.restore();
    ????????????}
    ????????}
    ????}

    ????@Override
    ????protected?void?doRun()?throws?Exception?{
    ????????threadsOriginalContext?=?stashContext();
    ????????creatorsContext.restore();
    ????????in.doRun();
    ????}
    }

    elasticsearch的線程池實現提供了開箱即用的ThreadContext支持,它會對每個執行的任務進行修飾,關鍵代碼如下:

    public?class?EsThreadPoolExecutor?extends?ThreadPoolExecutor?{

    ????private?final?ThreadContext?contextHolder;
    ???
    ????@Override
    ????public?void?execute(Runnable?command)?{
    ????????command?=?wrapRunnable(command);
    ????????try?{
    ????????????super.execute(command);
    ????????}?catch?(EsRejectedExecutionException?ex)?{
    ????????????if?(command?instanceof?AbstractRunnable)?{
    ????????????????//?If?we?are?an?abstract?runnable?we?can?handle?the?rejection
    ????????????????//?directly?and?don't?need?to?rethrow?it.
    ????????????????try?{
    ????????????????????((AbstractRunnable)?command).onRejection(ex);
    ????????????????}?finally?{
    ????????????????????((AbstractRunnable)?command).onAfter();

    ????????????????}
    ????????????}?else?{
    ????????????????throw?ex;
    ????????????}
    ????????}
    ????}

    ????protected?Runnable?wrapRunnable(Runnable?command)?{
    ????????return?contextHolder.preserveContext(command);
    ????}
    }

    //?ThreadContext#preserveContext(Runnable)
    public?Runnable?preserveContext(Runnable?command)?{
    ????if?(command?instanceof?ContextPreservingAbstractRunnable)?{
    ????????return?command;
    ????}
    ????if?(command?instanceof?ContextPreservingRunnable)?{
    ????????return?command;
    ????}
    ????if?(command?instanceof?AbstractRunnable)?{
    ????????return?new?ContextPreservingAbstractRunnable((AbstractRunnable)?command);
    ????}
    ????return?new?ContextPreservingRunnable(command);
    }

    總結

    以上是生活随笔為你收集整理的threadlocal使用_多方位点评ThreadLocal,细看各大开源软件实现的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 丝袜美腿亚洲综合 | 超污巨黄的小短文 | 天天超碰 | 成人激情在线 | 国产免费一区二区三区最新不卡 | 成人黄色一级片 | 成人勉费视频 | 99激情视频 | 在线免费色 | 天天做天天看 | 国产精品久久久久永久免费看 | 欧美黑人巨大xxx极品 | 丰满人妻一区二区三区免费视频 | 最新地址在线观看 | 日本深夜福利 | 国产网站黄色 | 中文字幕黄色av | 天天干夜夜欢 | 蜜桃av在线看 | 天天干天天干天天干 | 欧美日韩亚| 中文字幕欧美在线 | 主播av在线| 午夜精品区 | 成人免费一区二区三区 | 日韩成人综合 | 麻豆91茄子在线观看 | 亚洲成人手机在线 | 一级片在线视频 | 美国一级特黄 | 97精品国产97久久久久久免费 | 久久精品www人人爽人人 | 亚洲性xx | 日本免费一区二区三区四区 | 成熟人妻av无码专区 | 中文字幕在线视频免费播放 | 麻豆久久久久久 | 日韩精品成人无码专区免费 | 成人在线观看免费网站 | 成人国产精品入口免费视频 | 国产精品久久婷婷六月丁香 | 欧美日韩精品一区二区在线观看 | 色久影院 | 亚洲一区二区三区蜜桃 | av片在线免费观看 | 日本r级电影在线观看 | 欧美福利影院 | 亚洲天堂成人在线 | 三级欧美视频 | 毛片天堂| 亚洲国产成人精品女人久久 | free国产hd露脸性开放 | 91亚洲精选| 免费久久久 | 亚洲一区二区乱码 | 日韩精品一区二区三区久久 | 在线看v片| 欧美日韩国产一级片 | 久久精品视频一区 | 欧美4区| 国产成人av一区二区三区不卡 | 狠狠欧美| 奇米影视在线视频 | 91视频爱爱 | 欧美色久 | 欧美午夜一区 | 免费一级网站 | 国产视频网站在线观看 | 亚洲乱妇 | 98堂 最新网名 | 久久久视频6r | 星空大象mv高清在线观看免费 | 在线99 | 黄色生活毛片 | 久久久久久久久免费看无码 | 尤物视频最新网址 | 久免费一级suv好看的国产 | 寂寞人妻瑜伽被教练日 | 你懂的网址在线 | 日本精品久久久 | 免费成人在线电影 | 公交上高潮的丁芷晴 | 亚洲资源网站 | 手机在线一区二区三区 | 亚洲呦呦 | 欧美xxxⅹ性欧美大片 | 日日干天天 | 激情亚洲色图 | 在线免费精品视频 | 老司机午夜在线 | 久久精品高清视频 | 三上悠亚中文字幕在线播放 | 99福利在线 | 任你操精品视频 | 欧美女优一区二区 | 丰满少妇av | www毛片com| 中国无码人妻丰满熟妇啪啪软件 | 日本变态折磨凌虐bdsm在线 |