手把手用 UDP 實現 Node 服務日誌歸集(附完整源碼)

日誌歸集:顧名思義,就是把日誌歸集起來。html

在開發 Node 服務的時候,咱們常常會打印各類日誌,好比 info, error 日誌。前端

若是咱們已經將服務已經部署了不少機器上,這個時候若是須要查詢暱稱爲」張三「的日誌流水,那就很痛苦了,機器越多越難搞,由於不知道暱稱爲」張三「的日誌流水到底在哪一臺,甚至可能還分散在多臺服務器。node

所以,咱們須要作日誌歸集,這樣咱們在一臺機器上就能夠查看全部的日誌了。c++

本文目錄:git

  • 使用 dgram 實現日誌歸集
  • UDP 數據包 和 Unix 數據報
  • 再看 dgram

第一步:收集日誌

服務是部署在多臺服務器上,每臺機器上都會打印日誌,最終咱們要歸集起來到一臺機器上展現,那麼首先須要把每臺服務器上的日誌拿到,通常有兩種實現方式。github

使用日誌 logger 組件

若是整個系統日誌輸出,咱們是使用統一的自定義 logger 組件,好比執行 logger.info(), logger.error() 來輸出日誌的話。那麼咱們就很容易收集日誌了,直接在 logger 的 info , error, warn 等方法裏面收集便可。web

好比以下,在 logger 日誌對象裏面,增長存儲到 logList 臨時對象裏面。api

