golang的多協程實踐

go語言以優異的併發特性而聞名,恰好手上有個小項目比較適合。mysql

項目背景:git

公司播控平臺的數據存儲包括MySQL和ElasticSearch(ES)兩個部分,編輯、運營的數據首先保存在MySQL中,爲了實現模糊搜索和產品關聯推薦,特別增長了ES,ES中保存的是節目集的基本信息。程序員

本項目是爲了防止實時同步數據出現問題或者系統從新初始化時的全量數據同步而作的。項目主要是從MySQL讀取全部的節目集數據寫入到ES中。github

項目特色:golang

由於節目集數量較大,不能一次性的讀入內存,所以每次讀出一部分記錄寫入ES。ORM使用的是beego。爲了提升性能使用了協程,其中讀MySQL的部分最大開啓20個協程,ES寫入部分開啓了15個協程(由於ES分片設置的問題,5個協程和15個協程性能映像不大)。sql

項目主要包括三個文件:json

一、PrdES_v3.go,項目的入口,負責協調MySQL讀取和ES寫入。api

package main

import (
    "./db"
    "./es"
    //"encoding/json"
    "fmt"
    "reflect"
    "time"
)

type PrdES struct {
    DB *prd.Mysql
    ES *es.Elastic
}

// func (this *PrdES) Handle(result []*prd.Series) {
//     // for _, value := range result {
//     //     this.DB.FormatData(value)
//     //     //json, _ := json.Marshal(value)
//     //     //fmt.Println(string(json))
//     // }
//     //寫入ES,以多線程的方式執行,最多保持5個線程
//     this.ES.DoBulk(result)
// }
func (this *PrdES) Run() {
    count := 50
    offset := 0
    maxCount := 20
    //create channel
    chs := make([]chan []*prd.Series, maxCount)
    selectCase := make([]reflect.SelectCase, maxCount)
    for i := 0; i < maxCount; i++ {
        offset = count * i
        fmt.Println("offset:", offset)
        //init channel
        chs[i] = make(chan []*prd.Series)
        //set select case
        selectCase[i].Dir = reflect.SelectRecv
        selectCase[i].Chan = reflect.ValueOf(chs[i])
        //運行
        go this.DB.GetData(offset, count, chs[i])
    }
    var result []*prd.Series
    for {
        //wait data return
        chosen, recv, ok := reflect.Select(selectCase)
        if ok {
            fmt.Println("channel id:", chosen)
            result = recv.Interface().([]*prd.Series)

            //讀取數據從mysql
            go this.DB.GetData(offset, count, chs[chosen])

            //寫入ES,以多線程的方式執行,最多保持15個線程
            this.ES.DoBulk(result)
            //update offset
            offset = offset + len(result)
            //判斷是否到達數據尾部,最後一次查詢
            if len(result) < count {
                fmt.Println("read end of DB")
                //等全部的任務執行完畢
                this.ES.Over()
                fmt.Println("MySQL Total:", this.DB.GetTotal(), ",Elastic Total:", this.ES.GetTotal())
                return

            }
        }
    }

}

func main() {
    s := time.Now()
    fmt.Println("start")
    pe := new(PrdES)

    pe.DB = prd.NewDB()
    pe.ES = es.NewES()
    //fmt.Println("mysql info:")
    //fmt.Println("ES info:")
    pe.Run()

    fmt.Println("time out:", time.Since(s).Seconds(), "(s)")
    fmt.Println("Over!")

}

 在run函數裏能夠看到使用了reflect.SelectCase,使用reflect.SelectCase的緣由是讀MySQL數據是多個協程,不可預計哪一個會首先返回,selectCase是任何一個處理完畢reflect.Select函數就會返回,MySQL讀取的數據放在channel中宕Select函數返回時chosen, recv, ok := reflect.Select(selectCase)判斷ok是否未true            chosen表明的是協程id經過result = recv.Interface().([]*prd.Series)得到返回的數據,由於MySQL讀取的數據是對象的結果集,因次使用recv.Interface函數,若是是簡單類型能夠使用recv.recvInt(),recv.recvString()等函數直接獲取channel返回數據。 數組

這裏經過counter控制協程的數量,也能夠經過channel,用select的方式控制協程的數量,之因此用counter計數器的方式控制協程數量是我想知道同時有多少協程在運行。多線程

注:此處channel能夠不用建立數組形式,channel帶回來的數據也沒有順序問題。

二、es.go,負責寫入ES和es的寫入協程調度

package es

