大衆點評Cat源碼閱讀(六)——MessageTree編碼、解碼字節流

1、MessageTree的數據結構

結構以下圖: 輸入圖片說明sql

2、客戶端將MessageTree編碼成字節流

3、服務端將字節流反編碼成MessageTree

2.1 功能介紹

服務端接收到ByteBuf以後,交給PlainTextMessage解碼,解碼分兩步走:session

  1. 先解析MessageTree的header,包含domain,hostname、ipAddress、threadGroupName、threadId、threadName、messageId、parentMessageId、rootMessageId、sessionToken。這部份內容示例效果以下:

輸入圖片說明

  1. 解碼MessageTree中的Message,這部分是一個嵌套的Transaction。

2.2代碼分析

先看MesageTree的header部分功能解析。先看一個工具類BufferHelper,read方法代碼以下:數據結構

/**
		 * 從ctx的buffer中讀取字符串  到 separator 位置位置
		 * 如 read("abcde",'c')="ab"
		 * @param ctx
		 * @param separator
		 * @return
		 */
		public String read(Context ctx, byte separator) {
			ByteBuf buf = ctx.getBuffer();
			char[] data = ctx.getData();
			int from = buf.readerIndex();
			int to = buf.writerIndex();
			int index = 0;
			boolean flag = false;

			for (int i = from; i < to; i++) {
				byte b = buf.readByte();

				if (b == separator) {
					break;
				}
				//將所有的data
				if (index >= data.length) {
					char[] data2 = new char[to - from];

					System.arraycopy(data, 0, data2, 0, index);
					data = data2;
				}

				char c = (char) (b & 0xFF);

				if (c > 127) {
					flag = true;
				}

				if (c == '\\' && i + 1 < to) {
					byte b2 = buf.readByte();

					if (b2 == 't') {
						c = '\t';
						i++;
					} else if (b2 == 'r') {
						c = '\r';
						i++;
					} else if (b2 == 'n') {
						c = '\n';
						i++;
					} else if (b2 == '\\') {
						c = '\\';
						i++;
					} else {
						// move back
						buf.readerIndex(i + 1);
					}
				}

				data[index] = c;
				index++;
			}

			if (!flag) {
				return new String(data, 0, index);
			} else {
				byte[] ba = new byte[index];

				for (int i = 0; i < index; i++) {
					ba[i] = (byte) (data[i] & 0xFF);
				}

				try {
					return new String(ba, 0, index, "utf-8");
				} catch (UnsupportedEncodingException e) {
					return new String(ba, 0, index);
				}
			}
		}

解碼header部分就是調用上面的方法,逐步將context中的bytebuf裏面的header部分解析出來,並將readindex移動,後面直接解碼Message。dom

protected void decodeHeader(Context ctx, MessageTree tree) {
		BufferHelper helper = m_bufferHelper;
		String id = helper.read(ctx, TAB);
		String domain = helper.read(ctx, TAB);
		String hostName = helper.read(ctx, TAB);
		String ipAddress = helper.read(ctx, TAB);
		String threadGroupName = helper.read(ctx, TAB);
		String threadId = helper.read(ctx, TAB);
		String threadName = helper.read(ctx, TAB);
		String messageId = helper.read(ctx, TAB);
		String parentMessageId = helper.read(ctx, TAB);
		String rootMessageId = helper.read(ctx, TAB);
		String sessionToken = helper.read(ctx, LF);

		if (VERSION.equals(id)) {
			tree.setDomain(domain);
			tree.setHostName(hostName);
			tree.setIpAddress(ipAddress);
			tree.setThreadGroupName(threadGroupName);
			tree.setThreadId(threadId);
			tree.setThreadName(threadName);
			tree.setMessageId(messageId);
			tree.setParentMessageId(parentMessageId);
			tree.setRootMessageId(rootMessageId);
			tree.setSessionToken(sessionToken);
		} else {
			throw new RuntimeException(String.format("Unrecognized id(%s) for plain text message codec!", id));
		}
	}

2.2 解碼Message

