CAT具體的設計思想、實現原理在這我就不羅列了,本文主要是記錄一下在使用CAT的過程當中遇到的一些問題,好比分佈式logview,Cache、DB埋點監控等,問題很少,可是比較典型。java
我已經實現一個 CAT 監控相關的擴展實現,可供參考:mysql
https://github.com/summerpotato/cat-monitorgit
(本文涉及的CAT版本爲1.3.6)github
目前使用過兩種,一種是基於 dubbo 應用的 rpc 調用,一種是基於 http 請求的 rest 服務調用。首先說下 message tree 的實現,追蹤跨服務的消息時,經過根消息 id 和父級消息 id 及子消息 id 三個屬性進行消息串聯,組成消息樹。關鍵點在 tree 的三個 id 的得到和傳遞。redis
這裏有兩點,第一是 CAT 消息樹生成原理:spring
咱們須要實現 Cat 的 Context 上下文,而後經過 Cat.logRemoteCallClient(context) 生成包含節點數據的上下文對象(方法中經過建立消息樹對象來獲取各節點的消息 id,填充給上下文),當遠程服務端接收到這個 context 時,使用 Cat.logRemoteCallServer(context) 方法,讀取各節點消息 id,組建消息樹。sql
第二是消息應如何傳遞:數據庫
dubbo 應用的 rpc 調用方式:調用過程要傳遞的 rpc 上下文,其中包含調用信息、參數以及狀態信息等,能夠把消息 id 信息放到 RpcContext 中,而後經過調用 Invocation 對象的 invoke 方法,將消息傳遞至服務端。最後,經過dubbo的 spi 拓展機制,實現 com.alibaba.dubbo.rpc.Filter,用來獲取 rpcContext 的內容。緩存
rest 風格的 http 請求方式:調用時,在服務請求方把消息 id 信息放到 Http-Header 中,在服務提供方,用 filter 攔截,並得到 http-header 中的消息 id,這樣經過埋點,串聯起消息樹。restful
廢話很少說了,上碼吧。
public class DubboCatFilter implements Filter { private static final ThreadLocal<Cat.Context> CAT_CONTEXT = new ThreadLocal<Cat.Context>(); @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String sideKey = url.getParameter(Constants.SIDE_KEY); String loggerName = invoker.getInterface().getSimpleName() + "." + invocation.getMethodName(); String type = "PigeonCall"; if (Constants.PROVIDER_SIDE.equals(sideKey)) { type = "PigeonService"; } Transaction t = Cat.newTransaction(type, loggerName); Result result = null; try { Cat.Context context = getContext(); if (Constants.CONSUMER_SIDE.equals(sideKey)) { createConsumerCross(url, t); Cat.logRemoteCallClient(context); } else { createProviderCross(url, t); Cat.logRemoteCallServer(context); } setAttachment(context); result = invoker.invoke(invocation); if (result.hasException()) { //給調用接口出現異常進行打點 Throwable throwable = result.getException(); Event event = null; if (RpcException.class == throwable.getClass()) { Throwable caseBy = throwable.getCause(); if (caseBy != null && caseBy.getClass() == TimeoutException.class) { event = Cat.newEvent("DUBBO_TIMEOUT_ERROR", loggerName); } else { event = Cat.newEvent("DUBBO_REMOTING_ERROR", loggerName); } } else if (RemotingException.class.isAssignableFrom(throwable.getClass())) { event = Cat.newEvent("DUBBO_REMOTING_ERROR", loggerName); }else{ event = Cat.newEvent("DUBBO_BIZ_ERROR", loggerName); } event.setStatus(result.getException()); completeEvent(event); t.addChild(event); t.setStatus(result.getException().getClass().getSimpleName()); } else { t.setStatus(Message.SUCCESS); } return result; } catch (RuntimeException e) { Event event = null; if (RpcException.class == e.getClass()) { Throwable caseBy = e.getCause(); if (caseBy !=null && caseBy.getClass() == TimeoutException.class) { event = Cat.newEvent("DUBBO_TIMEOUT_ERROR", loggerName); } else { event = Cat.newEvent("DUBBO_REMOTING_ERROR", loggerName); } } else { event = Cat.newEvent("DUBBO_BIZ_ERROR", loggerName); } event.setStatus(e); completeEvent(event); t.addChild(event); t.setStatus(e.getClass().getSimpleName()); if (result == null) { throw e; } else { return result; } } finally { t.complete(); CAT_CONTEXT.remove(); } } static class DubboCatContext implements Cat.Context { private Map<String,String> properties = new HashMap<String, String>(); @Override public void addProperty(String key, String value) { properties.put(key,value); } @Override public String getProperty(String key) { return properties.get(key); } } private void setAttachment(Cat.Context context) { RpcContext.getContext().setAttachment(Cat.Context.ROOT,context.getProperty(Cat.Context.ROOT)); RpcContext.getContext().setAttachment(Cat.Context.CHILD,context.getProperty(Cat.Context.CHILD)); RpcContext.getContext().setAttachment(Cat.Context.PARENT,context.getProperty(Cat.Context.PARENT)); } private Cat.Context getContext(){ Cat.Context context = CAT_CONTEXT.get(); if (context==null) { context = initContext(); CAT_CONTEXT.set(context); } return context; } private Cat.Context initContext() { Cat.Context context = new DubboCatContext(); Map<String,String> attachments = RpcContext.getContext().getAttachments(); if (attachments!=null&&attachments.size()>0) { for (Map.Entry<String,String> entry:attachments.entrySet()) { if (Cat.Context.CHILD.equals(entry.getKey()) || Cat.Context.ROOT.equals(entry.getKey()) || Cat.Context.PARENT.equals(entry.getKey())) { context.addProperty(entry.getKey(),entry.getValue()); } } } return context; } private void createConsumerCross(URL url, Transaction t) { Event crossAppEvent = Cat.newEvent("PigeonCall.app", getProviderAppName(url)); Event crossServerEvent = Cat.newEvent("PigeonCall.server", url.getHost()); Event crossPortEvent = Cat.newEvent("PigeonCall.port", url.getPort() + ""); crossAppEvent.setStatus(Event.SUCCESS); crossServerEvent.setStatus(Event.SUCCESS); crossPortEvent.setStatus(Event.SUCCESS); completeEvent(crossAppEvent); completeEvent(crossPortEvent); completeEvent(crossServerEvent); t.addChild(crossAppEvent); t.addChild(crossPortEvent); t.addChild(crossServerEvent); } private void createProviderCross(URL url, Transaction t) { String consumerAppName = RpcContext.getContext().getAttachment(Constants.APPLICATION_KEY); if (StringUtils.isEmpty(consumerAppName)) { consumerAppName = RpcContext.getContext().getRemoteHost() + ":" + RpcContext.getContext().getRemotePort(); } Event crossAppEvent = Cat.newEvent("PigeonService.app", consumerAppName); Event crossServerEvent = Cat.newEvent("PigeonService.client", url.getHost()); crossAppEvent.setStatus(Event.SUCCESS); crossServerEvent.setStatus(Event.SUCCESS); completeEvent(crossAppEvent); completeEvent(crossServerEvent); t.addChild(crossAppEvent); t.addChild(crossServerEvent); } private void completeEvent(Event event) { AbstractMessage message = (AbstractMessage) event; message.setCompleted(true); } }
CatHttpClientProxy.java
public void requestByGet(String url) { Transaction t = Cat.newTransaction("PigeonCall", "method000"); //建立默認的httpClient實例 CloseableHttpClient httpClient = HttpClients.createDefault(); RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(5000).setConnectionRequestTimeout(1000) .setSocketTimeout(5000).build(); try { HttpGet httpGet = new HttpGet(url); httpGet.setConfig(requestConfig); //串聯埋點 Cat.Context context = new CatHttpContext(); this.createConsumerCross(url, t); Cat.logRemoteCallClient(context); httpGet.setHeader(Cat.Context.ROOT, context.getProperty(Cat.Context.ROOT)); httpGet.setHeader(Cat.Context.PARENT, context.getProperty(Cat.Context.PARENT)); httpGet.setHeader(Cat.Context.CHILD, context.getProperty(Cat.Context.CHILD)); System.out.println("執行get請求:...." + httpGet.getURI()); CloseableHttpResponse httpResponse = null; //發送get請求 httpResponse = httpClient.execute(httpGet);//請求返回的Resp,含http的header和執行結果實體Entity try { //response實體 HttpEntity entity = httpResponse.getEntity();//不包含header if (null != entity) { System.out.println("響應狀態碼:"+ httpResponse.getStatusLine()); System.out.println("-------------------------------------------------"); System.out.println("響應內容:" + EntityUtils.toString(entity)); } } finally { httpResponse.close(); } t.setStatus(Transaction.SUCCESS); } catch (Exception e) { e.printStackTrace(); t.setStatus(e.getClass().getSimpleName()); } finally { t.complete(); try { closeHttpClient(httpClient); } catch (IOException e) { e.printStackTrace(); } } } private void createConsumerCross(String url, Transaction t){ Event crossAppEvent = Cat.newEvent("PigeonCall.app", "serverName"); Event crossServerEvent = Cat.newEvent("PigeonCall.server", "serverIp"); Event crossPortEvent = Cat.newEvent("PigeonCall.port", "serverPort"); crossAppEvent.setStatus(Event.SUCCESS); crossServerEvent.setStatus(Event.SUCCESS); crossPortEvent.setStatus(Event.SUCCESS); completeEvent(crossAppEvent); completeEvent(crossPortEvent); completeEvent(crossServerEvent); t.addChild(crossAppEvent); t.addChild(crossPortEvent); t.addChild(crossServerEvent); } private void completeEvent(Event event){ AbstractMessage message = (AbstractMessage) event; message.setCompleted(true); } private void closeHttpClient(CloseableHttpClient client) throws IOException{ if (client != null) { client.close(); } }
CAT源碼內部對於緩存的識別存在一個 convention 約定,是基於匹配 「Cache.」 字符串的,而且經過判斷字符串 「Cache.memcached」 來支持 memcached 監控,但是沒有對 redis 作顯示支持,須要修改源碼,增長判斷字符串 「Cache.redis」;
1).修改類:cat-home - com.dianping.cat.report.page.statistics.task.utilization.TransactionReportVisitor.java
增長對 redis 的判斷支持:
private static final String REDIS = "Cache.redis";
public TransactionReportVisitor() { m_types.add("URL"); m_types.add("Service"); m_types.add("PigeonService"); m_types.add("Call"); m_types.add("PigeonCall"); m_types.add("SQL"); m_types.add(MEMCACHED); m_types.add(REDIS); }
@Override public void visitType(TransactionType type) { String typeName = type.getId(); Domain domain = m_report.findOrCreateDomain(m_domain); if ("Service".equals(typeName)) { typeName = "PigeonService"; } else if ("Call".equals(typeName)) { typeName = "PigeonCall"; } else if (typeName.startsWith(MEMCACHED)) { typeName = MEMCACHED; } else if (typeName.startsWith(REDIS)){ typeName = REDIS; } ...... }
2).修改類:cat-core - com.dianping.cat.config.server.ServerConfigManager.java
增長對 redis 的判斷支持:
public boolean isCacheTransaction(String type) { return StringUtils.isNotEmpty(type) && (type.startsWith("Cache.memcached") || type.startsWith("Cache.redis")); }
3).修改類:cat-consumer - com.dianping.cat.consumer.storage.StorageAnalyzer.java
增長對redis的判斷支持:
private void processCacheTransaction(MessageTree tree, Transaction t) { String cachePrefix = "Cache."; String ip = "Default"; String domain = tree.getDomain(); String cacheType = t.getType().substring(cachePrefix.length()); String name = t.getName(); String method = name.substring(name.lastIndexOf(":") + 1); List<Message> messages = t.getChildren(); for (Message message : messages) { if (message instanceof Event) { String type = message.getType(); if (type.equals("Cache.memcached.server") || type.equals("Cache.redis.server")) { ip = message.getName(); int index = ip.indexOf(":"); if (index > -1) { ip = ip.substring(0, index); } } } } ...... }
若是你的 orm 框架使用的 mybatis,能夠考慮經過實現攔截器 Interceptor 來對DB進行底層監控,CAT對數據庫的埋點也存在 convention,這裏代碼中存在 hard code。具體埋點以下:
MappedStatement mappedStatement = (MappedStatement) invocation.getArgs()[0]; //獲得類名,方法 String[] strArr = mappedStatement.getId().split("\\."); String methodName = strArr[strArr.length - 2] + "." + strArr[strArr.length - 1]; Transaction t = Cat.newTransaction("SQL", "methodName"); //獲取SQL類型 SqlCommandType sqlCommandType = mappedStatement.getSqlCommandType(); Cat.logEvent("SQL.Method", sqlCommandType.name().toLowerCase()); String JDBC_CONNECTION = "jdbc:mysql://unknown:3306/%s?useUnicode=true"; Cat.logEvent("SQL.Database", String.format(JDBC_CONNECTION, serverIp, dbName));
spring配置以下:
<bean id="sessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"> <property name="dataSource" ref="dataSource"/> <property name="configLocation" value="classpath:mybatis.xml"/> <!-- 插件配置 --> <property name="plugins"> <array> <bean class="com.kubbo.java.common.cat.CatMybatisPlugin"></bean> </array> </property> </bean>
以上僅羅列了每一個問題的一種實現方案,只是給正在研究CAT的同窗一個參考思路,我的研究CAT也是剛開始,所說之處難免存在一些紕漏,歡迎指正和交流。
歡迎轉載,但請務必註明來源!