// Command main runs a WebSocket push server. Clients connect on /ws and
// identify themselves with the site_id and site_index_id form values; events
// arriving on the internal listening channel are turned into Message values
// and written to every connection registered for the matching site.
package main

// NOTE(review): encoding/json, time, framework/logger, go-redis and echo are
// not referenced in the portion of the file visible here. They are kept in
// case another part of the file uses them; if not, the Go compiler will
// reject them as unused and they should be dropped.
import (
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/go-redis/redis"
	"github.com/gorilla/websocket"
	"github.com/labstack/echo"

	"config"
	"framework/logger"
	"global"
	"models/function"
	"models/schema"
)

var (
	// clients is the set of connections that are currently registered.
	// Guarded by registryMu.
	clients = make(map[*websocket.Conn]bool)

	// broadcast carries fully built messages from the connection handlers to
	// handleMessages, which performs the actual writes.
	broadcast = make(chan Message)

	// upgrader accepts every origin instead of using the default check; a
	// production deployment may need the default configuration instead.
	upgrader = websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}

	// chananel is the data channel on which listening events arrive.
	chananel = make(chan schema.Listening)

	// configFile is the path of the configuration file (it holds the database
	// settings); it can be overridden with the -config flag.
	configFile *string = flag.String("config", "./bin/etc/conf.yaml", "agency config file")

	// agentSlice stores the mapping from a client key (site_id immediately
	// followed by site_index_id) to its connection, one single-entry map per
	// registered connection. Guarded by registryMu.
	agentSlice []map[string]*websocket.Conn

	// registryMu protects clients and agentSlice, which are touched both by
	// the per-connection handler goroutines and by handleMessages.
	registryMu sync.Mutex
)

// Message is the payload pushed to WebSocket clients.
type Message struct {
	Message     interface{} `json:"message"`
	SiteId      string      `json:"site_id"`
	SiteIndexId string      `json:"site_index_id"`
	Count       int64       `json:"count"`
}

// hu is a test-only endpoint (it can be removed once the real producer is in
// place). It prints the submitted site identifiers and injects a fixed
// listening event. chananel is unbuffered, so the request blocks until some
// handleConnections goroutine receives the event.
func hu(w http.ResponseWriter, r *http.Request) {
	siteid := r.FormValue("site_id")
	siteIndexId := r.FormValue("site_index_id")
	fmt.Println(siteIndexId, siteid)
	s := schema.Listening{"zym", "b", 1}
	chananel <- s
}

// main parses the command line, loads the configuration, initialises the
// database and then serves the /o test endpoint and the /ws WebSocket
// endpoint on the configured address.
func main() {
	// Without this call the -config flag is never read and configFile always
	// keeps its default value.
	flag.Parse()

	cfg, err := config.ParseConfigFile(*configFile)
	if err != nil {
		log.Fatalf("parse config file error:%v\n", err.Error())
	}

	// Initialise the database; a connection failure aborts start-up.
	err = global.InitMysql(cfg.Mysqls)
	if err != nil {
		global.GlobalLogger.Error("InitDb error:%v\n", err.Error())
		return
	}

	http.HandleFunc("/o", hu)
	http.HandleFunc("/ws", handleConnections)
	go handleMessages()

	err = http.ListenAndServe(cfg.Wesocketport, nil)
	if err != nil {
		log.Fatal(err.Error())
	}
}

