Golang 實現UDPServer併發送消息到ActiveMQ

示例代碼git

package main

import (
    "net"
    "os"
    "github.com/gpmgo/gopm/modules/goconfig"
    "github.com/go-stomp/stomp"
    "time"
    "strconv"
    "log"
    "strings"
)




// 限制goroutine數量
var limitChan = make(chan bool, 10000) // Todo 從配置文件中讀取

// 限制同時處理消息數量
var msgChan = make(chan string, 10000) // Todo 從配置文件中讀取
var activeMqLimitedChan = make(chan bool, 100)
var activeMq *stomp.Conn
var activeQueue string
var host string
var port string
var connectTimes = 0
var udpAddress = "0.0.0.0"    // Todo 從配置文件中讀取
var udpPort = "514"    // Todo 從配置文件中讀取
var logFilePath = "/var/log/syslog_server/"
var configFilePath = "./config.ini"
// UDP goroutine 實現併發讀取UDP數據
func udpProcess(conn *net.UDPConn) {
    defer func() {
        if e := recover(); e != nil {
            // 初始化日誌,天天生成一個日誌文件,日誌文件名以日誌結尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 應該判斷error,此處簡略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error())
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 記錄錯誤日誌
            logger.Println("udpProcess error:", e)
        }
        // 釋放出一個協程
        <- limitChan
    }()

    // 最大讀取數據大小
    data := make([]byte, 1024)
    n, _, err := conn.ReadFromUDP(data)
    if err != nil {
        panic(err)
    }

    // 獲取對端的IP地址
    // remoteAddr := conn.RemoteAddr()
    // msgChan <- remoteAddr.String() + " " + string(data[:n])

    msgChan <- string(data[:n])


}


func udpServer(address, port string) {
    // @todo 如何防止udpServer 一直Panic致使無限循環重啓
    defer func() {
        if e := recover(); e != nil {
            // 初始化日誌,天天生成一個日誌文件,日誌文件名以日誌結尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 應該判斷error,此處簡略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error())
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 記錄錯誤日誌
            logger.Println("udpServer error:", e)

            // udpServer啓動失敗後,間隔10秒後重試
            time.Sleep(10 * time.Second)
            udpServer(udpAddress, udpPort)
        }
    }()

    udpAddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(address, port))
    conn, err := net.ListenUDP("udp", udpAddr)
    defer conn.Close()
    if err != nil {
        panic(err)
    }

    for {
        limitChan <- true
        go udpProcess(conn)
    }
}


// 讀取ActiveMQ配置信息
func getConfiguration(){
    defer func() {
        if e := recover(); e != nil {
            // 初始化日誌,天天生成一個日誌文件,日誌文件名以日誌結尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 應該判斷error,此處簡略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error(), ", programing exit.")
                os.Exit(1)
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 記錄錯誤日誌
            logger.Println("Get Configuration error:", e)
        }
    }()

    configFile, err := goconfig.LoadConfigFile(configFilePath)
    if err != nil {
        panic(err)
    }

    host, err = configFile.GetValue("active_mq", "host")
    if err != nil {
        // 若是沒有配置主機,則使用本地主機
        host = "127.0.0.1"
    }
    port, err = configFile.GetValue("active_mq", "port")
    if err != nil {
        // 若是沒配置端口,則使用默認端口
        port = "61613"
    }

    activeQueue, err = configFile.GetValue("active_mq", "queue")
    if err != nil {
        // 若是沒配置端口,則使用默認隊列名
        activeQueue = "syslog.queue"
    }
}

// 使用IP和端口鏈接到ActiveMQ服務器, 返回ActiveMQ鏈接對象
func connActiveMq(){
    // @todo 如何防止無限循環
    defer func() {
        if e := recover(); e != nil {
            // 初始化日誌,天天生成一個日誌文件,日誌文件名以日誌結尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 應該判斷error,此處簡略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error())
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 記錄錯誤日誌
            logger.Println("connActiveMq error:", e)

            // ActiveMQ服務器鏈接失敗後,間隔3秒後重試
            time.Sleep(3 * time.Second)
            activeMq = nil
            connActiveMq()
        }
    }()

    // @todo 實現斷開重連
    if activeMq == nil {
        var err error
        activeMq, err = stomp.Dial("tcp", net.JoinHostPort(host, port))
        if err != nil {
            connectTimes ++
            if connectTimes >= 100 {
                time.Sleep(60 * time.Second)
            }else if connectTimes >= 10 {
                time.Sleep(10 * time.Second)
            }else {
                time.Sleep(3 * time.Second)
            }
            panic(err.Error() + ", 從新鏈接ActiveMQ, 已重試次數: " + strconv.Itoa(connectTimes))

        }else {
            connectTimes = 0
        }
    }
}


func activeMqProducer(c chan string){
    // @todo 如何防止activeMqProducer 退出
    defer func() {
        if e := recover(); e != nil {
            // 初始化日誌,天天生成一個日誌文件,日誌文件名以日誌結尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 應該判斷error,此處簡略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error())
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 記錄錯誤日誌
            logger.Println("activeMqProducer error:", e)

            // 重試
            go activeMqProducer(msgChan)
        }
    }()
    for{
        activeMqLimitedChan <- true    // 限制開啓協程數量
        contentMsg := <-c
        go func() {
            defer func() {
                if e := recover(); e != nil {
                    err := os.MkdirAll(logFilePath, 777)
                    log.Fatalln("create log dirctory error: ", err.Error())
                    // 初始化日誌,天天生成一個日誌文件,日誌文件名以日誌結尾
                    logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
                    logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 應該判斷error,此處簡略
                    defer logFile.Close()
                    if err != nil {
                        log.Fatalln("open log file error: ", err.Error())
                    }
                    logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

                    // 記錄錯誤日誌
                    logger.Println("activeMqProducer error:", e)
                }
                // 釋放出一個協程
                <- activeMqLimitedChan
            }()

            err := activeMq.Send(activeQueue, "text/plain", []byte(contentMsg))
            if err != nil {
                if err.Error() == "connection already closed"{
                    activeMq = nil
                    connActiveMq()
                    activeMq.Send(activeQueue, "text/plain", []byte(contentMsg))
                }
                panic(err)
            }
        }()

    }

}


func init(){
    // 初始化 ActiveMQ 配置
    getConfiguration()

    // 鏈接到 ActiveMQ 服務器
    connActiveMq()

    // 啓動一個協程將Syslog消息放入ActiveMQ隊列中
    go activeMqProducer(msgChan)

}

func main() {
    defer activeMq.Disconnect()
    udpServer(udpAddress, udpPort)
}
相關文章
相關標籤/搜索