歡迎關注公衆號【sharedCode】致力於主流中間件的源碼分析, 我的網站:https://www.shared-code.com/java
前言
在上一篇文章 多線程篇-父子線程的上下文傳遞
的文末,咱們瞭解到JDK提供的InheritableThreadLocal
在線程池中的使用狀況並非太理想,由於在複用線程的狀況下,獲得的值頗有可能不是咱們想要的,接下來我要給你們介紹一款開源組件,阿里開源的,用的感受還不錯。git
TransmittableThreadLocal
通常狀況下,ThreadLocal均可以知足咱們的需求,當咱們出現須要 在使用線程池等會池化複用線程的執行組件狀況下傳遞ThreadLocal
,github
這個場景就是TransmittableThreadLocal解決的問題。安全
Github地址:https://github.com/alibaba/transmittable-thread-local多線程
感興趣的能夠去下載的玩一玩,接下來咱們來介紹一下這個組件的神奇之處。ide
首先看個demo, 經過demo,咱們先了解了解怎麼用源碼分析
demo
/** * ttl測試 * * @author zhangyunhe * @date 2020-04-23 12:47 */ public class Test { // 1. 初始化一個TransmittableThreadLocal,這個是繼承了InheritableThreadLocal的 static TransmittableThreadLocal<String> local = new TransmittableThreadLocal<>(); // 初始化一個長度爲1的線程池 static ExecutorService poolExecutor = Executors.newFixedThreadPool(1); public static void main(String[] args) throws ExecutionException, InterruptedException { Test test = new Test(); test.test(); } private void test() throws ExecutionException, InterruptedException { // 設置初始值 local.set("天王老子"); //!!!! 注意:這個地方的Task是使用了TtlRunnable包裝的 Future future = poolExecutor.submit(TtlRunnable.get(new Task("任務1"))); future.get(); Future future2 = poolExecutor.submit(TtlRunnable.get(new Task("任務2"))); future2.get(); System.out.println("父線程的值:"+local.get()); poolExecutor.shutdown(); } class Task implements Runnable{ String str; Task(String str){ this.str = str; } @Override public void run() { // 獲取值 System.out.println(Thread.currentThread().getName()+":"+local.get()); // 從新設置一波 local.set(str); } } }
輸出結果:測試
pool-1-thread-1:天王老子 pool-1-thread-1:天王老子 父線程的值:天王老子
原理分析
咱們首先看一下TransmittableThreadLocal
的源碼,網站
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> { // 1. 此處的holder是他的主要設計點,後續在構建TtlRunnable private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() { @Override protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() { return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); } @Override protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) { return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue); } }; @SuppressWarnings("unchecked") private void addThisToHolder() { if (!holder.get().containsKey(this)) { holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value. } } @Override public final T get() { T value = super.get(); if (disableIgnoreNullValueSemantics || null != value) addThisToHolder(); return value; } /** * see {@link InheritableThreadLocal#set} */ @Override public final void set(T value) { if (!disableIgnoreNullValueSemantics && null == value) { // may set null to remove value remove(); } else { super.set(value); addThisToHolder(); } } /** * see {@link InheritableThreadLocal#remove()} */ @Override public final void remove() { removeThisFromHolder(); super.remove(); } private void superRemove() { super.remove(); } }
步驟說明:this
- 在代碼中,做者構建了一個holder對象,這個對象是一個
InheritableThreadLocal
, 裏面的類型是一個弱引用的WeakHashMap , 這個map的va lu就是TransmittableThreadLocal
, 至於value永遠都是空的
holder裏面存儲的是這個應用裏面,全部關於TransmittableThreadLocal
的引用。
- 從上面能夠看到,每次get, set ,remove都會操做holder對象,這樣作的目的是爲了保持
TransmittableThreadLocal
全部的這個引用都在holder裏面存一份。
TtlRunnable
回到咱們上面的代碼
Future future = poolExecutor.submit(TtlRunnable.get(new Task("任務1")));
細心的朋友可能已經發現了,咱們調用了TtlRunnable
對象的get方法,下面看一下這個方法有啥做用吧
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) { if (null == runnable) return null; if (runnable instanceof TtlEnhanced) { // avoid redundant decoration, and ensure idempotency if (idempotent) return (TtlRunnable) runnable; else throw new IllegalStateException("Already TtlRunnable!"); } // 重點在這裏 return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun); }
看上面的代碼,細節上咱們不看,咱們看大體的思路, 這個地方主要就是根據傳入的runnable構建了一個TtlRunnable對象。
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) { //重點在這裏 this.capturedRef = new AtomicReference<Object>(capture()); this.runnable = runnable; this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun; }
下面這行代碼,運行到這裏的時候,仍是在主線程裏面,調用了capture
方法
this.capturedRef = new AtomicReference<Object>(capture());
capture
public static Object capture() { // 構建一個臨時對象,主要看captureTtlValues方法 return new Snapshot(captureTtlValues(), captureThreadLocalValues()); } private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() { // 構建一個WeakHashMap方法, WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); // 在主線程裏面,調用holder變量,循環獲取裏面全部的key和value for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) { ttl2Value.put(threadLocal, threadLocal.copyValue()); } // 返回出去 return ttl2Value; }
步驟說明:
1.調用靜態變量holder, 循環獲取裏面全部的key和value, value的獲取就比較巧妙一點。
private T copyValue() { // 這裏的get方法,調用的是父類的方法,能夠在父類裏面最終獲取到當前TransmittableThreadLocal所對應的value return copy(get()); }
2.組裝好一個WeakHashMap出去,最終就會到了咱們上面的構造方法裏面,針對capturedRef
的賦值操做。
run
@Override public void run() { //1. 獲取到剛剛構造TtlRunnable對象的時候初始化的capturedRef對象。包含了從submit丟任務進來的時候父線程的數據 Object captured = capturedRef.get(); if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after run!"); } // 清除不在captured裏面的key,同時在這個子線程中,對全部的ThreadLocal進行從新設置值 Object backup = replay(captured); try { // 執行實際的線程方法 runnable.run(); } finally { // 作好還原工做,根據backup restore(backup); } } private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) { WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal<Object> threadLocal = iterator.next(); // 作好當前線程的local備份 backup.put(threadLocal, threadLocal.get()); // 清除數據,不在captured裏面的。 if (!captured.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // 這裏就是把值設置到當前線程的TransmittableThreadLocal裏面。 setTtlValuesTo(captured); // 一個鉤子 doExecuteCallback(true); return backup; }
總結:
一些朋友看了上面的那麼多源碼,可能有點蒙,我可能說的也有點亂,在這裏總結一下。
1.經過繼承InheritableThreadLocal,新成立一個TransmittableThreadLocal類, 該類中有一個hodel變量,用來維護全部的TransmittableThreadLocal引用。
2.在實際submit任務到線程池的時候,咱們是須要調用TtlRunnable.get方法,構建一個任務的包裝類。這裏使用裝飾者模式,對runnable線程對象進行裝飾包裝,在初始化這個包裝對象的時候,會獲取主線程裏面全部的TransmittableThreadLocal引用,以及裏面全部的值,這個值其實就是當前父線程裏面的(跟你當時建立這個線程的父線程沒有任何關係,注意,這裏講的是線程池的場景)。
3.對數據作規整,根據收集到的captured
(這個對象裏面存儲的都是主線程裏面可以獲取到TransmittableThreadLocal以及對應的值) 作規整,去掉當前線程裏面不須要的,同時將剩餘的key和value ,更新到當前線程的ThreadLocal裏面。這樣就達到了在池化技術裏面父子線程傳值的安全性