仍是以上面的MessageTree爲例,以下: 輸入圖片說明ide

解碼步驟以下:工具

  1. 新建stack,解析上圖第一行,將數據塞進新建的transaction1中,將null入stack,將transaction1返回,stack內的數據以下:編碼

  2. 將上一部返回的transaction1做爲parent和stack傳進decodeLine方法,解析第2行,新建一個event,將event做爲child掛到parent(transaction1)下,返回parent(transaction1)code

  3. 將上一步返回的parent(transaction1)作爲parent,連帶stack,一塊兒傳進decodeLine方法,解析第3行,將event做爲child掛到parent(transaction1)下,返回parent(transaction1)orm

  4. 相似的解析第四、5行圖片

  5. 將上一步返回的parent(transaction1)作爲parent,連帶stack,一塊兒傳進decodeLine方法,解析第6行,新建一個transaction2,將transaction2掛到parent(transaction1)下面的child中,將parent(transaction1)入棧,此時stack內有null,transaction1兩個元素。將transaction2返回出去,做爲下一步的parent,棧內數據以下:

  6. 相似上一步,將新建的transaction3掛到上一步產生的parent(transaction2)下,將parent(transaction2)入棧,此時stack內有null、transaction一、transaction2三個元素。接下來把transaction3返回出去,做爲下一步的parent。棧內元素以下:

  7. 第8行相似第2行

  8. 第9行,將解析的數據注入到parent(transaction3)中,這時候transaction3處理完結了。從棧頂拿到一個最近的元素(transaction2),做爲下一步處理的parent。棧內元素以下:

  9. 第10行,發現是t開頭的,又新建一個transaction4,將parent(transaction2)入棧,將transaction4做爲下一步的parent。棧內元素以下:

  10. 第11行,相似第2行處理,掛到parent(transaction4)下面,將parent(transaction4)做爲下一步的parent繼續處理。

  11. 第12行,將數據注入到parent(transaction4)中,完結這個parent(transaction4),將棧頂的transaction2做爲下一步的parent,繼續處理。

  12. 第13行,相似上一步,完結transaction2,將棧頂transaction1做爲下一步的parent,繼續處理。

  13. 第14行,相似上一步,完結transaction1,將棧頂的null做爲下一步的parent,繼續處理。此時棧內元素爲空。 發現沒有數據能夠處理了,此時,整個MessageTree解碼完成。處理每一行數據時,對應的棧內數據以下:

輸入圖片說明

解析MessageTree代碼以下:

//將原消息解析成一棵消息樹
	protected void decodeMessage(Context ctx, MessageTree tree) {
		Stack<DefaultTransaction> stack = new Stack<DefaultTransaction>();
		Message parent = decodeLine(ctx, null, stack);

		tree.setMessage(parent);
		//逐行讀取,最終構建消息樹
		while (ctx.getBuffer().readableBytes() > 0) {
		    /***************  將bytebuf轉換成defaultTransaction *********************/
			Message message = decodeLine(ctx, (DefaultTransaction) parent, stack);

			if (message instanceof DefaultTransaction) {
				parent = message;
			} else {
				break;
			}
		}
	}

逐行解析的decodeLine 代碼以下:

