海外商城從印度作起,慢慢的會有一些其餘國家的訴求,這個時候須要咱們針對當前的商城作一個改造,能夠支撐多個國家的商城,這裏會涉及多個問題,多語言,多國家,多時區,本地化等等。在多國家的狀況下如何把識別出來的國家信息傳遞下去,一層一層直到代碼執行的最後一步。甚至還有一些多線程的場景須要處理。java
ThreadLocal是最容易想到了,入口識別到國家信息後,丟進ThreadLocal,這樣後續代碼、redis、DB等作國家區分的時候都能使用到。redis
這裏先簡單介紹一下ThreadLocal:sql
/** * Sets the current thread's copy of this thread-local variable * to the specified value. Most subclasses will have no need to * override this method, relying solely on the {@link #initialValue} * method to set the values of thread-locals. * * @param value the value to be stored in the current thread's copy of * this thread-local. */ public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); } /** * Returns the value in the current thread's copy of this * thread-local variable. If the variable has no value for the * current thread, it is first initialized to the value returned * by an invocation of the {@link #initialValue} method. * * @return the current thread's value of this thread-local */ public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } /** * Get the map associated with a ThreadLocal. Overridden in * InheritableThreadLocal. * * @param t the current thread * @return the map */ ThreadLocalMap getMap(Thread t) { return t.threadLocals; } /** * Get the entry associated with key. This method * itself handles only the fast path: a direct hit of existing * key. It otherwise relays to getEntryAfterMiss. This is * designed to maximize performance for direct hits, in part * by making this method readily inlinable. * * @param key the thread local object * @return the entry associated with key, or null if no such */ private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; if (e != null && e.get() == key) return e; else return getEntryAfterMiss(key, i, e); }
get方法首先經過Thread.currentThread獲得當前線程,而後拿到線程的threadLocals(ThreadLocalMap),再從Entry中取得當前線程存儲的value。數據庫
實際使用中除了同步方法以外,還有起異步線程處理的場景,這個時候就須要把ThreadLocal的內容從父線程傳遞給子線程,這個怎麼辦呢?cookie
不急,Java 還有InheritableThreadLocal來幫咱們解決這個問題。mybatis
public class InheritableThreadLocal<T> extends ThreadLocal<T> { /** * Computes the child's initial value for this inheritable thread-local * variable as a function of the parent's value at the time the child * thread is created. This method is called from within the parent * thread before the child is started. * <p> * This method merely returns its input argument, and should be overridden * if a different behavior is desired. * * @param parentValue the parent thread's value * @return the child thread's initial value */ protected T childValue(T parentValue) { return parentValue; } /** * Get the map associated with a ThreadLocal. * * @param t the current thread */ ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; } /** * Create the map associated with a ThreadLocal. * * @param t the current thread * @param firstValue value for the initial entry of the table. */ void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue); } }
if (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
InheritableThreadLocal操做的是inheritableThreadLocals這個變量,而不是ThreadLocal操做的threadLocals變量。多線程
建立新線程的時候會檢查父線程中parent.inheritableThreadLocals變量是否爲null,若是不爲null則複製一份parent.inheritableThreadLocals的數據到子線程的this.inheritableThreadLocals中去。app
如今在使用多線程的時候,都是經過線程池來作的,這個時候用InheritableThreadLocal能夠嗎?會有什麼問題嗎?先看下下面的代碼的執行狀況:框架
static InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>(); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(1); inheritableThreadLocal.set("i am a inherit parent"); executorService.execute(new Runnable() { @Override public void run() { System.out.println(inheritableThreadLocal.get()); } }); TimeUnit.SECONDS.sleep(1); inheritableThreadLocal.set("i am a new inherit parent");// 設置新的值 executorService.execute(new Runnable() { @Override public void run() { System.out.println(inheritableThreadLocal.get()); } }); } i am a inherit parent i am a inherit parent public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(1); inheritableThreadLocal.set("i am a inherit parent"); executorService.execute(new Runnable() { @Override public void run() { System.out.println(inheritableThreadLocal.get()); inheritableThreadLocal.set("i am a old inherit parent");// 子線程中設置新的值 } }); TimeUnit.SECONDS.sleep(1); inheritableThreadLocal.set("i am a new inherit parent");// 主線程設置新的值 executorService.execute(new Runnable() { @Override public void run() { System.out.println(inheritableThreadLocal.get()); } }); } i am a inherit parent i am a old inherit parent
這裏看第一個執行結果,發現主線程第二次設置的值,沒有改掉,仍是第一次設置的值「i am a inherit parent」,這是什麼緣由呢?異步
再看第二個例子的執行結果,發如今第一個任務中設置的「i am a old inherit parent"的值,在第二個任務中打印出來了。這又是什麼緣由呢?
回過頭來看看上面的源碼,在線程池的狀況下,第一次建立線程的時候會從父線程中copy inheritableThreadLocals中的數據,因此第一個任務成功拿到了父線程設置的」i am a inherit parent「,第二個任務執行的時候複用了第一個任務的線程,並不會觸發複製父線程中的inheritableThreadLocals操做,因此即便在主線程中設置了新的值,也會不生效。同時get()方法是直接操做inheritableThreadLocals這個變量的,因此就直接拿到了第一個任務設置的值。
那遇到線程池應該怎麼辦呢?
TransmittableThreadLocal(TTL)這個時候就派上用場了。這是阿里開源的一個組件,咱們來看看它怎麼解決線程池的問題,先來一段代碼,在上面的基礎上修改一下,使用TransmittableThreadLocal。
static TransmittableThreadLocal<String> transmittableThreadLocal = new TransmittableThreadLocal<>();// 使用TransmittableThreadLocal public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(1); executorService = TtlExecutors.getTtlExecutorService(executorService); // 用TtlExecutors裝飾線程池 transmittableThreadLocal.set("i am a transmittable parent"); executorService.execute(new Runnable() { @Override public void run() { System.out.println(transmittableThreadLocal.get()); transmittableThreadLocal.set("i am a old transmittable parent");// 子線程設置新的值 } }); System.out.println(transmittableThreadLocal.get()); TimeUnit.SECONDS.sleep(1); transmittableThreadLocal.set("i am a new transmittable parent");// 主線程設置新的值 executorService.execute(new Runnable() { @Override public void run() { System.out.println(transmittableThreadLocal.get()); } }); } i am a transmittable parent i am a transmittable parent i am a new transmittable parent
執行代碼後發現,使用TransmittableThreadLocalTtlExecutors.getTtlExecutorService(executorService)裝飾線程池以後,在每次調用任務的時,都會將當前的主線程的TransmittableThreadLocal數據copy到子線程裏面,執行完成後,再清除掉。同時子線程裏面的修改回到主線程時其實並無生效。這樣能夠保證每次任務執行的時候都是互不干涉的。這是怎麼作到的呢?來看源碼。
private TtlRunnable(Runnable runnable, boolean releaseTtlValueReferenceAfterRun) { this.capturedRef = new AtomicReference<Object>(capture()); this.runnable = runnable; this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun; } com.alibaba.ttl.TtlRunnable#run /** * wrap method {@link Runnable#run()}. */ @Override public void run() { Object captured = capturedRef.get();// 獲取線程的ThreadLocalMap if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after run!"); } Object backup = replay(captured);// 暫存當前子線程的ThreadLocalMap到backup try { runnable.run(); } finally { restore(backup);// 恢復線程執行時被改版的Threadlocal對應的值 } } com.alibaba.ttl.TransmittableThreadLocal.Transmitter#replay /** * Replay the captured {@link TransmittableThreadLocal} values from {@link #capture()}, * and return the backup {@link TransmittableThreadLocal} values in current thread before replay. * * @param captured captured {@link TransmittableThreadLocal} values from other thread from {@link #capture()} * @return the backup {@link TransmittableThreadLocal} values before replay * @see #capture() * @since 2.3.0 */ public static Object replay(Object captured) { @SuppressWarnings("unchecked") Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured; Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>(); for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next(); TransmittableThreadLocal<?> threadLocal = next.getKey(); // backup backup.put(threadLocal, threadLocal.get()); // clear the TTL value only in captured // avoid extra TTL value in captured, when run task. if (!capturedMap.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // set value to captured TTL for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : capturedMap.entrySet()) { @SuppressWarnings("unchecked") TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey(); threadLocal.set(entry.getValue()); } // call beforeExecute callback doExecuteCallback(true); return backup; } com.alibaba.ttl.TransmittableThreadLocal.Transmitter#restore /** * Restore the backup {@link TransmittableThreadLocal} values from {@link Transmitter#replay(Object)}. * * @param backup the backup {@link TransmittableThreadLocal} values from {@link Transmitter#replay(Object)} * @since 2.3.0 */ public static void restore(Object backup) { @SuppressWarnings("unchecked") Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup; // call afterExecute callback doExecuteCallback(false); for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next(); TransmittableThreadLocal<?> threadLocal = next.getKey(); // clear the TTL value only in backup // avoid the extra value of backup after restore if (!backupMap.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // restore TTL value for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : backupMap.entrySet()) { @SuppressWarnings("unchecked") TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey(); threadLocal.set(entry.getValue()); } }
能夠看下整個過程的完整時序圖:
OK,既然問題都解決了,來看看實際使用吧,有兩種使用,先看第一種,涉及HTTP請求、Dubbo請求和 job,採用的是數據級別的隔離。
用戶 HTTP 請求,首先咱們要從url或者cookie中解析出國家編號,而後在TransmittableThreadLocal中存放國家信息,在 MyBatis 的攔截器中讀取國家數據,進行sql改造,最終操做指定的國家數據,多線程場景下用TtlExecutors包裝原有自定義線程池,保障在使用線程池的時候可以正確將國家信息傳遞下去。
public class ShopShardingHelperUtil { private static TransmittableThreadLocal<String> countrySet = new TransmittableThreadLocal<>(); /** * 獲取threadLocal中設置的國家標誌 * @return */ public static String getCountry() { return countrySet.get(); } /** * 設置threadLocal中設置的國家 */ public static void setCountry (String country) { countrySet.set(country.toLowerCase()); } /** * 清除標誌 */ public static void clear () { countrySet.remove(); } } /** 攔截器對cookie和url綜合判斷國家信息,放入到TransmittableThreadLocal中 **/ // 設置線程中的國家標誌 String country = localeContext.getLocale().getCountry().toLowerCase(); ShopShardingHelperUtil.setCountry(country); /** 自定義線程池,用TtlExecutors包裝原有自定義線程池 **/ public static Executor getExecutor() { if (executor == null) { synchronized (TransmittableExecutor.class) { if (executor == null) { executor = TtlExecutors.getTtlExecutor(initExecutor());// 用TtlExecutors裝飾Executor,結合TransmittableThreadLocal解決異步線程threadlocal傳遞問題 } } } return executor; } /** 實際使用線程池的地方,直接調用執行便可**/ TransmittableExecutor.getExecutor().execute(new BatchExeRunnable(param1,param2)); /** mybatis的Interceptor代碼, 使用TransmittableThreadLocal的國家信息,改造原有sql,加上國家參數,在增刪改查sql中區分國家數據 **/ public Object intercept(Invocation invocation) throws Throwable { StatementHandler statementHandler = (StatementHandler) invocation.getTarget(); BoundSql boundSql = statementHandler.getBoundSql(); String originalSql = boundSql.getSql(); Statement statement = (Statement) CCJSqlParserUtil.parse(originalSql); String threadCountry = ShopShardingHelperUtil.getCountry(); // 線程中的國家不爲空才進行處理 if (StringUtils.isNotBlank(threadCountry)) { if (statement instanceof Select) { Select selectStatement = (Select) statement; VivoSelectVisitor vivoSelectVisitor = new VivoSelectVisitor(threadCountry); vivoSelectVisitor.init(selectStatement); } else if (statement instanceof Insert) { Insert insertStatement = (Insert) statement; VivoInsertVisitor vivoInsertVisitor = new VivoInsertVisitor(threadCountry); vivoInsertVisitor.init(insertStatement); } else if (statement instanceof Update) { Update updateStatement = (Update) statement; VivoUpdateVisitor vivoUpdateVisitor = new VivoUpdateVisitor(threadCountry); vivoUpdateVisitor.init(updateStatement); } else if (statement instanceof Delete) { Delete deleteStatement = (Delete) statement; VivoDeleteVisitor vivoDeleteVisitor = new VivoDeleteVisitor(threadCountry); vivoDeleteVisitor.init(deleteStatement); } Field boundSqlField = BoundSql.class.getDeclaredField("sql"); boundSqlField.setAccessible(true); boundSqlField.set(boundSql, statement.toString()); } else { logger.error("----------- intercept not-add-country sql.... ---------" + statement.toString()); } logger.info("----------- intercept query new sql.... ---------" + statement.toString()); // 調用方法,實際上就是攔截的方法 Object result = invocation.proceed(); return result; }
對於 Dubbo 接口和沒法判斷國家信息的 HTTP 接口,在入參部分增長國家信息參數,經過攔截器或者手動set國家信息到TransmittableThreadLocal。
對於定時任務 job,由於全部國家都須要執行,因此會把全部國家進行遍歷執行,這也能夠經過簡單的註解來解決。
這個版本的改造,點檢測試也基本經過了,自動化腳本驗證也是沒問題的,不過由於業務發展問題最終沒上線。
後續在建設新的國家商城的時候,分庫分表方案調整爲每一個國家獨立數據庫,同時總體開發框架升級到SpringBoot,咱們把這套方案作了升級,整體思路是同樣的,只是在實現細節上略有不一樣。
SpringBoot 裏面的異步通常經過@Async這個註解來實現,經過自定義線程池來包裝,使用時在 HTTP 請求判斷locale信息的寫入國家信息,後續完成切DB的操做。
對於 Dubbo 接口和沒法判斷國家信息的 HTTP 接口,在入參部分增長國家信息參數,經過攔截器或者手動set國家信息到TransmittableThreadLocal。
@Bean public ThreadPoolTaskExecutor threadPoolTaskExecutor(){ return TtlThreadPoolExecutors.getAsyncExecutor(); } public class TtlThreadPoolExecutors { private static final String COMMON_BUSINESS = "COMMON_EXECUTOR"; public static final int QUEUE_CAPACITY = 20000; public static ExecutorService getExecutorService() { return TtlExecutorServiceMananger.getExecutorService(COMMON_BUSINESS); } public static ExecutorService getExecutorService(String threadGroupName) { return TtlExecutorServiceMananger.getExecutorService(threadGroupName); } public static ThreadPoolTaskExecutor getAsyncExecutor() { // 用TtlExecutors裝飾Executor,結合TransmittableThreadLocal解決異步線程threadlocal傳遞問題 return getTtlThreadPoolTaskExecutor(initTaskExecutor()); } private static ThreadPoolTaskExecutor initTaskExecutor () { return initTaskExecutor(TtlThreadPoolFactory.DEFAULT_CORE_SIZE, TtlThreadPoolFactory.DEFAULT_POOL_SIZE, QUEUE_CAPACITY); } private static ThreadPoolTaskExecutor initTaskExecutor (int coreSize, int poolSize, int executorQueueCapacity) { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(coreSize); taskExecutor.setMaxPoolSize(poolSize); taskExecutor.setQueueCapacity(executorQueueCapacity); taskExecutor.setKeepAliveSeconds(120); taskExecutor.setAllowCoreThreadTimeOut(true); taskExecutor.setThreadNamePrefix("TaskExecutor-ttl"); taskExecutor.initialize(); return taskExecutor; } private static ThreadPoolTaskExecutor getTtlThreadPoolTaskExecutor(ThreadPoolTaskExecutor executor) { if (null == executor || executor instanceof ThreadPoolTaskExecutorWrapper) { return executor; } return new ThreadPoolTaskExecutorWrapper(executor); } } /** * @ClassName : LocaleContextHolder * @Description : 本地化信息上下文holder */ public class LocalizationContextHolder { private static TransmittableThreadLocal<LocalizationContext> localizationContextHolder = new TransmittableThreadLocal<>(); private static LocalizationInfo defaultLocalizationInfo = new LocalizationInfo(); private LocalizationContextHolder(){} public static LocalizationContext getLocalizationContext() { return localizationContextHolder.get(); } public static void resetLocalizationContext () { localizationContextHolder.remove(); } public static void setLocalizationContext (LocalizationContext localizationContext) { if(localizationContext == null) { resetLocalizationContext(); } else { localizationContextHolder.set(localizationContext); } } public static void setLocalizationInfo (LocalizationInfo localizationInfo) { LocalizationContext localizationContext = getLocalizationContext(); String brand = (localizationContext instanceof BrandLocalizationContext ? ((BrandLocalizationContext) localizationContext).getBrand() : null); if(StringUtils.isNotEmpty(brand)) { localizationContext = new SimpleBrandLocalizationContext(localizationInfo, brand); } else if(localizationInfo != null) { localizationContext = new SimpleLocalizationContext(localizationInfo); } else { localizationContext = null; } setLocalizationContext(localizationContext); } public static void setDefaultLocalizationInfo(@Nullable LocalizationInfo localizationInfo) { LocalizationContextHolder.defaultLocalizationInfo = localizationInfo; } public static LocalizationInfo getLocalizationInfo () { LocalizationContext localizationContext = getLocalizationContext(); if(localizationContext != null) { LocalizationInfo localizationInfo = localizationContext.getLocalizationInfo(); if(localizationInfo != null) { return localizationInfo; } } return defaultLocalizationInfo; } public static String getCountry(){ return getLocalizationInfo().getCountry(); } public static String getTimezone(){ return getLocalizationInfo().getTimezone(); } public static String getBrand(){ return getBrand(getLocalizationContext()); } public static String getBrand(LocalizationContext localizationContext) { if(localizationContext == null) { return null; } if(localizationContext instanceof BrandLocalizationContext) { return ((BrandLocalizationContext) localizationContext).getBrand(); } throw new LocaleException("unsupported localizationContext type"); } } @Override public LocaleContext resolveLocaleContext(final HttpServletRequest request) { parseLocaleCookieIfNecessary(request); LocaleContext localeContext = new TimeZoneAwareLocaleContext() { @Override public Locale getLocale() { return (Locale) request.getAttribute(LOCALE_REQUEST_ATTRIBUTE_NAME); } @Override public TimeZone getTimeZone() { return (TimeZone) request.getAttribute(TIME_ZONE_REQUEST_ATTRIBUTE_NAME); } }; // 設置線程中的國家標誌 setLocalizationInfo(request, localeContext.getLocale()); return localeContext; } private void setLocalizationInfo(HttpServletRequest request, Locale locale) { String country = locale!=null?locale.getCountry():null; String language = locale!=null?(locale.getLanguage() + "_" + locale.getVariant()):null; LocaleRequestMessage localeRequestMessage = localeRequestParser.parse(request); final String countryStr = country; final String languageStr = language; final String brandStr = localeRequestMessage.getBrand(); LocalizationContextHolder.setLocalizationContext(new BrandLocalizationContext() { @Override public String getBrand() { return brandStr; } @Override public LocalizationInfo getLocalizationInfo() { return LocalizationInfoAssembler.assemble(countryStr, languageStr); } }); }
對於定時任務job,由於全部國家都須要執行,因此會把全部國家進行遍歷執行,這也能夠經過簡單的註解和AOP來解決。
本文從業務拓展的角度闡述了在複雜業務場景下如何經過ThreadLocal,過渡到InheritableThreadLocal,再經過TransmittableThreadLocal解決實際業務問題。由於海外的業務在不斷的探索中前進,技術也在不斷的探索中演進,面對這種複雜多變的狀況,咱們的應對策略是先作國際化,再作本地化,more global才能more local,多國家的隔離只是國際化最基本的起點,將來還有不少業務和技術等着咱們去挑戰。
做者:vivo 官網商城開發團隊