Java 服務調用全流程追蹤 簡易實現方案

undefined

前言

前段時間,本人一直協助項目組在作系統的重構,系統應用被拆分紅了多個服務,部分服務作了集羣部署。隨着上述架構的演進,天然而然的引進了ELK + Filebeat 作日誌收集。可是在使用Kibana查看日誌時,因爲缺乏TraceID,致使開發人員很難篩選出指定請求的相關日誌,也很難追蹤應用對下游服務的調用過程,耗費了不少時間。我本身查過幾回問題以後,實在受不了每次要花這麼久的時間,就趕忙向主管提了這一次的改造。html

本篇文章主要是記錄本人對項目TraceID鏈路追蹤改造的解決方案的研究、遇到的問題和具體的實現,同時本次改造也加深了我本身對分佈式服務追蹤的一些理解,我也寫在了裏面。java

本文主要內容:web

  • 初步實現
  • 異步線程traceId丟失的問題
  • 面向 Dubbo RPC 鏈路追蹤
  • 面向 HTTP Service 鏈路追蹤
  • 思考 SpringCloud Sleuth 的實現
  • 小結

1、初步實現

大致的思路就是藉助slf4j的MDC功能 + Spring Interceptor,當外部請求進入時生成一個traceId放在MDC當中。spring

MDC

這裏簡單介紹一下MDC。apache

MDC(Mapped Diagnostic Context,映射調試上下文)是 log4j 和 logback 提供的一種方便在多線程條件下記錄日誌的功能。MDC 能夠當作是一個與當前線程綁定的Map,能夠往其中添加鍵值對。MDC 中包含的內容能夠被同一線程中執行的代碼所訪問。當前線程的子線程會繼承其父線程中的 MDC 的內容。當須要記錄日誌時,只須要從 MDC 中獲取所需的信息便可。MDC 的內容則由程序在適當的時候保存進去。對於一個 Web 應用來講,一般是在請求被處理的最開始保存這些數據。

簡單來講,MDC就是日誌框架提供的一個InheritableThreadLocal,項目代碼中能夠將鍵值對放入其中,在打印的時候從ThreadLocal中獲取到對應的值而後打印出來。詳細的原理本文就不贅述了。看下 log4j 和 logback 裏面的實現類就知道了。segmentfault

實現

  1. 自定義Spring 攔截器 TraceInterceptor
/**
 * @author Richard_yyf
 */
public class TraceInterceptor extends HandlerInterceptorAdapter {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 清空
        MDC.clear();

        ThreadMdcUtil.setTraceIdIfAbsent();

        //後續邏輯... ...
        return true;
    }
}
  1. 註冊 攔截器
/**
 * @author Richard_yyf
 */
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(traceInterceptor())
                .addPathPatterns("/**")
                .order(0);
    }

    @Bean
    public TraceInterceptor traceInterceptor() {
        return new TraceInterceptor();
    }

}

ThreadMdcUtil是我本身封裝的一個工具類,包裝了對 TraceId 的一些操做:多線程

public class ThreadMdcUtil {
    
    public static String createTraceId() {
        String uuid = UUID.randomUUID().toString();
        return DigestUtils.md5Hex(uuid).substring(8, 24);
    }

    public static void setTraceIdIfAbsent() {
        if (MDC.get(TRACE_ID) == null) {
            MDC.put(TRACE_ID, createTraceId());
        }
    }
    // 省略了一些方法在後面會展現出來
}

DigestUtils來自於第三方依賴:架構

<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>***</version>
</dependency>

TRACE_ID放在 Constant類中方便引用:app

public class Constant {
    ...
   public static final String TRACE_ID = "traceId";
    ...
}
  1. 在日誌配置文件中修改輸出格式,增長TraceID字段的打印

    取值方式:%X{traceid}框架

image.png

結果

經過上面的步驟以後,你的web應用接收到請求後打印的日誌就會帶上TraceId

image.png

2、趕上線程池 TraceID 丟失的問題

前面的方案只是簡單實現了咱們的最基礎的需求。可是若是你真的使用起來,會發現異步的任務線程是沒有獲取到TraceID的。

一個成熟的應用確定會用到不少的線程池。常見的有@Async異步調用的線程池,應用自身定義的一些線程池等等。

前面有稍微提到過,MDC是經過InheritableThreadLocal實現的,建立子線程時,會複製父線程的inheritableThreadLocals屬性。可是在線程池中,線程是複用的,而不是新建立的,因此MDC內容就沒法傳遞進去。

