之前講了 cat-client 如何進行 CAT 埋點上報,那麼數據上報給了誰?後續又是如何處理的?讓我們來看看 cat-consumer 是如何接收和處理這些數據的。
數據由 cat-client 發送,cat-consumer 負責接收並處理請求,處理之旅就此開始!
// TcpSocketSender 往channel中寫入數據,此處有興趣的同窗能夠延伸下 netty 的源碼! private void sendInternal(MessageTree tree) { ChannelFuture future = m_manager.channel(); ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K m_codec.encode(tree, buf); int size = buf.readableBytes(); Channel channel = future.channel(); // 以 ByteBuf 形式發送數據 channel.writeAndFlush(buf); // 更新統計數據 if (m_statistics != null) { m_statistics.onBytes(size); } }
public void init() { try { startServer(m_port); } catch (Throwable e) { m_logger.error(e.getMessage(), e); } } public synchronized void startServer(int port) throws InterruptedException { boolean linux = getOSMatches("Linux") || getOSMatches("LINUX"); int threads = 24; ServerBootstrap bootstrap = new ServerBootstrap(); m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads); m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads); bootstrap.group(m_bossGroup, m_workerGroup); bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class); // 添加處理handler, 進行請求邏輯處理 bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 此處僅爲一個解碼器,實際功能在該解碼器中完成 pipeline.addLast("decode", new MessageDecoder()); } }); bootstrap.childOption(ChannelOption.SO_REUSEADDR, true); bootstrap.childOption(ChannelOption.TCP_NODELAY, true); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); try { m_future = bootstrap.bind(port).sync(); m_logger.info("start netty server!"); } catch (Exception e) { m_logger.error("Started Netty Server Failed:" + port, e); } } // 消息解碼器,並處理具體業務邏輯,先確認數據已上傳完成,再進行邏輯處理 public class MessageDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception { if (buffer.readableBytes() < 4) { return; } buffer.markReaderIndex(); int length = buffer.readInt(); buffer.resetReaderIndex(); if (buffer.readableBytes() < length + 4) { return; } try { if (length > 0) { ByteBuf readBytes = buffer.readBytes(length + 4); readBytes.markReaderIndex(); readBytes.readInt(); // 消息解碼,獲取頭信息,消息體 DefaultMessageTree tree = (DefaultMessageTree) m_codec.decode(readBytes); readBytes.resetReaderIndex(); 
tree.setBuffer(readBytes); // 交由handler處理實際邏輯 m_handler.handle(tree); m_processCount++; long flag = m_processCount % CatConstants.SUCCESS_COUNT; if (flag == 0) { m_serverStateManager.addMessageTotal(CatConstants.SUCCESS_COUNT); } } else { // client message is error buffer.readBytes(length); } } catch (Exception e) { m_serverStateManager.addMessageTotalLoss(1); m_logger.error(e.getMessage(), e); } } }
// PlainTextMessageCodec.decode(): decodes a plain-text message tree.
@Override
public MessageTree decode(ByteBuf buf) {
    // A fresh DefaultMessageTree receives the decoded header and body.
    MessageTree tree = new DefaultMessageTree();
    decode(buf, tree);
    return tree;
}

/**
 * Decodes {@code buf} into {@code tree}: the header first, then the message body if any
 * bytes remain after the header.
 */
@Override
public void decode(ByteBuf buf, MessageTree tree) {
    Context ctx = m_ctx.get().setBuffer(buf);
    // Parse the header line.
    decodeHeader(ctx, tree);

    // Parse the message body only when bytes remain; otherwise tree.getMessage() stays unset.
    if (buf.readableBytes() > 0) {
        decodeMessage(ctx, tree);
    }
}

/**
 * Parses the header: eleven fields read in fixed order, the first ten ended by TAB and the
 * last (session token) ended by LF. The original comment gives TAB = '\t' and LF = '\n'.
 *
 * @throws RuntimeException if the first field does not equal VERSION
 */