// handleConnections upgrades the request to a WebSocket, registers the
// connection under its site key and then serves as a worker: it receives
// listening events from chananel, builds the corresponding Message and hands
// it to handleMessages through broadcast.
//
// The request must carry non-empty site_id and site_index_id form values;
// together they are the key that identifies the client. (If connections ever
// need to be restricted, ip+port or other request data could be used here.)
// The handler returns, closing and unregistering its connection, when
// building a message fails.
func handleConnections(w http.ResponseWriter, r *http.Request) {
	siteId := r.FormValue("site_id")
	siteIndexId := r.FormValue("site_index_id")
	if siteId == "" || siteIndexId == "" {
		http.Error(w, "site_id and site_index_id must not empty", 403)
		// Do not attempt the upgrade after an error response has been sent.
		return
	}

	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		global.GlobalLogger.Error("error:%s", err.Error())
		return
	}
	defer ws.Close()

	// The key must match the one handleMessages derives from a Message:
	// SiteId immediately followed by SiteIndexId.
	registerClient(siteId+siteIndexId, ws)
	defer unregisterClient(ws)

	for {
		event := <-chananel
		msg, err := buildMessage(event)
		if err != nil {
			global.GlobalLogger.Error("error:%s", err.Error())
			return
		}
		broadcast <- msg
	}
}

// buildMessage loads the records that correspond to a listening event and
// wraps them in a Message. Types 1 selects unconfirmed company income,
// Types 2 selects online entry records, and every other value selects the
// withdrawal operate records.
// TODO: the loaded data may still need further processing before it is sent.
func buildMessage(event schema.Listening) (Message, error) {
	switch event.Types {
	case 1:
		info, count, err := new(function.MemberCompanyIncomeBean).GetNotConfirm(event.SiteId, event.SiteIndexId)
		if err != nil {
			return Message{}, err
		}
		return Message{SiteIndexId: event.SiteIndexId, SiteId: event.SiteId, Message: info, Count: count}, nil
	case 2:
		info, count, err := new(function.OnlineEntryRecordBean).GetNotConfirm(event.SiteId, event.SiteIndexId)
		if err != nil {
			return Message{}, err
		}
		return Message{SiteIndexId: event.SiteIndexId, SiteId: event.SiteId, Message: info, Count: count}, nil
	default:
		info, count, err := new(function.MakeMoneyBean).GetOperateRecord(event.SiteId, event.SiteIndexId)
		if err != nil {
			return Message{}, err
		}
		return Message{SiteIndexId: event.SiteIndexId, SiteId: event.SiteId, Message: info, Count: count}, nil
	}
}

// registerClient records ws as a live connection reachable under key.
func registerClient(key string, ws *websocket.Conn) {
	registryMu.Lock()
	defer registryMu.Unlock()

	agentSlice = append(agentSlice, map[string]*websocket.Conn{key: ws})
	clients[ws] = true
}

// unregisterClient removes every trace of ws from clients and agentSlice.
// Calling it for a connection that is not registered is a no-op.
func unregisterClient(ws *websocket.Conn) {
	registryMu.Lock()
	defer registryMu.Unlock()

	delete(clients, ws)

	// Filter in place, dropping maps that become empty.
	kept := agentSlice[:0]
	for _, agent := range agentSlice {
		for key, conn := range agent {
			if conn == ws {
				delete(agent, key)
			}
		}
		if len(agent) > 0 {
			kept = append(kept, agent)
		}
	}
	// Clear the unused tail so the dropped maps can be collected.
	for i := len(kept); i < len(agentSlice); i++ {
		agentSlice[i] = nil
	}
	agentSlice = kept
}

// clientsFor returns a snapshot of the live connections registered under key.
func clientsFor(key string) []*websocket.Conn {
	registryMu.Lock()
	defer registryMu.Unlock()

	var targets []*websocket.Conn
	for _, agent := range agentSlice {
		if conn, ok := agent[key]; ok && clients[conn] {
			targets = append(targets, conn)
		}
	}
	return targets
}

// handleMessages delivers each message received on broadcast to the
// connections registered for the message's site (targeted push, not a true
// broadcast). A connection whose write fails is closed and unregistered.
// It never returns and is meant to run in its own goroutine.
func handleMessages() {
	for {
		msg := <-broadcast

		// Writes happen outside the registry lock so that a slow client
		// cannot block new registrations.
		for _, client := range clientsFor(msg.SiteId + msg.SiteIndexId) {
			if err := client.WriteJSON(msg); err != nil {
				global.GlobalLogger.Error("error:%s", err.Error())
				client.Close()
				unregisterClient(client)
			}
		}
	}
}