因此咱們就須要曲線救國,既然線程是複用的,那咱們理所固然的就能想到在任務提交至線程池的時候作一些「騷」操做,來說MDC的內容傳遞下去。

改造

這裏就直接放上代碼:

/**
 * @author Richard_yyf
 */
public class ThreadMdcUtil {
    
    public static String createTraceId() {
        String uuid = UUID.randomUUID().toString();
        return DigestUtils.md5Hex(uuid).substring(8, 24);
    }

    public static void setTraceIdIfAbsent() {
        if (MDC.get(TRACE_ID) == null) {
            MDC.put(TRACE_ID, createTraceId());
        }
    }

    public static void setTraceId() {
        MDC.put(TRACE_ID, createTraceId());
    }

    public static void setTraceId(String traceId) {
        MDC.put(TRACE_ID, traceId);
    }

    public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            setTraceIdIfAbsent();
            try {
                return callable.call();
            } finally {
                MDC.clear();
            }
        };
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            setTraceIdIfAbsent();
            try {
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }
}

本身包裝擴展 ThreadPoolExecutor

/**
 * @author Richard_yyf
 */
public class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor {

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    public void execute(Runnable task) {
        super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }
}

使用

具體的使用就是把你原來executor = new ThreadPoolExecutor(...)改爲executor = new ThreadPoolExecutorMdcWrapper(...)便可。

好比你是用Spring @Async異步方法的,在配置線程池的時候就這樣聲明:

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @EnableAsync
    @Configuration
    class TaskPoolConfig {

        @Bean("taskExecutor")
        public Executor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolExecutorMdcWrapper();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(20);
            executor.setQueueCapacity(200);
            executor.setKeepAliveSeconds(60);
            return executor;
        }
    }

}

結果

按照上述步驟,你的異步任務在打印日誌的時候,就會帶上本來請求的TraceID了。

image.png

3、面向 Dubbo RPC 鏈路追蹤

咱們項目組主要使用Dubbo進行微服務框架的開發。咱們想在服務調用之間,傳遞上游服務的TraceID,來達到鏈路追蹤的效果。

Dubbo 提供了這樣的機制,能夠經過Dubbo RPC + Dubbo Filter 來設置和傳遞消費者的TraceID

詳見官網對於這兩個概念的說明。

Dubbo RPC
Dubbo Filter

這邊我直接給出代碼和擴展點配置。

Dubbo Filter for Consumer

消費者應用端:

/**
 * @author Richard_yyf
 */
@Activate(group = {Constants.CONSUMER})
public class ConsumerRpcTraceFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        //若是MDC上下文有追蹤ID,則原樣傳遞給provider端
        String traceId = MDC.get(TRACE_ID);
        if (StringUtils.isNotEmpty(traceId)) {
            RpcContext.getContext().setAttachment(TRACE_ID, traceId);
        }
        return invoker.invoke(invocation);
    }

}

SPI 配置

resources目錄下,建立/META-INF/dubbo/com.alibaba.dubbo.rpc.Filter文件.

consumerRpcTraceFilter=com.xxx.xxx.filter.ConsumerRpcTraceFilter

image.png

Dubbo Filter for Provider

服務提供者應用端:

/**
 * @author Richard_yyf
 */
@Activate(group = {Constants.PROVIDER})
public class ProviderRpcTraceFilter implements Filter {
    
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // 接收消費端的traceId
        String traceId = RpcContext.getContext().getAttachment(TRACE_ID);
        if (StringUtils.isBlank(traceId)) {
            traceId = ThreadMdcUtil.createTraceId();
        }

        // 設置日誌traceId
        ThreadMdcUtil.setTraceId(traceId);

        // TODO 若是這個服務還會調用下一個服務,須要再次設置下游參數
        // RpcContext.getContext().setAttachment("trace_id", traceId);

        try {
            return invoker.invoke(invocation);
        } finally {
            // 調用完成後移除MDC屬性
            MDC.remove(TRACE_ID);
        }
    }

}

SPI 配置:

providerRpcTraceFilter=com.xxx.xxx.filter.ProviderRpcTraceFilter

4、面向 HTTP Service 鏈路追蹤

除了Dubbo RPC 的這種方式,常見微服務之間的調用也有經過 HTTP REST 來完成調用的。這種場景下就須要在上游服務在發起HTTP調用的時候自動將 TraceID添加到 HTTP Header 中。

以經常使用的 Spring RestTemplate 爲例,使用攔截器來包裝 HTTP Header。