protected void decodeHeader(Context ctx, MessageTree tree) {
    BufferHelper helper = m_bufferHelper;
    String id = helper.read(ctx, TAB);
    String domain = helper.read(ctx, TAB);
    String hostName = helper.read(ctx, TAB);
    String ipAddress = helper.read(ctx, TAB);
    String threadGroupName = helper.read(ctx, TAB);
    String threadId = helper.read(ctx, TAB);
    String threadName = helper.read(ctx, TAB);
    String messageId = helper.read(ctx, TAB);
    String parentMessageId = helper.read(ctx, TAB);
    String rootMessageId = helper.read(ctx, TAB);
    String sessionToken = helper.read(ctx, LF);

    if (VERSION.equals(id)) {
        tree.setDomain(domain);
        tree.setHostName(hostName);
        tree.setIpAddress(ipAddress);
        tree.setThreadGroupName(threadGroupName);
        tree.setThreadId(threadId);
        tree.setThreadName(threadName);
        tree.setMessageId(messageId);
        tree.setParentMessageId(parentMessageId);
        tree.setRootMessageId(rootMessageId);
        tree.setSessionToken(sessionToken);
    } else {
        throw new RuntimeException(String.format("Unrecognized id(%s) for plain text message codec!", id));
    }
}

/**
 * Parses the message body. The first line becomes the tree's root message. Further lines are
 * decoded for as long as decodeLine keeps returning a DefaultTransaction and bytes remain.
 * Any other result (including the null popped when the root transaction closes) stops the loop.
 */
protected void decodeMessage(Context ctx, MessageTree tree) {
    Stack<DefaultTransaction> stack = new Stack<DefaultTransaction>();
    Message parent = decodeLine(ctx, null, stack);

    tree.setMessage(parent);

    // Keep reading lines until the body is consumed (or a non-transaction result ends it).
    while (ctx.getBuffer().readableBytes() > 0) {
        // NOTE(review): if the first line was not a transaction (e.g. a lone 'E') and bytes
        // remain, this cast throws ClassCastException.
        Message message = decodeLine(ctx, (DefaultTransaction) parent, stack);

        if (message instanceof DefaultTransaction) {
            parent = message;
        } else {
            break;
        }
    }
}

/**
 * Decodes one line. Every line starts with a one-byte identifier followed by TAB-terminated
 * timestamp, type and name fields. {@code stack} holds the enclosing transactions of
 * still-open 't' lines.
 *
 * @return the transaction that subsequent lines should attach to: the new transaction for
 *     't', the popped enclosing transaction for 'T', otherwise {@code parent} (or the new
 *     message itself when {@code parent} is null)
 * @throws RuntimeException on an unknown identifier byte
 */