import (
    "../db"
    //"encoding/json"
    "fmt"
    elastigo "github.com/mattbaird/elastigo/lib"
    //elastigo "github.com/Uncodin/elastigo/lib"
    //"github.com/Uncodin/elastigo/core"
    "time"
    //"bytes"
    "flag"
    "sync"
    //"github.com/fatih/structs"
)

var (
    //開發測試庫
    //host = flag.String("host", "192.168.1.236", "Elasticsearch Host")
    //C平臺線上
    host = flag.String("host", "192.168.100.23", "Elasticsearch Host")
    port = flag.String("port", "9200", "Elasticsearch port")
)

//indexor := core.NewBulkIndexorErrors(10, 60)
// func init() {
//     //connect to elasticsearch
//     fmt.Println("connecting  es")
//     //api.Domain = *host //"192.168.1.236"
//     //api.Port = "9300"

// }
//save thread count
var counter int

type Elastic struct {
    //Seq int64
    c         *elastigo.Conn
    lock      *sync.Mutex
    lockTotal *sync.Mutex
    wg        *sync.WaitGroup
    total     int64
}

func (this *Elastic) Conn() {
    this.c = elastigo.NewConn()
    this.c.Domain = *host
    this.c.Port = *port
    //NewClient(fmt.Sprintf("%s:%d", *host, *port))
}
func (this *Elastic) CreateLock() {
    this.lock = &sync.Mutex{}
    this.lockTotal = &sync.Mutex{}
    this.wg = &sync.WaitGroup{}
    counter = 0
    this.total = 0
}
func NewES() (es *Elastic) {
    //connect elastic
    es = new(Elastic)
    es.Conn()
    //create lock
    es.CreateLock()
    return es
}
func (this *Elastic) DoBulk(series []*prd.Series) {
    for true {
        this.lock.Lock()
        if counter < 25 {
            //跳出,執行任務
            break
        } else {
            this.lock.Unlock()
            //等待100毫秒
            //fmt.Println("wait counter less than 25, counter:", counter)
            time.Sleep(1e8)
        }
    }
    this.lock.Unlock()
    //執行任務
    go this.bulk(series, this.lock)
}
func (this *Elastic) Over() {
    this.wg.Wait()
    /*for {
        this.lock.Lock()
        if counter <= 0 {
            this.lock.Unlock()
            break
        }
        this.lock.Unlock()
    }
    */
}

func (this *Elastic) GetTotal() (t int64) {
    this.lockTotal.Lock()
    t = this.total
    this.lockTotal.Unlock()
    return t
}
func (this *Elastic) bulk(series []*prd.Series, lock *sync.Mutex) (succCount int64) {
    //增長計數器
    this.wg.Add(1)
    //減小計數器
    defer this.wg.Done()

    //加計數器
    lock.Lock()
    counter++
    fmt.Println("add task, coutner:", counter)
    lock.Unlock()

    //設置初始成功寫入的數量
    succCount = 0

    for _, value := range series {
        //json, _ := json.Marshal(value)
        //fmt.Println(string(json))
        if value.ServiceGroup != nil {
            fmt.Println("series code:", value.Code, ",ServiceGroup:", value.ServiceGroup)

            resp, err := this.c.Index("guttv", "series", value.Code, nil, *value)

            if err != nil {
                panic(err)
            } else {
                //fmt.Println(value.Code + " write to ES succsessful!")
                fmt.Println(resp)
                succCount++
            }
        } else {
            fmt.Println("series code:", value.Code, "service group is null")
        }
    }

    //計數器減一
    lock.Lock()
    counter--
    fmt.Println("reduce task, coutner:", counter, ",success count:", succCount)
    lock.Unlock()

    this.lockTotal.Lock()
    this.total = this.total + succCount
    this.lockTotal.Unlock()
    return succCount
}

 在es.go裏有兩把鎖lock和lockTotal,前者是針對counter變量,記錄es正在寫入的協程數量的;後者爲記錄總共寫入多少條記錄到es而增長的。

這裏必需要提到的是Over函數,初步使用協程的容易忽略。golang的原則是當main函數運行結束後,全部正在運行的協程都會終止,因襲在MySQL讀取數據完畢必須調用Over函數,等待全部的協程結束。這裏使用sync.waiGrooup。每次協程啓動執行下面兩個語句:

//增長計數器
this.wg.Add(1)
//減小計數器,函數結束時自動執行
defer this.wg.Done()
Over函數中調用
wg.Wait()等待計數器爲0時返回,不然就一直阻塞。固然讀者也能夠看到經過檢查counter是否小於等於0也能夠判斷協程是否都結束(Over函數被註釋的部分),顯然使用waitGroup更優雅和高效。

 

