結構以下圖: sql
服務端接收到ByteBuf以後,交給PlainTextMessage解碼,解碼分兩步走:session
先看MesageTree的header部分功能解析。先看一個工具類BufferHelper,read方法代碼以下:數據結構
/** * 從ctx的buffer中讀取字符串 到 separator 位置位置 * 如 read("abcde",'c')="ab" * @param ctx * @param separator * @return */ public String read(Context ctx, byte separator) { ByteBuf buf = ctx.getBuffer(); char[] data = ctx.getData(); int from = buf.readerIndex(); int to = buf.writerIndex(); int index = 0; boolean flag = false; for (int i = from; i < to; i++) { byte b = buf.readByte(); if (b == separator) { break; } //將所有的data if (index >= data.length) { char[] data2 = new char[to - from]; System.arraycopy(data, 0, data2, 0, index); data = data2; } char c = (char) (b & 0xFF); if (c > 127) { flag = true; } if (c == '\\' && i + 1 < to) { byte b2 = buf.readByte(); if (b2 == 't') { c = '\t'; i++; } else if (b2 == 'r') { c = '\r'; i++; } else if (b2 == 'n') { c = '\n'; i++; } else if (b2 == '\\') { c = '\\'; i++; } else { // move back buf.readerIndex(i + 1); } } data[index] = c; index++; } if (!flag) { return new String(data, 0, index); } else { byte[] ba = new byte[index]; for (int i = 0; i < index; i++) { ba[i] = (byte) (data[i] & 0xFF); } try { return new String(ba, 0, index, "utf-8"); } catch (UnsupportedEncodingException e) { return new String(ba, 0, index); } } }
解碼header部分就是調用上面的方法,逐步將context中的bytebuf裏面的header部分解析出來,並將readindex移動,後面直接解碼Message。dom
protected void decodeHeader(Context ctx, MessageTree tree) { BufferHelper helper = m_bufferHelper; String id = helper.read(ctx, TAB); String domain = helper.read(ctx, TAB); String hostName = helper.read(ctx, TAB); String ipAddress = helper.read(ctx, TAB); String threadGroupName = helper.read(ctx, TAB); String threadId = helper.read(ctx, TAB); String threadName = helper.read(ctx, TAB); String messageId = helper.read(ctx, TAB); String parentMessageId = helper.read(ctx, TAB); String rootMessageId = helper.read(ctx, TAB); String sessionToken = helper.read(ctx, LF); if (VERSION.equals(id)) { tree.setDomain(domain); tree.setHostName(hostName); tree.setIpAddress(ipAddress); tree.setThreadGroupName(threadGroupName); tree.setThreadId(threadId); tree.setThreadName(threadName); tree.setMessageId(messageId); tree.setParentMessageId(parentMessageId); tree.setRootMessageId(rootMessageId); tree.setSessionToken(sessionToken); } else { throw new RuntimeException(String.format("Unrecognized id(%s) for plain text message codec!", id)); } }
仍是以上面的MessageTree爲例,以下: ide
解碼步驟以下:工具
新建stack,解析上圖第一行,將數據塞進新建的transaction1中,將null入stack,將transaction1返回,stack內的數據以下:編碼
將上一部返回的transaction1做爲parent和stack傳進decodeLine方法,解析第2行,新建一個event,將event做爲child掛到parent(transaction1)下,返回parent(transaction1)code
將上一步返回的parent(transaction1)作爲parent,連帶stack,一塊兒傳進decodeLine方法,解析第3行,將event做爲child掛到parent(transaction1)下,返回parent(transaction1)orm
相似的解析第四、5行圖片
將上一步返回的parent(transaction1)作爲parent,連帶stack,一塊兒傳進decodeLine方法,解析第6行,新建一個transaction2,將transaction2掛到parent(transaction1)下面的child中,將parent(transaction1)入棧,此時stack內有null,transaction1兩個元素。將transaction2返回出去,做爲下一步的parent,棧內數據以下:
相似上一步,將新建的transaction3掛到上一步產生的parent(transaction2)下,將parent(transaction2)入棧,此時stack內有null、transaction一、transaction2三個元素。接下來把transaction3返回出去,做爲下一步的parent。棧內元素以下:
第8行相似第2行
第9行,將解析的數據注入到parent(transaction3)中,這時候transaction3處理完結了。從棧頂拿到一個最近的元素(transaction2),做爲下一步處理的parent。棧內元素以下:
第10行,發現是t開頭的,又新建一個transaction4,將parent(transaction2)入棧,將transaction4做爲下一步的parent。棧內元素以下:
第11行,相似第2行處理,掛到parent(transaction4)下面,將parent(transaction4)做爲下一步的parent繼續處理。
第12行,將數據注入到parent(transaction4)中,完結這個parent(transaction4),將棧頂的transaction2做爲下一步的parent,繼續處理。
第13行,相似上一步,完結transaction2,將棧頂transaction1做爲下一步的parent,繼續處理。
第14行,相似上一步,完結transaction1,將棧頂的null做爲下一步的parent,繼續處理。此時棧內元素爲空。 發現沒有數據能夠處理了,此時,整個MessageTree解碼完成。處理每一行數據時,對應的棧內數據以下:
解析MessageTree代碼以下:
//將原消息解析成一棵消息樹 protected void decodeMessage(Context ctx, MessageTree tree) { Stack<DefaultTransaction> stack = new Stack<DefaultTransaction>(); Message parent = decodeLine(ctx, null, stack); tree.setMessage(parent); //逐行讀取,最終構建消息樹 while (ctx.getBuffer().readableBytes() > 0) { /*************** 將bytebuf轉換成defaultTransaction *********************/ Message message = decodeLine(ctx, (DefaultTransaction) parent, stack); if (message instanceof DefaultTransaction) { parent = message; } else { break; } } }
逐行解析的decodeLine 代碼以下:
protected Message decodeLine(Context ctx, DefaultTransaction parent, Stack<DefaultTransaction> stack) { BufferHelper helper = m_bufferHelper; byte identifier = ctx.getBuffer().readByte(); String timestamp = helper.read(ctx, TAB); String type = helper.read(ctx, TAB); String name = helper.read(ctx, TAB); switch (identifier) { case 't': /** * 這種類型,纔會新建一個transaction, * 若是這是第一個,則將這個做爲最頂級的parent,掛到MessageTree的message下,將parent(null)入棧 * 若是不是第一個,則將當前建立的transaction掛到parent(也是一個transaction)的下面,做爲一個child,同時將parent入棧,等待當前 * 作完這些事,將當前的transaction做爲下一次循環的parent,直到遇到'T',將當前這個transaction數據補充完整,再從棧清楚這個transaction,代表當前這個transaction完結 */ DefaultTransaction transaction = new DefaultTransaction(type, name, null); helper.read(ctx, LF); // get rid of line feed transaction.setTimestamp(m_dateHelper.parse(timestamp)); if (parent != null) { parent.addChild(transaction); } stack.push(parent); return transaction; case 'A': DefaultTransaction tran = new DefaultTransaction(type, name, null); String status = helper.read(ctx, TAB); String duration = helper.read(ctx, TAB); String data = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed tran.setTimestamp(m_dateHelper.parse(timestamp)); tran.setStatus(status); tran.addData(data); long d = Long.parseLong(duration.substring(0, duration.length() - 2)); tran.setDurationInMicros(d); if (parent != null) { parent.addChild(tran); return parent;//返回的是傳進來的parent,這裏parent是一個transaction,transaction把A類型的數據做爲child收集了 } else { return tran; } case 'T': String transactionStatus = helper.read(ctx, TAB); String transactionDuration = helper.read(ctx, TAB); String transactionData = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed parent.setStatus(transactionStatus); parent.addData(transactionData); long transactionD = Long.parseLong(transactionDuration.substring(0, transactionDuration.length() - 2)); parent.setDurationInMicros(transactionD); return stack.pop();//這裏不新建transaction,只是將數據插入到parent裏面,將stock頂的transaction返回, case 'E': DefaultEvent event = new DefaultEvent(type, name); String eventStatus = helper.read(ctx, TAB); String eventData = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed event.setTimestamp(m_dateHelper.parse(timestamp)); event.setStatus(eventStatus); event.addData(eventData); //sql類型特殊處理 processSQLEvent(event); if (parent != null) { parent.addChild(event); return parent; } else { return event; } case 'M': DefaultMetric metric = new DefaultMetric(type, name); String metricStatus = helper.read(ctx, TAB); String metricData = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed metric.setTimestamp(m_dateHelper.parse(timestamp)); metric.setStatus(metricStatus); metric.addData(metricData); if (parent != null) { parent.addChild(metric); return parent; } else { return metric; } case 'L': DefaultTrace trace = new DefaultTrace(type, name); String traceStatus = helper.read(ctx, TAB); String traceData = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed trace.setTimestamp(m_dateHelper.parse(timestamp)); trace.setStatus(traceStatus); trace.addData(traceData); if (parent != null) { parent.addChild(trace); return parent; } else { return trace; } case 'H': DefaultHeartbeat heartbeat = new DefaultHeartbeat(type, name); String heartbeatStatus = helper.read(ctx, TAB); String heartbeatData = helper.read(ctx, TAB); helper.read(ctx, LF); // get rid of line feed heartbeat.setTimestamp(m_dateHelper.parse(timestamp)); heartbeat.setStatus(heartbeatStatus); heartbeat.addData(heartbeatData); if (parent != null) { parent.addChild(heartbeat); return parent; } else { return heartbeat; } default: m_logger.warn("Unknown identifier(" + (char) identifier + ") of message: " + ctx.getBuffer().toString(Charset.forName("utf-8"))); throw new RuntimeException("Unknown identifier int name"); } }