protected Message decodeLine(Context ctx, DefaultTransaction parent, Stack<DefaultTransaction> stack) {
    BufferHelper helper = m_bufferHelper;
    byte identifier = ctx.getBuffer().readByte();
    String timestamp = helper.read(ctx, TAB);
    String type = helper.read(ctx, TAB);
    String name = helper.read(ctx, TAB);

    // Identifiers: t = transaction start, A = complete transaction on one line, T = transaction
    // end (pop), E = event, M = metric, L = trace, H = heartbeat.
    switch (identifier) {
    case 't':
        DefaultTransaction transaction = new DefaultTransaction(type, name, null);

        helper.read(ctx, LF); // get rid of line feed
        transaction.setTimestamp(m_dateHelper.parse(timestamp));

        if (parent != null) {
            parent.addChild(transaction);
        }

        // Remember the enclosing transaction (null for the root) so the matching 'T' restores it.
        stack.push(parent);
        return transaction;
    case 'A':
        DefaultTransaction tran = new DefaultTransaction(type, name, null);
        String status = helper.read(ctx, TAB);
        String duration = helper.read(ctx, TAB);
        String data = helper.read(ctx, TAB);

        helper.read(ctx, LF); // get rid of line feed
        tran.setTimestamp(m_dateHelper.parse(timestamp));
        tran.setStatus(status);
        tran.addData(data);

        // Drops the last two characters of the duration field — presumably a unit suffix
        // such as "us" (the value is stored as microseconds); TODO confirm against the encoder.
        long d = Long.parseLong(duration.substring(0, duration.length() - 2));
        tran.setDurationInMicros(d);

        if (parent != null) {
            parent.addChild(tran);
            return parent;
        } else {
            return tran;
        }
    case 'T':
        String transactionStatus = helper.read(ctx, TAB);
        String transactionDuration = helper.read(ctx, TAB);
        String transactionData = helper.read(ctx, TAB);

        helper.read(ctx, LF); // get rid of line feed
        // NOTE(review): a 'T' with no open transaction (parent == null) throws NullPointerException here.
        parent.setStatus(transactionStatus);
        parent.addData(transactionData);

        long transactionD = Long.parseLong(transactionDuration.substring(0, transactionDuration.length() - 2));

        parent.setDurationInMicros(transactionD);

        // Return to the enclosing transaction (null once the root is closed).
        return stack.pop();
    case 'E':
        DefaultEvent event = new DefaultEvent(type, name);
        String eventStatus = helper.read(ctx, TAB);
        String eventData = helper.read(ctx, TAB);

        helper.read(ctx, LF); // get rid of line feed
        event.setTimestamp(m_dateHelper.parse(timestamp));
        event.setStatus(eventStatus);
        event.addData(eventData);

        if (parent != null) {
            parent.addChild(event);
            return parent;
        } else {
            return event;
        }
    case 'M':
        DefaultMetric metric = new DefaultMetric(type, name);
        String metricStatus = helper.read(ctx, TAB);
        String metricData = helper.read(ctx, TAB);

        helper.read(ctx, LF); // get rid of line feed
        metric.setTimestamp(m_dateHelper.parse(timestamp));
        metric.setStatus(metricStatus);
        metric.addData(metricData);

        if (parent != null) {
            parent.addChild(metric);
            return parent;
        } else {
            return metric;
        }
    case 'L':
        DefaultTrace trace = new DefaultTrace(type, name);
        String traceStatus = helper.read(ctx, TAB);
        String traceData = helper.read(ctx, TAB);

        helper.read(ctx, LF); // get rid of line feed
        trace.setTimestamp(m_dateHelper.parse(timestamp));
        trace.setStatus(traceStatus);
        trace.addData(traceData);

        if (parent != null) {
            parent.addChild(trace);
            return parent;
        } else {
            return trace;
        }
    case 'H':
        DefaultHeartbeat heartbeat = new DefaultHeartbeat(type, name);
        String heartbeatStatus = helper.read(ctx, TAB);
        String heartbeatData = helper.read(ctx, TAB);

        helper.read(ctx, LF); // get rid of line feed
        heartbeat.setTimestamp(m_dateHelper.parse(timestamp));
        heartbeat.setStatus(heartbeatStatus);
        heartbeat.addData(heartbeatData);

        if (parent != null) {
            parent.addChild(heartbeat);
            return parent;
        } else {
            return heartbeat;
        }
    default:
        m_logger.warn("Unknown identifier(" + (char) identifier + ") of message: " + ctx.getBuffer().toString(Charset.forName("utf-8")));
        throw new RuntimeException("Unknown identifier int name");
    }
}
// DefaultMessageHandler, 接過處理器的第一棒, 交由另外一實際的consumer(RealtimeConsumer) handler處理 @Override public void handle(MessageTree tree) { if (m_consumer == null) { m_consumer = lookup(MessageConsumer.class); } try { m_consumer.consume(tree); } catch (Throwable e) { m_logger.error("Error when consuming message in " + m_consumer + "! tree: " + tree, e); } } // RealtimeConsumer, 進行消費數據 @Override public void consume(MessageTree tree) { String domain = tree.getDomain(); String ip = tree.getIpAddress(); // 進行權限檢測,ip,domain if (!m_blackListManager.isBlack(domain, ip)) { long timestamp = tree.getMessage().getTimestamp(); Period period = m_periodManager.findPeriod(timestamp); // 找到period, 再將消息分配過去,不然算做網絡異常 if (period != null) { period.distribute(tree); } else { m_serverStateManager.addNetworkTimeError(1); } } else { m_black++; if (m_black % CatConstants.SUCCESS_COUNT == 0) { Cat.logEvent("Discard", domain); } } }
public void distribute(MessageTree tree) { // 統計進行數進行加1 m_serverStateManager.addMessageTotal(tree.getDomain(), 1); boolean success = true; String domain = tree.getDomain(); // 將各類類型的監控數據分別取出進行處理 for (Entry<String, List<PeriodTask>> entry : m_tasks.entrySet()) { List<PeriodTask> tasks = entry.getValue(); int length = tasks.size(); int index = 0; boolean manyTasks = length > 1; if (manyTasks) { index = Math.abs(domain.hashCode()) % length; } PeriodTask task = tasks.get(index); // 若是有金條消息,將task從新入隊 boolean enqueue = task.enqueue(tree); if (enqueue == false) { if (manyTasks) { task = tasks.get((index + 1) % length); enqueue = task.enqueue(tree); if (enqueue == false) { success = false; } } else { success = false; } } } if (!success) { m_serverStateManager.addMessageTotalLoss(tree.getDomain(), 1); } } // PeriodTask.enqueue, 從新入隊消息,讓消費線程自行消費 LinkedBlockingQueue.offer(..) public boolean enqueue(MessageTree tree) { boolean result = m_queue.offer(tree); if (!result) { // trace queue overflow, 記錄入隊失敗日誌 m_queueOverflow++; if (m_queueOverflow % (10 * CatConstants.ERROR_COUNT) == 0) { m_logger.warn(m_analyzer.getClass().getSimpleName() + " queue overflow number " + m_queueOverflow); } } return result; }
到此,一條消費線路就完成了。
// PeriodTask 線程,作爲第二條消費線路
@Override public void run() { try { // 分析各消息數據,作後臺消費處理 m_analyzer.analyze(m_queue); } catch (Exception e) { Cat.logError(e); } } // 調用統一的抽象類的模板方法,由各種進行具體的 process 處理 @Override public void analyze(MessageQueue queue) { while (!isTimeout() && isActive()) { MessageTree tree = queue.poll(); if (tree != null) { try { // 調用具體類的process process(tree); } catch (Throwable e) { m_errors++; if (m_errors == 1 || m_errors % 10000 == 0) { Cat.logError(e); } } } } // 若是出現超時或者中止動做,則把剩餘隊列處理完成再退出線程 while (true) { MessageTree tree = queue.poll(); if (tree != null) { try { process(tree); } catch (Throwable e) { m_errors++; if (m_errors == 1 || m_errors % 10000 == 0) { Cat.logError(e); } } } else { break; } } } // 超時規則,當前時間 > 開始時間+1小時+設置額外超時時間 protected boolean isTimeout() { long currentTime = System.currentTimeMillis(); long endTime = m_startTime + m_duration + m_extraTime; return currentTime > endTime; }
// 具體的 Anlalyzer示例: DumpAnlalyzer.process @Override public void process(MessageTree tree) { String domain = tree.getDomain(); if ("PhoenixAgent".equals(domain)) { return; } else { MessageId messageId = MessageId.parse(tree.getMessageId()); if (messageId.getVersion() == 2) { // 計算出當前時間範圍, long time = tree.getMessage().getTimestamp(); long fixedTime = time - time % (TimeHelper.ONE_HOUR); long idTime = messageId.getTimestamp(); long duration = fixedTime - idTime; if (duration == 0 || duration == ONE_HOUR || duration == -ONE_HOUR) { m_bucketManager.storeMessage(tree, messageId); } else { m_serverStateManager.addPigeonTimeError(1); } } } } // 存儲log消息到本地文件,並後續上傳到hdfs @Override public void storeMessage(final MessageTree tree, final MessageId id) { boolean errorFlag = true; int hash = Math.abs((id.getDomain() + '-' + id.getIpAddress()).hashCode()); int index = (int) (hash % m_gzipThreads); MessageItem item = new MessageItem(tree, id); LinkedBlockingQueue<MessageItem> queue = m_messageQueues.get(index % (m_gzipThreads - 1)); boolean result = queue.offer(item); if (result) { errorFlag = false; } else { if (m_last.offer(item)) { errorFlag = false; } } if (errorFlag) { m_serverStateManager.addMessageDumpLoss(1); } logStorageState(tree); } // 每1000個消息添加一個messageDump=1000 protected void logStorageState(final MessageTree tree) { String domain = tree.getDomain(); int size = ((DefaultMessageTree) tree).getBuffer().readableBytes(); m_serverStateManager.addMessageSize(domain, size); if ((++m_total) % CatConstants.SUCCESS_COUNT == 0) { m_serverStateManager.addMessageDump(CatConstants.SUCCESS_COUNT); } }
// EventAnalyzer.process: folds the events of a tree into the domain's hourly EventReport.
/**
 * Ignores domains rejected by {@code m_serverFilterConfigManager}. Otherwise it processes
 * the root message: a transaction is walked recursively and an event is recorded directly.
 */
@Override
public void process(MessageTree tree) {
    String domain = tree.getDomain();

    if (!m_serverFilterConfigManager.validateDomain(domain)) {
        return;
    }

    EventReport report = m_reportManager.getHourlyReport(getStartTime(), domain, true);
    Message root = tree.getMessage();
    String ip = tree.getIpAddress();

    if (root instanceof Transaction) {
        processTransaction(report, tree, (Transaction) root, ip);
    } else if (root instanceof Event) {
        processEvent(report, tree, (Event) root, ip);
    }
}

/** Depth-first walk over a transaction's children, handing each nested event to processEvent. */
private void processTransaction(EventReport report, MessageTree tree, Transaction t, String ip) {
    for (Message child : t.getChildren()) {
        if (child instanceof Transaction) {
            processTransaction(report, tree, (Transaction) child, ip);
        } else if (child instanceof Event) {
            processEvent(report, tree, (Event) child, ip);
        }
    }
}

// StateAnalyzer.process: state report about the CAT consumer machines themselves.
/**
 * For accepted domains, records in the CAT state report that this machine (its local host
 * address) processed {@code domain} and which client IP sent the tree.
 */
@Override
protected void process(MessageTree tree) {
    String domain = tree.getDomain();

    if (!m_serverFilterConfigManager.validateDomain(domain)) {
        return;
    }

    StateReport report = m_reportManager.getHourlyReport(getStartTime(), Constants.CAT, true);
    String ip = tree.getIpAddress();
    String localAddress = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
    Machine machine = report.findOrCreateMachine(localAddress);

    machine.findOrCreateProcessDomain(domain).addIp(ip);
}
/**
 * Period.start: logs the period window and starts every task through
 * {@code Threads.forGroup("Cat-RealtimeConsumer")}. Each task is given its position within
 * its list as its index.
 */
public void start() {
    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String from = format.format(new Date(m_startTime));
    String to = format.format(new Date(m_endTime - 1));

    m_logger.info(String.format("Starting %s tasks in period [%s, %s]", m_tasks.size(), from, to));

    for (List<PeriodTask> taskList : m_tasks.values()) {
        int position = 0;

        for (PeriodTask task : taskList) {
            task.setIndex(position);
            position++;
            Threads.forGroup("Cat-RealtimeConsumer").start(task);
        }
    }
}
@Override public void run() { while (m_active) { // make save message id index asyc m_idfactory.saveMark(); checkServerChanged(); ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture(); List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses(); doubleCheckActiveServer(activeFuture); reconnectDefaultServer(activeFuture, serverAddresses); try { Thread.sleep(10 * 1000L); // check every 10 seconds } catch (InterruptedException e) { // ignore } } }
// PeriodManager, 用於滾動式處理每小時的監控數據 public class PeriodManager implements Task { private PeriodStrategy m_strategy; private List<Period> m_periods = new ArrayList<Period>(); private boolean m_active; @Inject private MessageAnalyzerManager m_analyzerManager; @Inject private ServerStatisticManager m_serverStateManager; @Inject private Logger m_logger; public static long EXTRATIME = 3 * 60 * 1000L; public PeriodManager(long duration, MessageAnalyzerManager analyzerManager, ServerStatisticManager serverStateManager, Logger logger) { m_strategy = new PeriodStrategy(duration, EXTRATIME, EXTRATIME); m_active = true; m_analyzerManager = analyzerManager; m_serverStateManager = serverStateManager; m_logger = logger; } private void endPeriod(long startTime) { int len = m_periods.size(); for (int i = 0; i < len; i++) { Period period = m_periods.get(i); if (period.isIn(startTime)) { period.finish(); m_periods.remove(i); break; } } } public void init() { long startTime = m_strategy.next(System.currentTimeMillis()); startPeriod(startTime); } @Override public void run() { while (m_active) { try { long now = System.currentTimeMillis(); long value = m_strategy.next(now); if (value > 0) { startPeriod(value); } else if (value < 0) { // 上個運行週期,即1小時已完成後,啓用一個結束線程進行規劃原來的數據 Threads.forGroup("cat").start(new EndTaskThread(-value)); } } catch (Throwable e) { Cat.logError(e); } try { Thread.sleep(1000L); } catch (InterruptedException e) { break; } } } private void startPeriod(long startTime) { long endTime = startTime + m_strategy.getDuration(); Period period = new Period(startTime, endTime, m_analyzerManager, m_serverStateManager, m_logger); m_periods.add(period); period.start(); } private class EndTaskThread implements Task { private long m_startTime; public EndTaskThread(long startTime) { m_startTime = startTime; } @Override public void run() { // 調用外部類的結束方法 endPeriod(m_startTime); } } } // Period.finish(), 結束 public void finish() { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss"); Date startDate = new Date(m_startTime); Date endDate = new Date(m_endTime - 1); m_logger.info(String.format("Finishing %s tasks in period [%s, %s]", m_tasks.size(), df.format(startDate), df.format(endDate))); try { for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) { for (PeriodTask task : tasks.getValue()) { task.finish(); } } } catch (Throwable e) { Cat.logError(e); } finally { m_logger.info(String.format("Finished %s tasks in period [%s, %s]", m_tasks.size(), df.format(startDate), df.format(endDate))); } } // PeriodTask.finish(), 真正處理上一週期數據 public void finish() { try { // 調用各自分析器的 doCheckpoint 檢查,後續處理 m_analyzer.doCheckpoint(true); // 銷燬分析器, help gc m_analyzer.destroy(); } catch (Exception e) { Cat.logError(e); } } // 舉例 EventAnalyzer.doCheckpoint, 需加鎖處理 @Override public synchronized void doCheckpoint(boolean atEnd) { if (atEnd && !isLocalMode()) { m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE_AND_DB, m_index); } else { m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE, m_index); } } // DefaultReportManager.storeHourlyReports, 存儲logview, 存在統計數據到db @Override public void storeHourlyReports(long startTime, StoragePolicy policy, int index) { Transaction t = Cat.newTransaction("Checkpoint", m_name); Map<String, T> reports = m_reports.get(startTime); ReportBucket bucket = null; try { t.addData("reports", reports == null ? 
0 : reports.size()); if (reports != null) { Set<String> errorDomains = new HashSet<String>(); for (String domain : reports.keySet()) { if (!m_validator.validate(domain)) { errorDomains.add(domain); } } for (String domain : errorDomains) { reports.remove(domain); } if (!errorDomains.isEmpty()) { m_logger.info("error domain:" + errorDomains); } m_reportDelegate.beforeSave(reports); if (policy.forFile()) { bucket = m_bucketManager.getReportBucket(startTime, m_name, index); try { storeFile(reports, bucket); } finally { m_bucketManager.closeBucket(bucket); } } if (policy.forDatabase()) { storeDatabase(startTime, reports); } } t.setStatus(Message.SUCCESS); } catch (Throwable e) { Cat.logError(e); t.setStatus(e); m_logger.error(String.format("Error when storing %s reports of %s!", m_name, new Date(startTime)), e); } finally { cleanup(startTime); t.complete(); if (bucket != null) { m_bucketManager.closeBucket(bucket); } } } // DefaultReportManager.storeDatabase, 存儲 db private void storeDatabase(long startTime, Map<String, T> reports) { Date period = new Date(startTime); String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress(); for (T report : reports.values()) { try { String domain = m_reportDelegate.getDomain(report); HourlyReport r = m_reportDao.createLocal(); r.setName(m_name); r.setDomain(domain); r.setPeriod(period); r.setIp(ip); r.setType(1); m_reportDao.insert(r); int id = r.getId(); byte[] binaryContent = m_reportDelegate.buildBinary(report); HourlyReportContent content = m_reportContentDao.createLocal(); content.setReportId(id); content.setContent(binaryContent); m_reportContentDao.insert(content); m_reportDelegate.createHourlyTask(report); } catch (Throwable e) { Cat.getProducer().logError(e); } } } // DefaultReportManager.storeFile, 存在file, xml private void storeFile(Map<String, T> reports, ReportBucket bucket) { for (T report : reports.values()) { try { String domain = m_reportDelegate.getDomain(report); String xml = m_reportDelegate.buildXml(report); 
bucket.storeById(domain, xml); } catch (Exception e) { Cat.logError(e); } } }
總結起來主要有以下幾點:
1. 使用 Netty 開啓高性能的接收服務;
2. 使用隊列保存消息;
3. 使用單獨的線程檢測 channel 的有效性,保證高可用;
4. 當前小時的數據全部保存在內存中,速度特別快;
5. 充分發揮了多線程技術的優勢;
6. 應用了模板方法模式和阻塞隊列;
7. 應用了 HDFS 存儲和優雅停機。
等等,來個圖展現下:
task 運行過程:
週期報告,彙總: