CAT Usage Notes

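I won't go through CAT's design ideas or implementation principles here. This post mainly records some problems I ran into while using CAT, such as distributed logview and instrumentation for Cache and DB monitoring. The problems are few, but they are fairly typical.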

I have implemented a set of CAT monitoring extensions that can serve as a reference:

https://github.com/summerpotato/cat-monitor

(This post is based on CAT version 1.3.6.)

1. Linking the log tree for distributed logview

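So far I have used two approaches: one based on RPC calls between dubbo applications, and one based on REST service calls over HTTP. First, a word on how the message tree works. When tracing a message across services, CAT links messages through three properties (the root message id, the parent message id, and the child message id) and assembles them into a message tree. The key is how the tree's three ids are obtained and passed along.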

There are two points here. The first is how CAT generates the message tree.

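We need to implement CAT's Context interface and then call Cat.logRemoteCallClient(context) to populate the context with node data (internally, the method creates a message tree object to obtain each node's message id and fills the ids into the context). When the remote server receives this context, it calls Cat.logRemoteCallServer(context), which reads each node's message id and assembles the message tree.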
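To make the handoff concrete, here is a minimal, self-contained sketch. It does not call CAT: `TraceContext` is a hypothetical stand-in for `Cat.Context`, and `fillOnClient` / `readOnServer` only imitate what `Cat.logRemoteCallClient` and `Cat.logRemoteCallServer` are described as doing above. The key strings and the id values are assumptions made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Cat.Context: a key/value carrier for the three ids.
interface TraceContext {
    String ROOT = "root";     // assumed key names, not CAT's real constants
    String PARENT = "parent";
    String CHILD = "child";

    void addProperty(String key, String value);

    String getProperty(String key);
}

class MapTraceContext implements TraceContext {
    private final Map<String, String> properties = new HashMap<>();

    @Override
    public void addProperty(String key, String value) {
        properties.put(key, value);
    }

    @Override
    public String getProperty(String key) {
        return properties.get(key);
    }
}

class ContextHandoffDemo {
    // Imitates the client side: the caller's own message id becomes PARENT,
    // a freshly allocated id becomes CHILD, and ROOT is inherited, or falls
    // back to the caller's id when the caller is the first hop.
    static void fillOnClient(TraceContext ctx, String callerId, String callerRootId, String newChildId) {
        ctx.addProperty(TraceContext.ROOT, callerRootId != null ? callerRootId : callerId);
        ctx.addProperty(TraceContext.PARENT, callerId);
        ctx.addProperty(TraceContext.CHILD, newChildId);
    }

    // Imitates the server side: the received CHILD id becomes this hop's own
    // message id, so the caller's tree can point at it.
    // Returns {rootId, parentId, ownId}.
    static String[] readOnServer(TraceContext ctx) {
        String ownId = ctx.getProperty(TraceContext.CHILD);
        String parentId = ctx.getProperty(TraceContext.PARENT);
        String rootId = ctx.getProperty(TraceContext.ROOT);
        return new String[] {rootId, parentId, ownId};
    }

    public static void main(String[] args) {
        TraceContext ctx = new MapTraceContext();
        // First hop: no inherited root, so the caller's id doubles as the root.
        fillOnClient(ctx, "app-a-1", null, "app-a-2");
        String[] ids = readOnServer(ctx);
        System.out.println("root=" + ids[0] + " parent=" + ids[1] + " own=" + ids[2]);
    }
}
```

The point of the sketch is that the context itself is just a dumb carrier: all the transport layer (dubbo attachments or HTTP headers) has to do is move three strings from one side to the other.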

The second is how the message ids should be passed.

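For dubbo RPC calls: every call carries an RPC context that holds call information, parameters, status information, and so on. The message ids can be placed in the RpcContext as attachments; they then travel to the server when the Invocation is handed to the Invoker's invoke method. Finally, use dubbo's SPI extension mechanism to implement com.alibaba.dubbo.rpc.Filter, which reads the contents of the RpcContext.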

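For REST-style HTTP requests: the caller puts the message ids into HTTP headers, and the provider intercepts the request with a filter and reads the message ids back from the headers. With this instrumentation in place, the message tree is linked together.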
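The header round trip can be sketched without any HTTP library. This is only an illustration under stated assumptions: the three header names below are hypothetical placeholders (the real code uses the `Cat.Context.ROOT`, `PARENT`, and `CHILD` constants), and `toHeaders` / `fromHeaders` are made-up helper names. The provider side looks headers up case-insensitively, since HTTP header names are case-insensitive and intermediaries may change their casing.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

class HeaderPropagationDemo {
    // Hypothetical header names, chosen for this sketch only.
    static final String ROOT = "X-Trace-Root-Id";
    static final String PARENT = "X-Trace-Parent-Id";
    static final String CHILD = "X-Trace-Child-Id";
    static final String[] KEYS = {ROOT, PARENT, CHILD};

    // Caller side: copy only the ids that are present into the outgoing headers.
    static Map<String, String> toHeaders(Map<String, String> ids) {
        Map<String, String> headers = new HashMap<>();
        for (String key : KEYS) {
            String value = ids.get(key);
            if (value != null) {
                headers.put(key, value);
            }
        }
        return headers;
    }

    // Provider side: what a servlet filter would do with the incoming headers.
    // Returns null when no child id is present, i.e. the caller was not
    // instrumented and the provider should simply start a fresh tree.
    static Map<String, String> fromHeaders(Map<String, String> headers) {
        Map<String, String> lookup = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        lookup.putAll(headers);
        if (lookup.get(CHILD) == null) {
            return null;
        }
        Map<String, String> ids = new HashMap<>();
        for (String key : KEYS) {
            ids.put(key, lookup.get(key));
        }
        return ids;
    }

    public static void main(String[] args) {
        Map<String, String> ids = new HashMap<>();
        ids.put(ROOT, "root-1");
        ids.put(PARENT, "parent-1");
        ids.put(CHILD, "child-1");

        // Simulate an intermediary that lower-cases header names in transit.
        Map<String, String> lowered = new HashMap<>();
        for (Map.Entry<String, String> e : toHeaders(ids).entrySet()) {
            lowered.put(e.getKey().toLowerCase(Locale.ROOT), e.getValue());
        }

        System.out.println(fromHeaders(lowered));
        System.out.println(fromHeaders(new HashMap<>()));
    }
}
```

In a real provider, the map returned by `fromHeaders` would be copied into a `Cat.Context` implementation before calling `Cat.logRemoteCallServer(context)`.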

Enough talk; on to the code.

1) Partial implementation for dubbo calls (you should first be familiar with dubbo's SPI configuration, CAT's monitoring configuration, and so on)

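import java.util.HashMap;
import java.util.Map;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
// StringUtils is assumed to be dubbo's own; any helper with isEmpty(String) works
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.TimeoutException;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.AbstractMessage;

public class DubboCatFilter implements Filter {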

    private static final ThreadLocal<Cat.Context> CAT_CONTEXT = new ThreadLocal<Cat.Context>();

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String sideKey = url.getParameter(Constants.SIDE_KEY);
        String loggerName = invoker.getInterface().getSimpleName() + "." + invocation.getMethodName();
        String type = "PigeonCall";
        if (Constants.PROVIDER_SIDE.equals(sideKey)) {
            type = "PigeonService";
        }
        Transaction t = Cat.newTransaction(type, loggerName);
        Result result = null;
        try {
            Cat.Context context = getContext();
            if (Constants.CONSUMER_SIDE.equals(sideKey)) {
                createConsumerCross(url, t);
                Cat.logRemoteCallClient(context);
            } else {
                createProviderCross(url, t);
                Cat.logRemoteCallServer(context);
            }
            setAttachment(context);
            result = invoker.invoke(invocation);

            if (result.hasException()) {
                // Record an event when the invoked interface returns an exception
                Throwable throwable = result.getException();
                Event event = null;
                if (RpcException.class == throwable.getClass()) {
                    Throwable caseBy = throwable.getCause();
                    if (caseBy != null && caseBy.getClass() == TimeoutException.class) {
                        event = Cat.newEvent("DUBBO_TIMEOUT_ERROR", loggerName);
                    } else {
                        event = Cat.newEvent("DUBBO_REMOTING_ERROR", loggerName);
                    }
                } else if (RemotingException.class.isAssignableFrom(throwable.getClass())) {
                    event = Cat.newEvent("DUBBO_REMOTING_ERROR", loggerName);
                }else{
                    event = Cat.newEvent("DUBBO_BIZ_ERROR", loggerName);
                }
                event.setStatus(result.getException());
                completeEvent(event);
                t.addChild(event);
                t.setStatus(result.getException().getClass().getSimpleName());
            } else {
                t.setStatus(Message.SUCCESS);
            }
            return result;
        } catch (RuntimeException e) {
            Event event = null;
            if (RpcException.class == e.getClass()) {
                Throwable caseBy = e.getCause();
                if (caseBy !=null && caseBy.getClass() == TimeoutException.class) {
                    event = Cat.newEvent("DUBBO_TIMEOUT_ERROR", loggerName);
                } else {
                    event = Cat.newEvent("DUBBO_REMOTING_ERROR", loggerName);
                }
            } else {
                event = Cat.newEvent("DUBBO_BIZ_ERROR", loggerName);
            }
            event.setStatus(e);
            completeEvent(event);
            t.addChild(event);
            t.setStatus(e.getClass().getSimpleName());
            if (result == null) {
                throw e;
            } else {
                return result;
            }
        } finally {
            t.complete();
            CAT_CONTEXT.remove();
        }
    }

    static class DubboCatContext implements Cat.Context {
        private Map<String,String> properties = new HashMap<String, String>();

        @Override
        public void addProperty(String key, String value) {
            properties.put(key,value);
        }

        @Override
        public String getProperty(String key) {
            return properties.get(key);
        }
    }

    private void setAttachment(Cat.Context context) {
        RpcContext.getContext().setAttachment(Cat.Context.ROOT,context.getProperty(Cat.Context.ROOT));
        RpcContext.getContext().setAttachment(Cat.Context.CHILD,context.getProperty(Cat.Context.CHILD));
        RpcContext.getContext().setAttachment(Cat.Context.PARENT,context.getProperty(Cat.Context.PARENT));
    }

    private Cat.Context getContext(){
        Cat.Context context = CAT_CONTEXT.get();
        if (context==null) {
            context = initContext();
            CAT_CONTEXT.set(context);
        }
        return context;
    }

    private Cat.Context initContext() {
        Cat.Context context = new DubboCatContext();
        Map<String,String> attachments = RpcContext.getContext().getAttachments();
        if (attachments!=null&&attachments.size()>0) {
            for (Map.Entry<String,String> entry:attachments.entrySet()) {
                if (Cat.Context.CHILD.equals(entry.getKey()) || Cat.Context.ROOT.equals(entry.getKey()) || Cat.Context.PARENT.equals(entry.getKey())) {
                    context.addProperty(entry.getKey(),entry.getValue());
                }
            }
        }
        return context;
    }

    private void createConsumerCross(URL url, Transaction t) {
        Event crossAppEvent = Cat.newEvent("PigeonCall.app", getProviderAppName(url));
        Event crossServerEvent = Cat.newEvent("PigeonCall.server", url.getHost());
        Event crossPortEvent = Cat.newEvent("PigeonCall.port", url.getPort() + "");
        crossAppEvent.setStatus(Event.SUCCESS);
        crossServerEvent.setStatus(Event.SUCCESS);
        crossPortEvent.setStatus(Event.SUCCESS);
        completeEvent(crossAppEvent);
        completeEvent(crossPortEvent);
        completeEvent(crossServerEvent);
        t.addChild(crossAppEvent);
        t.addChild(crossPortEvent);
        t.addChild(crossServerEvent);
    }

    private void createProviderCross(URL url, Transaction t) {
        String consumerAppName = RpcContext.getContext().getAttachment(Constants.APPLICATION_KEY);
        if (StringUtils.isEmpty(consumerAppName)) {
            consumerAppName = RpcContext.getContext().getRemoteHost() + ":" + RpcContext.getContext().getRemotePort();
        }
        Event crossAppEvent = Cat.newEvent("PigeonService.app", consumerAppName);
        Event crossServerEvent = Cat.newEvent("PigeonService.client", url.getHost());
        crossAppEvent.setStatus(Event.SUCCESS);
        crossServerEvent.setStatus(Event.SUCCESS);
        completeEvent(crossAppEvent);
        completeEvent(crossServerEvent);
        t.addChild(crossAppEvent);
        t.addChild(crossServerEvent);
    }

    private void completeEvent(Event event) {
        AbstractMessage message = (AbstractMessage) event;
        message.setCompleted(true);
    }

}
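The filter above only takes effect once dubbo can discover it. A minimal sketch of the SPI registration follows, assuming the class lives in a hypothetical package `com.example.cat` and is registered under the hypothetical extension name `catFilter`; neither name comes from the original code. The registration is a plain-text file on the classpath, named after the extension interface.

```properties
# File: META-INF/dubbo/com.alibaba.dubbo.rpc.Filter
# Format: <extension name>=<fully qualified implementation class>
# "catFilter" and "com.example.cat" are placeholders for this sketch.
catFilter=com.example.cat.DubboCatFilter
```

The filter can then be switched on by name, for example with `<dubbo:provider filter="catFilter"/>` and `<dubbo:consumer filter="catFilter"/>` in the Spring XML, or by annotating the class with dubbo's `@Activate` for both the provider and consumer groups.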

2) Partial implementation for http-restful calls

CatHttpClientProxy.java

public void requestByGet(String url) {
    Transaction t = Cat.newTransaction("PigeonCall", "method000");
    
    // Create a default httpClient instance
    CloseableHttpClient httpClient = HttpClients.createDefault();
    RequestConfig requestConfig = RequestConfig.custom()  
                .setConnectTimeout(5000).setConnectionRequestTimeout(1000)  
                .setSocketTimeout(5000).build();
    try {
        HttpGet httpGet = new HttpGet(url);
        httpGet.setConfig(requestConfig);
            
        // Instrumentation that links the message tree
        Cat.Context context = new CatHttpContext();
        this.createConsumerCross(url, t);
        Cat.logRemoteCallClient(context);
        httpGet.setHeader(Cat.Context.ROOT, context.getProperty(Cat.Context.ROOT));
        httpGet.setHeader(Cat.Context.PARENT, context.getProperty(Cat.Context.PARENT));
        httpGet.setHeader(Cat.Context.CHILD, context.getProperty(Cat.Context.CHILD));
            
        System.out.println("Executing GET request: " + httpGet.getURI());
        CloseableHttpResponse httpResponse = null;
        // Send the GET request
        httpResponse = httpClient.execute(httpGet); // the response carries the HTTP headers and the result entity
        try {
            // Response entity
            HttpEntity entity = httpResponse.getEntity(); // does not include the headers
            if (null != entity) {
                System.out.println("Response status: " + httpResponse.getStatusLine());
                System.out.println("-------------------------------------------------");
                System.out.println("Response body: " + EntityUtils.toString(entity));
            }
        } finally {
            httpResponse.close();
        }
        t.setStatus(Transaction.SUCCESS);
    } catch (Exception e) {
        e.printStackTrace();
        t.setStatus(e.getClass().getSimpleName());
    } finally {
        t.complete();
        try {
            closeHttpClient(httpClient);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

private void createConsumerCross(String url, Transaction t){
    Event crossAppEvent = Cat.newEvent("PigeonCall.app", "serverName");
    Event crossServerEvent = Cat.newEvent("PigeonCall.server", "serverIp");
    Event crossPortEvent = Cat.newEvent("PigeonCall.port", "serverPort");
    crossAppEvent.setStatus(Event.SUCCESS);
    crossServerEvent.setStatus(Event.SUCCESS);
    crossPortEvent.setStatus(Event.SUCCESS);
    completeEvent(crossAppEvent);
    completeEvent(crossPortEvent);
    completeEvent(crossServerEvent);
    t.addChild(crossAppEvent);
    t.addChild(crossPortEvent);
    t.addChild(crossServerEvent);
}

private void completeEvent(Event event){
    AbstractMessage message = (AbstractMessage) event;
    message.setCompleted(true);
}

private void closeHttpClient(CloseableHttpClient client) throws IOException{
    if (client != null) {
        client.close();
    }
}

2. Detailed monitoring of the redis cache with CAT

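Inside the CAT source, caches are recognized by convention: the code matches the string "Cache.", and it supports memcached monitoring by checking for the string "Cache.memcached". There is no explicit support for redis, however, so the source has to be modified to add a check for the string "Cache.redis".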

1) Modified class: cat-home - com.dianping.cat.report.page.statistics.task.utilization.TransactionReportVisitor.java

Add a check for redis:

private static final String REDIS = "Cache.redis";
public TransactionReportVisitor() {
	m_types.add("URL");
	m_types.add("Service");
	m_types.add("PigeonService");
	m_types.add("Call");
	m_types.add("PigeonCall");
	m_types.add("SQL");
	m_types.add(MEMCACHED);
	m_types.add(REDIS);
}
@Override
public void visitType(TransactionType type) {
	String typeName = type.getId();
	Domain domain = m_report.findOrCreateDomain(m_domain);
  
	if ("Service".equals(typeName)) {
		typeName = "PigeonService";
	} else if ("Call".equals(typeName)) {
		typeName = "PigeonCall";
	} else if (typeName.startsWith(MEMCACHED)) {
		typeName = MEMCACHED;
	} else if (typeName.startsWith(REDIS)){
		typeName = REDIS;
	}
	......
}

2) Modified class: cat-core - com.dianping.cat.config.server.ServerConfigManager.java

Add a check for redis:

public boolean isCacheTransaction(String type) {
	return StringUtils.isNotEmpty(type) && (type.startsWith("Cache.memcached") || type.startsWith("Cache.redis"));
}

3) Modified class: cat-consumer - com.dianping.cat.consumer.storage.StorageAnalyzer.java

Add a check for redis:

private void processCacheTransaction(MessageTree tree, Transaction t) {
	String cachePrefix = "Cache.";
	String ip = "Default";
	String domain = tree.getDomain();
	String cacheType = t.getType().substring(cachePrefix.length());
	String name = t.getName();
	String method = name.substring(name.lastIndexOf(":") + 1);
	List<Message> messages = t.getChildren();

	for (Message message : messages) {
		if (message instanceof Event) {
			String type = message.getType();

			if (type.equals("Cache.memcached.server") || type.equals("Cache.redis.server")) {
				ip = message.getName();
				int index = ip.indexOf(":");
				if (index > -1) {
					ip = ip.substring(0, index);
				}
			}
		}
	}
	......
}
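The parsing in the three snippets above implies a naming convention that the redis client instrumentation has to follow. The sketch below restates that parsing as plain Java so the convention can be checked in isolation. The class and method names are hypothetical, and the sample values ("user:42:get", "10.0.0.5:6379") are invented; only the string handling mirrors the CAT code shown above.

```java
class CacheNamingDemo {
    static final String CACHE_PREFIX = "Cache.";

    // Mirrors the modified isCacheTransaction check.
    static boolean isCacheTransaction(String type) {
        return type != null && !type.isEmpty()
                && (type.startsWith("Cache.memcached") || type.startsWith("Cache.redis"));
    }

    // Everything after "Cache." is treated as the cache type.
    static String cacheType(String type) {
        return type.substring(CACHE_PREFIX.length());
    }

    // The method is whatever follows the last ':' in the transaction name;
    // a name without ':' is used as the method unchanged.
    static String method(String transactionName) {
        return transactionName.substring(transactionName.lastIndexOf(':') + 1);
    }

    // The server event's name is "ip:port"; only the ip part is kept.
    static String serverIp(String serverEventName) {
        int index = serverEventName.indexOf(':');
        return index > -1 ? serverEventName.substring(0, index) : serverEventName;
    }

    public static void main(String[] args) {
        String type = "Cache.redis";
        System.out.println(isCacheTransaction(type));
        System.out.println(cacheType(type));
        System.out.println(method("user:42:get"));
        System.out.println(serverIp("10.0.0.5:6379"));
    }
}
```

Read the other way round, this suggests that a redis call should be reported as a transaction of type "Cache.redis" whose name ends in ":" plus the command, with a child event of type "Cache.redis.server" named "ip:port".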

3. Detailed monitoring of the database with CAT

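If your ORM framework is mybatis, consider implementing an Interceptor to monitor the DB at a low level. CAT's database instrumentation also follows a convention, and the code here contains hard-coded values. The instrumentation looks like this: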

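MappedStatement mappedStatement = (MappedStatement) invocation.getArgs()[0];
// Extract the mapper class name and method name from the statement id
String[] strArr = mappedStatement.getId().split("\\.");
String methodName = strArr[strArr.length - 2] + "." + strArr[strArr.length - 1];
// Name the transaction after Mapper.method
Transaction t = Cat.newTransaction("SQL", methodName);

// Record the SQL command type (select, insert, update, delete)
SqlCommandType sqlCommandType = mappedStatement.getSqlCommandType();
Cat.logEvent("SQL.Method", sqlCommandType.name().toLowerCase());

// SQL.Database carries a JDBC-style URL built from the server IP and database name
// (serverIp and dbName are obtained elsewhere in the plugin; port 3306 is hard-coded)
String JDBC_CONNECTION = "jdbc:mysql://%s:3306/%s?useUnicode=true";
Cat.logEvent("SQL.Database", String.format(JDBC_CONNECTION, serverIp, dbName));

// After invocation.proceed(): set the status on t and call t.complete() in a finally block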
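The two string conventions used above (the short statement name and the JDBC-style database URL) can be tried out without mybatis or CAT on the classpath. The following is a small sketch with hypothetical helper names; the statement id, host, and database name in `main` are invented sample values.

```java
class SqlNamingDemo {
    // Reduces a mybatis statement id such as "a.b.UserMapper.selectById"
    // to "UserMapper.selectById". Ids with fewer than two segments are
    // returned unchanged instead of throwing.
    static String shortName(String statementId) {
        String[] parts = statementId.split("\\.");
        if (parts.length < 2) {
            return statementId;
        }
        return parts[parts.length - 2] + "." + parts[parts.length - 1];
    }

    // Builds the value logged under "SQL.Database". The URL shape follows
    // the snippet above; host, port, and dbName are supplied by the caller.
    static String databaseUrl(String host, int port, String dbName) {
        return String.format("jdbc:mysql://%s:%d/%s?useUnicode=true", host, port, dbName);
    }

    public static void main(String[] args) {
        System.out.println(shortName("com.example.dao.UserMapper.selectById"));
        System.out.println(databaseUrl("10.0.0.8", 3306, "orders"));
    }
}
```

Guarding the array access matters in an interceptor, because an unexpected statement id would otherwise surface as an ArrayIndexOutOfBoundsException inside every query.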

The Spring configuration is as follows:

<bean id="sessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
    <property name="dataSource" ref="dataSource"/>
    <property name="configLocation" value="classpath:mybatis.xml"/>
    <!-- Plugin configuration -->
    <property name="plugins">
        <array>
            <bean class="com.kubbo.java.common.cat.CatMybatisPlugin"></bean>
        </array>
    </property>
</bean>

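The above lists only one possible implementation for each problem, as a reference for others who are looking into CAT. I have only just started studying CAT myself, so there are bound to be mistakes in what I have written; corrections and discussion are welcome.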

Reposting is welcome, but please be sure to credit the source!
