前段時間,本人一直協助項目組在作系統的重構,系統應用被拆分紅了多個服務,部分服務作了集羣部署。隨着上述架構的演進,天然而然的引進了ELK + Filebeat 作日誌收集。可是在使用Kibana查看日誌時,因爲缺乏TraceID
,致使開發人員很難篩選出指定請求的相關日誌,也很難追蹤應用對下游服務的調用過程,耗費了不少時間。我本身查過幾回問題以後,實在受不了每次要花這麼久的時間,就趕忙向主管提了這一次的改造。html
本篇文章主要是記錄本人對項目TraceID
鏈路追蹤改造的解決方案的研究、遇到的問題和具體的實現,同時本次改造也加深了我本身對分佈式服務追蹤的一些理解,我也寫在了裏面。java
本文主要內容:web
大致的思路就是藉助slf4j的MDC功能 + Spring Interceptor,當外部請求進入時生成一個traceId放在MDC當中。spring
這裏簡單介紹一下MDC。apache
MDC(Mapped Diagnostic Context,映射調試上下文)是 log4j 和 logback 提供的一種方便在多線程條件下記錄日誌的功能。MDC 能夠當作是一個與當前線程綁定的Map,能夠往其中添加鍵值對。MDC 中包含的內容能夠被同一線程中執行的代碼所訪問。當前線程的子線程會繼承其父線程中的 MDC 的內容。當須要記錄日誌時,只須要從 MDC 中獲取所需的信息便可。MDC 的內容則由程序在適當的時候保存進去。對於一個 Web 應用來講,一般是在請求被處理的最開始保存這些數據。
簡單來講,MDC就是日誌框架提供的一個InheritableThreadLocal
,項目代碼中能夠將鍵值對放入其中,在打印的時候從ThreadLocal
中獲取到對應的值而後打印出來。詳細的原理本文就不贅述了。看下 log4j 和 logback 裏面的實現類就知道了。segmentfault
TraceInterceptor
/** * @author Richard_yyf */ public class TraceInterceptor extends HandlerInterceptorAdapter { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { // 清空 MDC.clear(); ThreadMdcUtil.setTraceIdIfAbsent(); //後續邏輯... ... return true; } }
/** * @author Richard_yyf */ @Configuration public class WebMvcConfig implements WebMvcConfigurer { @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(traceInterceptor()) .addPathPatterns("/**") .order(0); } @Bean public TraceInterceptor traceInterceptor() { return new TraceInterceptor(); } }
ThreadMdcUtil
是我本身封裝的一個工具類,包裝了對 TraceId 的一些操做:多線程
public class ThreadMdcUtil { public static String createTraceId() { String uuid = UUID.randomUUID().toString(); return DigestUtils.md5Hex(uuid).substring(8, 24); } public static void setTraceIdIfAbsent() { if (MDC.get(TRACE_ID) == null) { MDC.put(TRACE_ID, createTraceId()); } } // 省略了一些方法在後面會展現出來 }
DigestUtils
來自於第三方依賴:架構
<dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>***</version> </dependency>
TRACE_ID
放在 Constant
類中方便引用:app
public class Constant { ... public static final String TRACE_ID = "traceId"; ... }
取值方式:%X{traceid}
框架
經過上面的步驟以後,你的web應用接收到請求後打印的日誌就會帶上TraceId
。
前面的方案只是簡單實現了咱們的最基礎的需求。可是若是你真的使用起來,會發現異步的任務線程是沒有獲取到TraceID
的。
一個成熟的應用確定會用到不少的線程池。常見的有@Async
異步調用的線程池,應用自身定義的一些線程池等等。
前面有稍微提到過,MDC是經過InheritableThreadLocal
實現的,建立子線程時,會複製父線程的inheritableThreadLocals屬性。可是在線程池中,線程是複用的,而不是新建立的,因此MDC內容就沒法傳遞進去。
因此咱們就須要曲線救國,既然線程是複用的,那咱們理所固然的就能想到在任務提交至線程池的時候作一些「騷」操做,來說MDC的內容傳遞下去。
這裏就直接放上代碼:
/** * @author Richard_yyf */ public class ThreadMdcUtil { public static String createTraceId() { String uuid = UUID.randomUUID().toString(); return DigestUtils.md5Hex(uuid).substring(8, 24); } public static void setTraceIdIfAbsent() { if (MDC.get(TRACE_ID) == null) { MDC.put(TRACE_ID, createTraceId()); } } public static void setTraceId() { MDC.put(TRACE_ID, createTraceId()); } public static void setTraceId(String traceId) { MDC.put(TRACE_ID, traceId); } public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) { return () -> { if (context == null) { MDC.clear(); } else { MDC.setContextMap(context); } setTraceIdIfAbsent(); try { return callable.call(); } finally { MDC.clear(); } }; } public static Runnable wrap(final Runnable runnable, final Map<String, String> context) { return () -> { if (context == null) { MDC.clear(); } else { MDC.setContextMap(context); } setTraceIdIfAbsent(); try { runnable.run(); } finally { MDC.clear(); } }; } }
本身包裝擴展 ThreadPoolExecutor
/** * @author Richard_yyf */ public class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor { public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @Override public void execute(Runnable task) { super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public <T> Future<T> submit(Runnable task, T result) { return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), result); } @Override public <T> Future<T> submit(Callable<T> task) { return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public Future<?> submit(Runnable task) { return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap())); } }
具體的使用就是把你原來executor = new ThreadPoolExecutor(...)
改爲executor = new ThreadPoolExecutorMdcWrapper(...)
便可。
好比你是用Spring @Async
異步方法的,在配置線程池的時候就這樣聲明:
@SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @EnableAsync @Configuration class TaskPoolConfig { @Bean("taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolExecutorMdcWrapper(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(200); executor.setKeepAliveSeconds(60); return executor; } } }
按照上述步驟,你的異步任務在打印日誌的時候,就會帶上本來請求的TraceID了。
咱們項目組主要使用Dubbo進行微服務框架的開發。咱們想在服務調用之間,傳遞上游服務的TraceID
,來達到鏈路追蹤的效果。
Dubbo 提供了這樣的機制,能夠經過Dubbo RPC
+ Dubbo Filter
來設置和傳遞消費者的TraceID
。
詳見官網對於這兩個概念的說明。
Dubbo RPC
Dubbo Filter
這邊我直接給出代碼和擴展點配置。
消費者應用端:
/** * @author Richard_yyf */ @Activate(group = {Constants.CONSUMER}) public class ConsumerRpcTraceFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { //若是MDC上下文有追蹤ID,則原樣傳遞給provider端 String traceId = MDC.get(TRACE_ID); if (StringUtils.isNotEmpty(traceId)) { RpcContext.getContext().setAttachment(TRACE_ID, traceId); } return invoker.invoke(invocation); } }
SPI 配置:
在resources
目錄下,建立/META-INF/dubbo/com.alibaba.dubbo.rpc.Filter
文件.
consumerRpcTraceFilter=com.xxx.xxx.filter.ConsumerRpcTraceFilter
服務提供者應用端:
/** * @author Richard_yyf */ @Activate(group = {Constants.PROVIDER}) public class ProviderRpcTraceFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 接收消費端的traceId String traceId = RpcContext.getContext().getAttachment(TRACE_ID); if (StringUtils.isBlank(traceId)) { traceId = ThreadMdcUtil.createTraceId(); } // 設置日誌traceId ThreadMdcUtil.setTraceId(traceId); // TODO 若是這個服務還會調用下一個服務,須要再次設置下游參數 // RpcContext.getContext().setAttachment("trace_id", traceId); try { return invoker.invoke(invocation); } finally { // 調用完成後移除MDC屬性 MDC.remove(TRACE_ID); } } }
SPI 配置:
providerRpcTraceFilter=com.xxx.xxx.filter.ProviderRpcTraceFilter
除了Dubbo RPC 的這種方式,常見微服務之間的調用也有經過 HTTP REST 來完成調用的。這種場景下就須要在上游服務在發起HTTP調用的時候自動將 TraceID
添加到 HTTP Header 中。
以經常使用的 Spring RestTemplate 爲例,使用攔截器來包裝 HTTP Header。
RestTemplate restTemplate = new RestTemplate(); List<ClientHttpRequestInterceptor> list = new ArrayList<>(); list.add((request, body, execution) -> { String traceId = MDC.get(TRACE_ID); if (StringUtils.isNotEmpty(traceId)) { request.getHeaders().add(TRACE_ID, traceId); } return execution.execute(request, body); }); restTemplate.setInterceptors(list);
下游服務因爲是經過HTTP 接口暴露的服務,就添加一個攔截器來獲取就好。
public class TraceInterceptor extends HandlerInterceptorAdapter { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) { MDC.clear(); String traceId = request.getHeader(TRACE_ID); if (StringUtils.isEmpty(traceId)) { ThreadMdcUtil.setTraceId(); } else { MDC.put(TRACE_ID, traceId); } return true; } }
通過上面的幾個步驟,咱們至關因而本身造成了一個比較基礎的服務追蹤的解決方案。
Spring Cloud 做爲一個一站式 微服務開發框架,提供了Spring Cloud Sleuth 做爲 該技術體系下分佈式跟蹤的解決方案。這裏我想拿出來說一講。
Sleuth 是一個成熟的技術解決方案,基於 Google Dapper 爲理論基礎實現,裏面的一些術語都來自於那篇論文。在對於TraceID
傳遞的問題上,咱們上面講的簡單版的解決方案的一些解決問題的思路,實際上在Sleuth 中也有體現。
首先就是分佈式追蹤,Sleuth 會將 SpanID
和 TraceID
添加到 Slf4J MDC 中,這樣在打印出來的日誌就會有帶上對應的標識。
在遇到線程池 TraceID 傳遞失效的問題時,咱們至關了對提交任務的操做進行包裝,而在Slueth 中,是經過實現HystrixConcurrencyStrategy
接口來解決 TraceID
異步傳遞的問題。Hystrix在實際調用時,會調用HystrixConcurrencyStrategy
的 wrapCallable
方法。經過實現這個接口,在wrapCallable
中將TraceID
存放起來(具體參見SleuthHystrixConcurrencyStrategy
)。
在面對Dubbo RPC 的調用方式和 Http Service 的調用方式中,咱們經過Dubbo RpcContext + Filter和 Http Header + Interceptor 的方式,經過協議或者框架自己提供的擴展點和上下文機制,來傳遞TraceID
。而在 Spring Cloud Sleuth中追蹤@Async
,RestTemplate
,Zuul
,Feign
等組件時,也是相似的解決思路。好比追蹤RestTemplate
就是和上文同樣借用了Spring Client的 Interceptor 機制 (@See TraceRestTemplateInterceptor
)。
上述就是將咱們的簡單解決方案和 Spring Cloud Sleuth 的對比,想說明日誌追蹤的思想和一些技術解決思路是共通相近的。
固然,Spring Cloud Sleuth 基於 Dapper 實現,提供了一個比較成熟的分佈式系統調用追蹤架構,集成ZipKin + spring-cloud-sleuth-zipkin 依賴以後,可以搭建一個完整的具備數據收集、數據存儲和數據展現功能的分佈式服務追蹤系統。
經過Sleuth能夠很清楚的瞭解到一個服務請求通過了哪些服務,每一個服務處理花費了多長。從而讓咱們能夠很方便的理清各微服務間的調用關係。此外Sleuth能夠幫助咱們:
PS:spring-cloud-sleth 2.0 中開始 正式支持 Dubbo,思路的話則是經過Dubbo filter 擴展機制。
再講講爲何不引入Sleuth + ZipKin 這種解決方案呢?由於咱們系統的調用鏈路並不複雜,通常只有一層的調用關係,因此並不但願增長第三方的組件,更傾向於使用簡單的解決方案。
本篇文章到這裏就結束了。實現一個簡單的微服務調用追蹤的日誌方案並無太大的難度,重要的仍是解決問題的思路,而且舉一反三,去學習一些市面上的已存在的優秀技術解決方案。
若是本文有幫助到你,但願能點個贊,這是對個人最大動力。