示例代碼git
package main import ( "net" "os" "github.com/gpmgo/gopm/modules/goconfig" "github.com/go-stomp/stomp" "time" "strconv" "log" "strings" ) // 限制goroutine數量 var limitChan = make(chan bool, 10000) // Todo 從配置文件中讀取 // 限制同時處理消息數量 var msgChan = make(chan string, 10000) // Todo 從配置文件中讀取 var activeMqLimitedChan = make(chan bool, 100) var activeMq *stomp.Conn var activeQueue string var host string var port string var connectTimes = 0 var udpAddress = "0.0.0.0" // Todo 從配置文件中讀取 var udpPort = "514" // Todo 從配置文件中讀取 var logFilePath = "/var/log/syslog_server/" var configFilePath = "./config.ini" // UDP goroutine 實現併發讀取UDP數據 func udpProcess(conn *net.UDPConn) { defer func() { if e := recover(); e != nil { // 初始化日誌,天天生成一個日誌文件,日誌文件名以日誌結尾 logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log" logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 應該判斷error,此處簡略 defer logFile.Close() if err != nil { log.Fatalln("open log file error: ", err.Error()) } logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile) // 記錄錯誤日誌 logger.Println("udpProcess error:", e) } // 釋放出一個協程 <- limitChan }() // 最大讀取數據大小 data := make([]byte, 1024) n, _, err := conn.ReadFromUDP(data) if err != nil { panic(err) } // 獲取對端的IP地址 // remoteAddr := conn.RemoteAddr() // msgChan <- remoteAddr.String() + " " + string(data[:n]) msgChan <- string(data[:n]) } func udpServer(address, port string) { // @todo 如何防止udpServer 一直Panic致使無限循環重啓 defer func() { if e := recover(); e != nil { // 初始化日誌,天天生成一個日誌文件,日誌文件名以日誌結尾 logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log" logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 應該判斷error,此處簡略 defer logFile.Close() if err != nil { log.Fatalln("open log file error: ", err.Error()) } logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile) // 記錄錯誤日誌 logger.Println("udpServer error:", e) // udpServer啓動失敗後,間隔10秒後重試 time.Sleep(10 * time.Second) udpServer(udpAddress, udpPort) } }() udpAddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(address, port)) conn, err := net.ListenUDP("udp", udpAddr) defer conn.Close() if err != nil { panic(err) } for { limitChan <- true go udpProcess(conn) } } // 讀取ActiveMQ配置信息 func getConfiguration(){ defer func() { if e := recover(); e != nil { // 初始化日誌,天天生成一個日誌文件,日誌文件名以日誌結尾 logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log" logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 應該判斷error,此處簡略 defer logFile.Close() if err != nil { log.Fatalln("open log file error: ", err.Error(), ", programing exit.") os.Exit(1) } logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile) // 記錄錯誤日誌 logger.Println("Get Configuration error:", e) } }() configFile, err := goconfig.LoadConfigFile(configFilePath) if err != nil { panic(err) } host, err = configFile.GetValue("active_mq", "host") if err != nil { // 若是沒有配置主機,則使用本地主機 host = "127.0.0.1" } port, err = configFile.GetValue("active_mq", "port") if err != nil { // 若是沒配置端口,則使用默認端口 port = "61613" } activeQueue, err = configFile.GetValue("active_mq", "queue") if err != nil { // 若是沒配置端口,則使用默認隊列名 activeQueue = "syslog.queue" } } // 使用IP和端口鏈接到ActiveMQ服務器, 返回ActiveMQ鏈接對象 func connActiveMq(){ // @todo 如何防止無限循環 defer func() { if e := recover(); e != nil { // 初始化日誌,天天生成一個日誌文件,日誌文件名以日誌結尾 logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log" logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 應該判斷error,此處簡略 defer logFile.Close() if err != nil { log.Fatalln("open log file error: ", err.Error()) } logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile) // 記錄錯誤日誌 logger.Println("connActiveMq error:", e) // ActiveMQ服務器鏈接失敗後,間隔3秒後重試 time.Sleep(3 * time.Second) activeMq = nil connActiveMq() } }() // @todo 實現斷開重連 if activeMq == nil { var err error activeMq, err = stomp.Dial("tcp", net.JoinHostPort(host, port)) if err != nil { connectTimes ++ if connectTimes >= 100 { time.Sleep(60 * time.Second) }else if connectTimes >= 10 { time.Sleep(10 * time.Second) }else { time.Sleep(3 * time.Second) } panic(err.Error() + ", 從新鏈接ActiveMQ, 已重試次數: " + strconv.Itoa(connectTimes)) }else { connectTimes = 0 } } } func activeMqProducer(c chan string){ // @todo 如何防止activeMqProducer 退出 defer func() { if e := recover(); e != nil { // 初始化日誌,天天生成一個日誌文件,日誌文件名以日誌結尾 logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log" logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 應該判斷error,此處簡略 defer logFile.Close() if err != nil { log.Fatalln("open log file error: ", err.Error()) } logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile) // 記錄錯誤日誌 logger.Println("activeMqProducer error:", e) // 重試 go activeMqProducer(msgChan) } }() for{ activeMqLimitedChan <- true // 限制開啓協程數量 contentMsg := <-c go func() { defer func() { if e := recover(); e != nil { err := os.MkdirAll(logFilePath, 777) log.Fatalln("create log dirctory error: ", err.Error()) // 初始化日誌,天天生成一個日誌文件,日誌文件名以日誌結尾 logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log" logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 應該判斷error,此處簡略 defer logFile.Close() if err != nil { log.Fatalln("open log file error: ", err.Error()) } logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile) // 記錄錯誤日誌 logger.Println("activeMqProducer error:", e) } // 釋放出一個協程 <- activeMqLimitedChan }() err := activeMq.Send(activeQueue, "text/plain", []byte(contentMsg)) if err != nil { if err.Error() == "connection already closed"{ activeMq = nil connActiveMq() activeMq.Send(activeQueue, "text/plain", []byte(contentMsg)) } panic(err) } }() } } func init(){ // 初始化 ActiveMQ 配置 getConfiguration() // 鏈接到 ActiveMQ 服務器 connActiveMq() // 啓動一個協程將Syslog消息放入ActiveMQ隊列中 go activeMqProducer(msgChan) } func main() { defer activeMq.Disconnect() udpServer(udpAddress, udpPort) }