protected Message decodeLine(Context ctx, DefaultTransaction parent, Stack<DefaultTransaction> stack) {
		BufferHelper helper = m_bufferHelper;
		byte identifier = ctx.getBuffer().readByte();
		String timestamp = helper.read(ctx, TAB);
		String type = helper.read(ctx, TAB);
		String name = helper.read(ctx, TAB);

		switch (identifier) {
		case 't':
			/**
			 * 這種類型,纔會新建一個transaction,
			 *    若是這是第一個,則將這個做爲最頂級的parent,掛到MessageTree的message下,將parent(null)入棧
			 *    若是不是第一個,則將當前建立的transaction掛到parent(也是一個transaction)的下面,做爲一個child,同時將parent入棧,等待當前
			 *    作完這些事,將當前的transaction做爲下一次循環的parent,直到遇到'T',將當前這個transaction數據補充完整,再從棧清楚這個transaction,代表當前這個transaction完結
			 */
			DefaultTransaction transaction = new DefaultTransaction(type, name, null);

			helper.read(ctx, LF); // get rid of line feed
			transaction.setTimestamp(m_dateHelper.parse(timestamp));

			if (parent != null) {
				parent.addChild(transaction);
			}

			stack.push(parent);
			return transaction;
		case 'A':
			DefaultTransaction tran = new DefaultTransaction(type, name, null);
			String status = helper.read(ctx, TAB);
			String duration = helper.read(ctx, TAB);
			String data = helper.read(ctx, TAB);

			helper.read(ctx, LF); // get rid of line feed
			tran.setTimestamp(m_dateHelper.parse(timestamp));
			tran.setStatus(status);
			tran.addData(data);

			long d = Long.parseLong(duration.substring(0, duration.length() - 2));
			tran.setDurationInMicros(d);

			if (parent != null) {
				parent.addChild(tran);
				return parent;//返回的是傳進來的parent,這裏parent是一個transaction,transaction把A類型的數據做爲child收集了
			} else {
				return tran;
			}
		case 'T':
			String transactionStatus = helper.read(ctx, TAB);
			String transactionDuration = helper.read(ctx, TAB);
			String transactionData = helper.read(ctx, TAB);

			helper.read(ctx, LF); // get rid of line feed
			parent.setStatus(transactionStatus);
			parent.addData(transactionData);

			long transactionD = Long.parseLong(transactionDuration.substring(0, transactionDuration.length() - 2));

			parent.setDurationInMicros(transactionD);

			return stack.pop();//這裏不新建transaction,只是將數據插入到parent裏面,將stock頂的transaction返回,
		case 'E':
			DefaultEvent event = new DefaultEvent(type, name);
			String eventStatus = helper.read(ctx, TAB);
			String eventData = helper.read(ctx, TAB);

			helper.read(ctx, LF); // get rid of line feed
			event.setTimestamp(m_dateHelper.parse(timestamp));
			event.setStatus(eventStatus);
			event.addData(eventData);

			//sql類型特殊處理
			processSQLEvent(event);
			
			if (parent != null) {
				parent.addChild(event);
				return parent;
			} else {
				return event;
			}
		case 'M':
			DefaultMetric metric = new DefaultMetric(type, name);
			String metricStatus = helper.read(ctx, TAB);
			String metricData = helper.read(ctx, TAB);

			helper.read(ctx, LF); // get rid of line feed
			metric.setTimestamp(m_dateHelper.parse(timestamp));
			metric.setStatus(metricStatus);
			metric.addData(metricData);

			if (parent != null) {
				parent.addChild(metric);
				return parent;
			} else {
				return metric;
			}
		case 'L':
			DefaultTrace trace = new DefaultTrace(type, name);
			String traceStatus = helper.read(ctx, TAB);
			String traceData = helper.read(ctx, TAB);

			helper.read(ctx, LF); // get rid of line feed
			trace.setTimestamp(m_dateHelper.parse(timestamp));
			trace.setStatus(traceStatus);
			trace.addData(traceData);

			if (parent != null) {
				parent.addChild(trace);
				return parent;
			} else {
				return trace;
			}
		case 'H':
			DefaultHeartbeat heartbeat = new DefaultHeartbeat(type, name);
			String heartbeatStatus = helper.read(ctx, TAB);
			String heartbeatData = helper.read(ctx, TAB);

			helper.read(ctx, LF); // get rid of line feed
			heartbeat.setTimestamp(m_dateHelper.parse(timestamp));
			heartbeat.setStatus(heartbeatStatus);
			heartbeat.addData(heartbeatData);

			if (parent != null) {
				parent.addChild(heartbeat);
				return parent;
			} else {
				return heartbeat;
			}
		default:
			m_logger.warn("Unknown identifier(" + (char) identifier + ") of message: "
			      + ctx.getBuffer().toString(Charset.forName("utf-8")));
			throw new RuntimeException("Unknown identifier int name");
		}
	}
相關文章
相關標籤/搜索