三、db.go,負責MySQL數據的讀取

package prd

import (
    "fmt"
    "github.com/astaxie/beego/orm"
    _ "github.com/go-sql-driver/mysql" // import your used driver
    "strings"
    "sync"
    "time"
)

func init() {
  
    orm.RegisterDataBase("default", "mysql", "@tcp(192.168.100.3306)/guttv_vod?charset=utf8", 30)

    orm.RegisterModelWithPrefix("t_", new(Series), new(Product), new(ServiceGroup))
    orm.RunSyncdb("default", false, false)
}

type Mysql struct {
    sql   string
    total int64
    lock  *sync.Mutex
}

func (this *Mysql) New() {
    //this.sql = "SELECT s.*, p.code ProductCode, p.name pName  FROM guttv_vod.t_series s inner join guttv_vod.t_product p on p.itemcode=s.code  and p.isdelete=0 limit ?,?"
    this.sql = "SELECT s.*, p.code ProductCode, p.name pName  FROM guttv_vod.t_series s , guttv_vod.t_product p where p.itemcode=s.code  and p.isdelete=0 limit ?,?"
    this.total = 0
    this.lock = &sync.Mutex{}
}
func NewDB() (db *Mysql) {
    db = new(Mysql)
    db.New()
    return db
}
func (this *Mysql) GetTotal() (t int64) {
    t = 0
    this.lock.Lock()
    t = this.total
    this.lock.Unlock()
    return t
}
func (this *Mysql) toTime(toBeCharge string) int64 {
    timeLayout := "2006-01-02 15:04:05"
    loc, _ := time.LoadLocation("Local")
    theTime, _ := time.ParseInLocation(timeLayout, toBeCharge, loc)
    sr := theTime.Unix()
    if sr < 0 {
        sr = 0
    }
    return sr
}
func (this *Mysql) getSGCode(seriesCode string) (result []string, num int64) {
    sql := "select distinct ref.servicegroupcode code  from t_servicegroup_reference_category ref "
    sql = sql + "left join t_category_product cp on cp.categorycode=ref.categorycode "
    sql = sql + "left join t_package pkg on pkg.code = cp.assetcode "
    sql = sql + "left join t_package_product pp on pp.parentcode=pkg.code "
    sql = sql + "left join t_product prd on prd.code = pp.assetcode "
    sql = sql + "where   prd.itemcode=?"
    o := orm.NewOrm()
    var sg []*ServiceGroup
    num, err := o.Raw(sql, seriesCode).QueryRows(&sg)

    if err == nil {
        //fmt.Println(num)
        for _, value := range sg {
            //fmt.Println(value.Code)
            result = append(result, value.Code)
        }

    } else {
        fmt.Println(err)
    }
    //fmt.Println(result)
    return result, num
}

func (this *Mysql) formatData(value *Series) {
    //設置業務分組數據
    sg, _ := this.getSGCode(value.Code)
    //fmt.Println(sg)
    value.ServiceGroup = []string{}
    value.ServiceGroup = sg[0:]
    //更改OnlineTime爲整數
    value.OnlineTimeInt = this.toTime(value.OnlineTime)
    //分解地區
    value.OriginalCountryArr = strings.Split(value.OriginalCountry, "|")
    //分解二級分類
    value.ProgramType2Arr = strings.Split(value.ProgramType2, "|")
    //寫入記錄內容
    value.Description = strings.Replace(value.Description, "\n", "", -1)
}
func (this *Mysql) GetData(offset int, size int, ch chan []*Series) {
    var result []*Series
    o := orm.NewOrm()
    num, err := o.Raw(this.sql, offset, size).
        QueryRows(&result)
    if err != nil {
        fmt.Println("read DB err")
        panic(err)
        //return //err, nil
    }
    for _, value := range result {
        this.formatData(value)
        //json, _ := json.Marshal(value)
        //fmt.Println(string(json))
        //fmt.Println(value.ServiceGroup)
    }
    this.lock.Lock()
    this.total += num
    this.lock.Unlock()

    fmt.Println("read count :", num) //, "Total:", Total)
    //return nil, result
    ch <- result
}

 從項目上看。go語言開發仍是比較簡潔的,多協程實現也相對容易,但要求開發者必須對概念很是清晰,像select和selectCase理解和defer的理解要很到位,我的層經作過多年的C/C++程序員,從經驗上看,C/C++的經驗(多線程的理解)對運用go語言仍是頗有幫助的。

相關文章
相關標籤/搜索