RestTemplate restTemplate = new RestTemplate();

        List<ClientHttpRequestInterceptor> list = new ArrayList<>();
        list.add((request, body, execution) -> {
            String traceId = MDC.get(TRACE_ID);
            if (StringUtils.isNotEmpty(traceId)) {
                request.getHeaders().add(TRACE_ID, traceId);
            }
            return execution.execute(request, body);
        });

        restTemplate.setInterceptors(list);

下游服務因爲是經過HTTP 接口暴露的服務,就添加一個攔截器來獲取就好。

public class TraceInterceptor extends HandlerInterceptorAdapter {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        MDC.clear();
        String traceId = request.getHeader(TRACE_ID);
        if (StringUtils.isEmpty(traceId)) {
            ThreadMdcUtil.setTraceId();
        } else {
            MDC.put(TRACE_ID, traceId);
        }
        return true;
    }
}

5、思考 Spring Cloud Sleuth 的實現

通過上面的幾個步驟,咱們至關因而本身造成了一個比較基礎的服務追蹤的解決方案。

Spring Cloud 做爲一個一站式 微服務開發框架,提供了Spring Cloud Sleuth 做爲 該技術體系下分佈式跟蹤的解決方案。這裏我想拿出來說一講。

Sleuth 是一個成熟的技術解決方案,基於 Google Dapper 爲理論基礎實現,裏面的一些術語都來自於那篇論文。在對於TraceID傳遞的問題上,咱們上面講的簡單版的解決方案的一些解決問題的思路,實際上在Sleuth 中也有體現。

首先就是分佈式追蹤,Sleuth 會將 SpanIDTraceID添加到 Slf4J MDC 中,這樣在打印出來的日誌就會有帶上對應的標識。

在遇到線程池 TraceID 傳遞失效的問題時,咱們至關了對提交任務的操做進行包裝,而在Slueth 中,是經過實現HystrixConcurrencyStrategy接口來解決 TraceID異步傳遞的問題。Hystrix在實際調用時,會調用HystrixConcurrencyStrategywrapCallable 方法。經過實現這個接口,在wrapCallable 中將TraceID存放起來(具體參見SleuthHystrixConcurrencyStrategy)。

在面對Dubbo RPC 的調用方式和 Http Service 的調用方式中,咱們經過Dubbo RpcContext + Filter和 Http Header + Interceptor 的方式,經過協議或者框架自己提供的擴展點和上下文機制,來傳遞TraceID。而在 Spring Cloud Sleuth中追蹤@Async,RestTemplate,Zuul,Feign等組件時,也是相似的解決思路。好比追蹤RestTemplate就是和上文同樣借用了Spring Client的 Interceptor 機制 (@See TraceRestTemplateInterceptor)。

上述就是將咱們的簡單解決方案和 Spring Cloud Sleuth 的對比,想說明日誌追蹤的思想和一些技術解決思路是共通相近的。

固然,Spring Cloud Sleuth 基於 Dapper 實現,提供了一個比較成熟的分佈式系統調用追蹤架構,集成ZipKin + spring-cloud-sleuth-zipkin 依賴以後,可以搭建一個完整的具備數據收集、數據存儲和數據展現功能的分佈式服務追蹤系統。

經過Sleuth能夠很清楚的瞭解到一個服務請求通過了哪些服務,每一個服務處理花費了多長。從而讓咱們能夠很方便的理清各微服務間的調用關係。此外Sleuth能夠幫助咱們:

  • 耗時分析: 經過Sleuth能夠很方便的瞭解到每一個採樣請求的耗時,從而分析出哪些服務調用比較耗時;
  • 可視化錯誤: 對於程序未捕捉的異常,能夠經過集成Zipkin服務界面上看到;
  • 鏈路優化: 對於調用比較頻繁的服務,能夠針對這些服務實施一些優化措施。
PS:spring-cloud-sleth 2.0 中開始 正式支持 Dubbo,思路的話則是經過Dubbo filter 擴展機制。

小結

再講講爲何不引入Sleuth + ZipKin 這種解決方案呢?由於咱們系統的調用鏈路並不複雜,通常只有一層的調用關係,因此並不但願增長第三方的組件,更傾向於使用簡單的解決方案。

本篇文章到這裏就結束了。實現一個簡單的微服務調用追蹤的日誌方案並無太大的難度,重要的仍是解決問題的思路,而且舉一反三,去學習一些市面上的已存在的優秀技術解決方案。

若是本文有幫助到你,但願能點個贊,這是對個人最大動力。
相關文章
相關標籤/搜索