const logList=[];
['info','warn','error'].forEach((type)=>{  this[type] = ()=>{  // do something  logList.push({type:type,data:Array.from(arguments).join(" ")});  } }); 複製代碼

這裏咱們須要用 logList 數組保存起來,是爲了數據能批量發送,在指定的間隔時間(好比 1s),發送一第二天志,而不是每執行一次 log,就須要發送一次請求。數組

掃描文件上報

有些使用了第三方框架,寫日誌,並非通過咱們自定義的 logger 組件,這個時候,咱們能夠採起讀取文件的方式來收集日誌數據。實現以下:服務器

const fsExtra = require('fs-extra');
// 監聽文件 path 的變化 fsExtra.watchFile(path, (curr, prev) => {  // 有時候響應不穩定,會觸發兩次,第二次觸發的時候,對象裏面的時間戳會相等,因此判斷當前時間戳小於等於上一次時間戳可過濾多餘的監聽。  if (curr.mtime <= prev.mtime) return;  // 爲了提升性能,咱們採起只讀的方式來 open  fsExtra.open(path, 'r', function(error, fd) {  if (error) {  logger.error('open log error', error);  return;  }  // 建立一個緩衝,長度爲(當前文件大小-文件上一個狀態的大小) 因爲這裏咱們已經明確獲取到長度了,因此建立buffer可使用性能更好的 allocUnsafe  buffer = Buffer.allocUnsafe(curr.size - prev.size);  fsExtra.read(fd, buffer, 0, (curr.size - prev.size), prev.size, function(err, bytesRead, buffer) {  if (err) {  logger.error('read log error', error);  return;  }  const data = buffer.toString();  logList.push({data:data});  });  }); 複製代碼

邏輯也比較簡單,監聽日誌文件(若是須要監聽文件夾,在可使用 watch)。當監聽到文件變動的時候,已只讀的方式打開文件,而後使用 read 函數讀取文件的從起始到結束爲止的內容。

第二步:客戶端發送 UDP 數據包

UDP 數據包的特色是無鏈接,結構簡單,對系統資源消耗少。缺點是數據不保證正確,且數據包到達前後順序不保證。

對於咱們的日誌歸集,顯然不須要保證正確,且前後順序影響不大,咱們在每一條數據消息裏面攜帶發送消息客戶端的時間戳便可。因此咱們選擇 UDP 數據包。

這裏到底使用 udp4 仍是 udp6 ,取決於咱們服務器,對應咱們熟悉的是 IPV4 和 IPV6。這裏使用 udp4。

const dgram = require('dgram');
const clientSocket = dgram.createSocket('udp4'); // 原創 udp 日誌服務器的 ip 和 port,可能會是配置下發的方式來肯定。若是是明確的一臺服務器,也能夠寫死。 let remoteUDPServerIP = 'xxx,xxx,xxx,xxx'; let remoteUDPServerPort = 'xxx'; // 當前 udp 服務的 ip clientSocket.bind(7777); setInterval(()=>{  if(logList.length==0) return;  const data = logList.join("\r\n");  logList = [];  clientSocket.send(data, 0, data.length, remoteUDPServerPort, remoteUDPServerIP); },1000);  複製代碼

開啓定時器,掃描存儲的日誌列表。一般來講,若是咱們的日誌數據 logList ,是由咱們本身定義的 logger 獲取的,那麼咱們須要先用數組存起來,而後定時發送 UDP 數據包到 server 服務器上,達到批量發送的目的,減小發送頻率。

可是若是咱們是使用讀取文件的方式來拿到須要上報的日誌的話,咱們能夠監聽到文件變化而後直接上報便可,無需使用定時器發送,由於會在寫日誌文件的時候,已經作了批量寫入了(通常會是 1s 更新一次)。

第三步:服務器接受 UDP 數據包

使用 dgram 建立服務端,而後接受客戶端發送的消息,最終寫到統一的日誌文件裏面。

const path = require("path");
const fsExtra = require('fs-extra'); const dgram = require('dgram'); const serverSocket = dgram.createSocket('udp4');  serverSocket.on('message', function(msg, rinfo){  handleMsg(msg,rinfo.address); }); // 服務端端口 serverSocket.bind(7777);  var fp = "./logs"+path.sep+path.sep+msg.type; fsExtra.ensureDir(fp);  function handleMsg(msg,address){  try{  msg = typeof msg=="object"?JSON.parse(msg):{msg:msg};  }catch(e){  console.log(e);  }  if(!msg.type) msg.type="info";   // 組裝上報參數  let filepath = fp+path.sep+ getDate()+".log";  let content = [];  content.push(getDate(true));  content.push(address);  content.push(typeof msg.msg=="object"?JSON.stringify(msg.msg):msg.msg);  content.push('\r\n');  // 寫入日誌文件  fs.appendFile(filepath, content.join(" "), err=> {  if(err){  fs.appendFile(".logs/white-error.log", filepath+"\t"+msg, e=>{});  }else{  // report   }  }); } function getDate(time) {  let date = new Date();  if(time){  let hour = date.getHours();  let minute = date.getMinutes();  let second = date.getSeconds();  let miseconds = date.getMilliseconds();  return [hour, minute, second,miseconds].map(formatNumber).join(':');  }  let year = date.getFullYear();  let month = date.getMonth() + 1;  let day = date.getDate();  let p = [year, month, day].map(formatNumber).join('-');  return p; } function formatNumber(n) {  n = n.toString();  return n[1] ? n : '0' + n; } 複製代碼

這裏大夥看下應該能看懂了。建立 7777 端口的服務。而後監聽到消息,則當即寫入文件。

第四步:查詢日誌

至此,咱們就實現了一個簡單的 UDP 數據包的日誌歸集功能。無論部署多少臺服務器。咱們只須要在 ./logs/ 目錄下就能夠查看日誌了。執行 tail 命令就能夠查看各個服務器歸集過來的日誌了,固然時間可能會有錯亂,可是以每條消息的時間爲準便可。

$ tail -f logs/info/2020-04-19.log
複製代碼

UDP 數據包 和 Unix domain socket 數據報

UDP,全稱 User Datagram Protocol, 即用戶數據報協議。UDP 和 TCP 二者的具體差別就不羅列了,有興趣能夠去了解下。主要差別以下:

udp

這裏咱們主要看和 unix 協議的差別。

Unix domain socket(UDS):是 unix 服務器進程之間本地通訊 IPC 的一種。提供了兩套協議:字節流套接口(相似 TCP )和數據報套接口(相似 UDP )。

unix 數據報套接口與 UDP 的區別主要在於 UDS 只能作本機 IPC, 而 UDP 能夠是本機也能夠是遠程機器通訊。

若是僅僅是本機通訊的話,推薦直接使用 UDS。主要優點有如下兩個:

  • UDS 協議的數據報不會出現丟失亂序的狀況。
  • UDS 協議性能更優,不會跑到鏈路層,不會作數據報校驗。

固然若是爲了可擴展,後續遷移到不一樣的機器,也可使用 UDP 來實現。

UDS 協議可使用 Node 的 unix-dgram 組件來實現。

因爲 UDS 是本機通訊,因此不須要端口和 IP, 直接使用文件系統表示的路徑名來標識。好比 /path/to/socket 這個系統路徑。若是須要建立多個 server, 則指定不一樣的路徑便可。咱們來看下具體的使用代碼。

服務接收端:

const unix = require('unix-dgram');
const server = unix.createSocket('unix_dgram', function(buf) {  console.log('received ' + buf); }); server.bind('/path/to/socket'); 複製代碼

客戶發送端:

const message = Buffer('ping');
const client = unix.createSocket('unix_dgram'); client.send(message, 0, message.length, '/path/to/socket'); 複製代碼

也能夠自行定義其餘 EventsListener,

const unix = require('unix-dgram');
const client = unix.createSocket('unix_dgram'); client.on('error', function(err) {  console.error(err); }); client.on('connect', function() {  console.log('connected');  client.on('congestion', function() {  /* The server is not accepting data */  });  client.on('writable', function() {  /* The server can accept data */  });  var message = new Buffer('ping');  client.send(message); }); client.connect('/tmp/server'); 複製代碼

使用方式仍是比較簡單的,相信大夥看看就能懂了。

再看 dgram

dgram 是 Nodejs 提供的用於發送 UDP 數據報的模塊。

dgram 提供了很是豐富的 api,能夠參考文檔:nodejs.cn/api/dgram.h…;

主要功能是能夠實現 」單播」、「廣播」和「組播」消息。

單播

單播即單向通訊,客戶端向服務端發送消息,本文中的日誌歸集就是單播實現。參考上面實現。

廣播

廣播顧名思義,就是廣撒網,非點對點通訊。把數據發送給本地子網上的全部的機器,即廣播。要實現廣播,首先要獲取到廣播地址。好比我在終端輸入:ifconfig,以下顯示的 broadcast 就是廣播地址。

broadcast

知道了廣播地址,則能夠在服務端向子網內全部機器發送廣播消息了。

const dgram = require("dgram");
const serverUDP = dgram.createSocket("udp4"); serverUDP.on("listening", () => {  console.log("socket正在監聽...");  server.setBroadcast(true);  server.setTTL(64);  setTimout(() => {  server.send("你們好啊,我是服務端.", 7778, "192.168.0.255")  }, 1000) }) serverUDP.on("message", (msg, rinfo) => {  console.log(`msg from client ${rinfo.address}:${rinfo.port}`); }) server.bind(7777); 複製代碼

這裏我建立了一個服務端,監聽 7777 端口。向廣播地址:192.168.0.255 端口爲 7778 的全部客戶端發送廣播消息,因而屬於同一個廣播區域段的客戶端的 7778 端口都會收到服務端的消息。

默認是單播,須要廣播消息,設置 setBroadcast 爲 true 便可。

IP_TTL : 表示存活時間,每通過個路由器或者網關都會減小 TTL 數值,若是 TTL 被一個路由器減小到 0,這個數據報將不會繼續轉發。

組播

組播報文的目的地址使用D類IP地址, D類地址不能出如今IP報文的源IP地址字段。

224.0.0.0~224.0.0.255爲預留的組播地址(永久組地址),地址224.0.0.0保留不作分配,其它地址供路由協議使用; 224.0.1.0~224.0.1.255是公用組播地址,能夠用於Internet; 224.0.2.0~238.255.255.255爲用戶可用的組播地址(臨時組地址),全網範圍內有效; 239.0.0.0~239.255.255.255爲本地管理組播地址,僅在特定的本地範圍內有效。

下面已 224.1.1.1 組播地址測試。其餘代碼通廣播。

服務端:

const addr = '224.1.1.1';
server.on("listening",()=>{  console.log("socket正在監聽中.....");  server.addMembership(addr);  server.setMulticastTTL(64);  setTimout(()=>{  server.send('你們好啊,我是服務端.',7778,addr);  },1000) }) 複製代碼

客戶端:

const dgram = require("dgram");
const client = dgram.createSocket("udp4"); const addr = '224.1.1.1'; client.on("listening", () => {  console.log("socket正在監聽...");  client.addMembership(addr); }) client.on("message", (msg, rinfo) => {  console.log(`msg from server:${msg},addr:${rinfo.address}`); }) client.bind(7778) 複製代碼

到此爲止,簡易的日誌服務系統已經開放完成,在實際中,咱們可能不會直接這樣使用,通常有兩種方式:

  • 使用自研的 node 框架或者統一的日誌組件,而後由框架容器或者組件去實現全部的日誌收集,上報和展現。
  • 使用統一的日誌服務,好比 c++ 開發的日誌服務,按照配置和設定的規則,統一收集日誌信息,上報和展現。

對於業務開發者來講,可能就只須要在項目裏面配置下,就能享受流暢的日誌查看了。

然而咱們只有在工做中跳出僅僅做爲使用者的角度,去探索各個系統的實現原理,這樣才能遊刃有餘。

源碼 https://github.com/antiter/udp-log

歡迎關注個人微信公衆號,一塊兒作靠譜前端!

follow-me
相關文章
相關標籤/搜索