目標:介紹telnet的相關實現邏輯、介紹dubbo-remoting-api中的telnet包內的源碼解析。
從dubbo 2.0.5開始,dubbo開始支持經過 telnet 命令來進行服務治理。本文就是講解一些公用的telnet命令的實現。下面來看一下telnet實現的類圖:java
能夠看到,實現了TelnetHandler接口的有六個類,除了TelnetHandlerAdapter是之外,其餘五個分別對應了clear、exit、help、log、status命令的實現,具體用來幹嗎,請看官方文檔的介紹。git
@SPI public interface TelnetHandler { /** * telnet. * 處理對應的telnet命令 * @param channel * @param message telnet命令 */ String telnet(Channel channel, String message) throws RemotingException; }
該接口上telnet命令處理器接口,是一個可擴展接口。它定義了一個方法,就是處理相關的telnet命令。github
該類繼承了ChannelHandlerAdapter,實現了TelnetHandler接口,是TelnetHandler的適配器類,負責在接收到HeaderExchangeHandler發來的telnet命令後分發給對應的TelnetHandler實現類去實現,而且返回命令結果。api
public class TelnetHandlerAdapter extends ChannelHandlerAdapter implements TelnetHandler { /** * 擴展加載器 */ private final ExtensionLoader<TelnetHandler> extensionLoader = ExtensionLoader.getExtensionLoader(TelnetHandler.class); @Override public String telnet(Channel channel, String message) throws RemotingException { // 得到提示鍵配置,用於nc獲取信息時不顯示提示符 String prompt = channel.getUrl().getParameterAndDecoded(Constants.PROMPT_KEY, Constants.DEFAULT_PROMPT); boolean noprompt = message.contains("--no-prompt"); message = message.replace("--no-prompt", ""); StringBuilder buf = new StringBuilder(); // 刪除頭尾空白符的字符串 message = message.trim(); String command; // 得到命令 if (message.length() > 0) { int i = message.indexOf(' '); if (i > 0) { // 得到命令 command = message.substring(0, i).trim(); // 得到參數 message = message.substring(i + 1).trim(); } else { command = message; message = ""; } } else { command = ""; } if (command.length() > 0) { // 若是有該命令的擴展實現類 if (extensionLoader.hasExtension(command)) { try { // 執行相應命令的實現類的telnet String result = extensionLoader.getExtension(command).telnet(channel, message); if (result == null) { return null; } // 返回結果 buf.append(result); } catch (Throwable t) { buf.append(t.getMessage()); } } else { buf.append("Unsupported command: "); buf.append(command); } } if (buf.length() > 0) { buf.append("\r\n"); } // 添加 telnet 提示語 if (prompt != null && prompt.length() > 0 && !noprompt) { buf.append(prompt); } return buf.toString(); } }
該類只實現了telnet方法,其中的邏輯仍是比較清晰,就是根據對應的命令去讓對應的實現類產生命令結果。數組
該類實現了TelnetHandler接口,封裝了clear命令的實現。併發
@Activate @Help(parameter = "[lines]", summary = "Clear screen.", detail = "Clear screen.") public class ClearTelnetHandler implements TelnetHandler { @Override public String telnet(Channel channel, String message) { // 清除屏幕上的內容行數 int lines = 100; if (message.length() > 0) { // 若是不是一個數字 if (!StringUtils.isInteger(message)) { return "Illegal lines " + message + ", must be integer."; } lines = Integer.parseInt(message); } StringBuilder buf = new StringBuilder(); // 一行一行清除 for (int i = 0; i < lines; i++) { buf.append("\r\n"); } return buf.toString(); } }
該類實現了TelnetHandler接口,封裝了exit命令的實現。app
@Activate @Help(parameter = "", summary = "Exit the telnet.", detail = "Exit the telnet.") public class ExitTelnetHandler implements TelnetHandler { @Override public String telnet(Channel channel, String message) { // 關閉通道 channel.close(); return null; } }
該類實現了TelnetHandler接口,封裝了help命令的實現。ide
@Activate @Help(parameter = "[command]", summary = "Show help.", detail = "Show help.") public class HelpTelnetHandler implements TelnetHandler { /** * 擴展加載器 */ private final ExtensionLoader<TelnetHandler> extensionLoader = ExtensionLoader.getExtensionLoader(TelnetHandler.class); @Override public String telnet(Channel channel, String message) { // 若是須要查看某一個命令的幫助 if (message.length() > 0) { if (!extensionLoader.hasExtension(message)) { return "No such command " + message; } // 得到對應的擴展實現類 TelnetHandler handler = extensionLoader.getExtension(message); Help help = handler.getClass().getAnnotation(Help.class); StringBuilder buf = new StringBuilder(); // 生成命令和幫助信息 buf.append("Command:\r\n "); buf.append(message + " " + help.parameter().replace("\r\n", " ").replace("\n", " ")); buf.append("\r\nSummary:\r\n "); buf.append(help.summary().replace("\r\n", " ").replace("\n", " ")); buf.append("\r\nDetail:\r\n "); buf.append(help.detail().replace("\r\n", " \r\n").replace("\n", " \n")); return buf.toString(); // 若是查看全部命令的幫助 } else { List<List<String>> table = new ArrayList<List<String>>(); // 得到全部命令的提示信息 List<TelnetHandler> handlers = extensionLoader.getActivateExtension(channel.getUrl(), "telnet"); if (handlers != null && !handlers.isEmpty()) { for (TelnetHandler handler : handlers) { Help help = handler.getClass().getAnnotation(Help.class); List<String> row = new ArrayList<String>(); String parameter = " " + extensionLoader.getExtensionName(handler) + " " + (help != null ? help.parameter().replace("\r\n", " ").replace("\n", " ") : ""); row.add(parameter.length() > 50 ? parameter.substring(0, 50) + "..." : parameter); String summary = help != null ? help.summary().replace("\r\n", " ").replace("\n", " ") : ""; row.add(summary.length() > 50 ? summary.substring(0, 50) + "..." : summary); table.add(row); } } return "Please input \"help [command]\" show detail.\r\n" + TelnetUtils.toList(table); } } }
help分爲了須要查看某一個命令的幫助仍是查看所有命令的幫助。工具
該類實現了TelnetHandler接口,封裝了log命令的實現。源碼分析
@Activate @Help(parameter = "level", summary = "Change log level or show log ", detail = "Change log level or show log") public class LogTelnetHandler implements TelnetHandler { public static final String SERVICE_KEY = "telnet.log"; @Override public String telnet(Channel channel, String message) { long size = 0; File file = LoggerFactory.getFile(); StringBuffer buf = new StringBuffer(); if (message == null || message.trim().length() == 0) { buf.append("EXAMPLE: log error / log 100"); } else { String str[] = message.split(" "); if (!StringUtils.isInteger(str[0])) { // 設置日誌級別 LoggerFactory.setLevel(Level.valueOf(message.toUpperCase())); } else { // 得到日誌長度 int SHOW_LOG_LENGTH = Integer.parseInt(str[0]); if (file != null && file.exists()) { try { FileInputStream fis = new FileInputStream(file); try { FileChannel filechannel = fis.getChannel(); try { size = filechannel.size(); ByteBuffer bb; if (size <= SHOW_LOG_LENGTH) { // 分配緩衝區 bb = ByteBuffer.allocate((int) size); // 讀日誌數據 filechannel.read(bb, 0); } else { int pos = (int) (size - SHOW_LOG_LENGTH); // 分配緩衝區 bb = ByteBuffer.allocate(SHOW_LOG_LENGTH); // 讀取日誌數據 filechannel.read(bb, pos); } bb.flip(); String content = new String(bb.array()).replace("<", "<") .replace(">", ">").replace("\n", "<br/><br/>"); buf.append("\r\ncontent:" + content); buf.append("\r\nmodified:" + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") .format(new Date(file.lastModified())))); buf.append("\r\nsize:" + size + "\r\n"); } finally { filechannel.close(); } } finally { fis.close(); } } catch (Exception e) { buf.append(e.getMessage()); } } else { size = 0; buf.append("\r\nMESSAGE: log file not exists or log appender is console ."); } } } buf.append("\r\nCURRENT LOG LEVEL:" + LoggerFactory.getLevel()) .append("\r\nCURRENT LOG APPENDER:" + (file == null ? "console" : file.getAbsolutePath())); return buf.toString(); } }
log命令實現原理就是從日誌文件中把日誌信息讀取出來。
該類實現了TelnetHandler接口,封裝了status命令的實現。
@Activate @Help(parameter = "[-l]", summary = "Show status.", detail = "Show status.") public class StatusTelnetHandler implements TelnetHandler { private final ExtensionLoader<StatusChecker> extensionLoader = ExtensionLoader.getExtensionLoader(StatusChecker.class); @Override public String telnet(Channel channel, String message) { // 顯示狀態列表 if (message.equals("-l")) { List<StatusChecker> checkers = extensionLoader.getActivateExtension(channel.getUrl(), "status"); String[] header = new String[]{"resource", "status", "message"}; List<List<String>> table = new ArrayList<List<String>>(); Map<String, Status> statuses = new HashMap<String, Status>(); if (checkers != null && !checkers.isEmpty()) { // 遍歷各個資源的狀態,若是一個當所有 OK 時則顯示 OK,只要有一個 ERROR 則顯示 ERROR,只要有一個 WARN 則顯示 WARN for (StatusChecker checker : checkers) { String name = extensionLoader.getExtensionName(checker); Status stat; try { stat = checker.check(); } catch (Throwable t) { stat = new Status(Status.Level.ERROR, t.getMessage()); } statuses.put(name, stat); if (stat.getLevel() != null && stat.getLevel() != Status.Level.UNKNOWN) { List<String> row = new ArrayList<String>(); row.add(name); row.add(String.valueOf(stat.getLevel())); row.add(stat.getMessage() == null ? "" : stat.getMessage()); table.add(row); } } } Status stat = StatusUtils.getSummaryStatus(statuses); List<String> row = new ArrayList<String>(); row.add("summary"); row.add(String.valueOf(stat.getLevel())); row.add(stat.getMessage()); table.add(row); return TelnetUtils.toTable(header, table); } else if (message.length() > 0) { return "Unsupported parameter " + message + " for status."; } String status = channel.getUrl().getParameter("status"); Map<String, Status> statuses = new HashMap<String, Status>(); if (status != null && status.length() > 0) { String[] ss = Constants.COMMA_SPLIT_PATTERN.split(status); for (String s : ss) { StatusChecker handler = extensionLoader.getExtension(s); Status stat; try { stat = handler.check(); } catch (Throwable t) { stat = new Status(Status.Level.ERROR, t.getMessage()); } statuses.put(s, stat); } } Status stat = StatusUtils.getSummaryStatus(statuses); return String.valueOf(stat.getLevel()); } }
該接口是幫助文檔接口
@Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) public @interface Help { String parameter() default ""; String summary(); String detail() default ""; }
能夠看上在每一個命令的實現類上都加上了@Help註解,爲了添加一些幫助文案。
該類是Telnet命令的工具類,其中邏輯我就不介紹了。
該類繼承了TransportCodec,是telnet的編解碼類。
private static final Logger logger = LoggerFactory.getLogger(TelnetCodec.class); /** * 歷史命令列表 */ private static final String HISTORY_LIST_KEY = "telnet.history.list"; /** * 歷史命令位置,就是用上下鍵來找歷史命令 */ private static final String HISTORY_INDEX_KEY = "telnet.history.index"; /** * 向上鍵 */ private static final byte[] UP = new byte[]{27, 91, 65}; /** * 向下鍵 */ private static final byte[] DOWN = new byte[]{27, 91, 66}; /** * 回車 */ private static final List<?> ENTER = Arrays.asList(new Object[]{new byte[]{'\r', '\n'} /* Windows Enter */, new byte[]{'\n'} /* Linux Enter */}); /** * 退出 */ private static final List<?> EXIT = Arrays.asList(new Object[]{new byte[]{3} /* Windows Ctrl+C */, new byte[]{-1, -12, -1, -3, 6} /* Linux Ctrl+C */, new byte[]{-1, -19, -1, -3, 6} /* Linux Pause */});
private static Charset getCharset(Channel channel) { if (channel != null) { // 得到屬性設置 Object attribute = channel.getAttribute(Constants.CHARSET_KEY); // 返回指定字符集的charset對象。 if (attribute instanceof String) { try { return Charset.forName((String) attribute); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } else if (attribute instanceof Charset) { return (Charset) attribute; } URL url = channel.getUrl(); if (url != null) { String parameter = url.getParameter(Constants.CHARSET_KEY); if (parameter != null && parameter.length() > 0) { try { return Charset.forName(parameter); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } } // 默認的編碼是utf-8 try { return Charset.forName(Constants.DEFAULT_CHARSET); } catch (Throwable t) { logger.warn(t.getMessage(), t); } return Charset.defaultCharset(); }
該方法是得到通道的字符集,根據url中編碼來得到字符集,默認是utf-8。
@Override public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException { // 若是須要編碼的是 telnet 命令結果 if (message instanceof String) { //若是爲客戶端側的通道message直接返回 if (isClientSide(channel)) { message = message + "\r\n"; } // 得到字節數組 byte[] msgData = ((String) message).getBytes(getCharset(channel).name()); // 寫入緩衝區 buffer.writeBytes(msgData); } else { super.encode(channel, buffer, message); } }
該方法是編碼方法。
@Override public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { // 得到緩衝區可讀的字節 int readable = buffer.readableBytes(); byte[] message = new byte[readable]; // 從緩衝區讀數據 buffer.readBytes(message); return decode(channel, buffer, readable, message); } @SuppressWarnings("unchecked") protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] message) throws IOException { // 若是是客戶端側,直接返回結果 if (isClientSide(channel)) { return toString(message, getCharset(channel)); } // 檢驗消息長度 checkPayload(channel, readable); if (message == null || message.length == 0) { return DecodeResult.NEED_MORE_INPUT; } // 若是回退 if (message[message.length - 1] == '\b') { // Windows backspace echo try { boolean doublechar = message.length >= 3 && message[message.length - 3] < 0; // double byte char channel.send(new String(doublechar ? new byte[]{32, 32, 8, 8} : new byte[]{32, 8}, getCharset(channel).name())); } catch (RemotingException e) { throw new IOException(StringUtils.toString(e)); } return DecodeResult.NEED_MORE_INPUT; } // 若是命令是退出 for (Object command : EXIT) { if (isEquals(message, (byte[]) command)) { if (logger.isInfoEnabled()) { logger.info(new Exception("Close channel " + channel + " on exit command: " + Arrays.toString((byte[]) command))); } // 關閉通道 channel.close(); return null; } } boolean up = endsWith(message, UP); boolean down = endsWith(message, DOWN); // 若是用上下鍵找歷史命令 if (up || down) { LinkedList<String> history = (LinkedList<String>) channel.getAttribute(HISTORY_LIST_KEY); if (history == null || history.isEmpty()) { return DecodeResult.NEED_MORE_INPUT; } Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY); Integer old = index; if (index == null) { index = history.size() - 1; } else { // 向上 if (up) { index = index - 1; if (index < 0) { index = history.size() - 1; } } else { // 向下 index = index + 1; if (index > history.size() - 1) { index = 0; } } } // 得到歷史命令,併發送給客戶端 if (old == null || !old.equals(index)) { // 設置當前命令位置 channel.setAttribute(HISTORY_INDEX_KEY, index); // 得到歷史命令 String value = history.get(index); // 清除客戶端原有命令,用查到的歷史命令替代 if (old != null && old >= 0 && old < history.size()) { String ov = history.get(old); StringBuilder buf = new StringBuilder(); for (int i = 0; i < ov.length(); i++) { buf.append("\b"); } for (int i = 0; i < ov.length(); i++) { buf.append(" "); } for (int i = 0; i < ov.length(); i++) { buf.append("\b"); } value = buf.toString() + value; } try { channel.send(value); } catch (RemotingException e) { throw new IOException(StringUtils.toString(e)); } } // 返回,須要更多指令 return DecodeResult.NEED_MORE_INPUT; } // 關閉命令 for (Object command : EXIT) { if (isEquals(message, (byte[]) command)) { if (logger.isInfoEnabled()) { logger.info(new Exception("Close channel " + channel + " on exit command " + command)); } channel.close(); return null; } } byte[] enter = null; // 若是命令是回車 for (Object command : ENTER) { if (endsWith(message, (byte[]) command)) { enter = (byte[]) command; break; } } if (enter == null) { return DecodeResult.NEED_MORE_INPUT; } LinkedList<String> history = (LinkedList<String>) channel.getAttribute(HISTORY_LIST_KEY); Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY); // 移除歷史命令 channel.removeAttribute(HISTORY_INDEX_KEY); // 將歷史命令拼接 if (history != null && !history.isEmpty() && index != null && index >= 0 && index < history.size()) { String value = history.get(index); if (value != null) { byte[] b1 = value.getBytes(); byte[] b2 = new byte[b1.length + message.length]; System.arraycopy(b1, 0, b2, 0, b1.length); System.arraycopy(message, 0, b2, b1.length, message.length); message = b2; } } // 將命令字節數組,轉成具體的一條命令 String result = toString(message, getCharset(channel)); if (result.trim().length() > 0) { if (history == null) { history = new LinkedList<String>(); channel.setAttribute(HISTORY_LIST_KEY, history); } if (history.isEmpty()) { history.addLast(result); } else if (!result.equals(history.getLast())) { history.remove(result); // 添加當前命令到歷史尾部 history.addLast(result); // 超過上限,移除歷史的頭部 if (history.size() > 10) { history.removeFirst(); } } } return result; }
該方法是編碼。
該部分相關的源碼解析地址: https://github.com/CrazyHZM/i...
該文章講解了telnet的相關實現邏輯,本文有興趣的朋友能夠看看。下一篇我會講解基於grizzly實現遠程通訊部分。