Kai - Golang實現的目標檢測雲服務

YOLO/Darknet是目前比較流行的Object Detection算法(後面統一稱爲Darknet),在GPU上的表現不但速度快並且準確率很高。可是使用起來不方便,只提供了命令行接口和簡單的Python接口。因此我想用RESTful來實現一個雲端的Darknet服務kailinux

選擇用Go的緣由不是考慮併發,而是goroutine之間的同步能方便的處理,適合實現Pipeline的功能。問題來了,Darknet是c語言實現的,那Go必須得用cgo進行封裝,才能調用c函數。目標是爲了實現三個基本功能:1. 圖片檢測 2. 視頻檢測 3. 攝像頭檢測。爲了方便使用我修改了Darknet的部分代碼,而後從新定義下面幾個函數:git

// Set a gpu device
void set_gpu(int gpu);

// Recognize a image
void image_detector(char *datacfg, char *cfgfile, char *weightfile, char *filename,
    float thresh, float hier_thresh, char *outfile);

// Recognize a video
void video_detector(char *datacfg, char *cfgfile, char *weightfile, char *filename,
    float thresh, float hier_thresh, char *outfile);

// Recognize a camera stream
void camera_detector(char *datacfg, char *cfgfile, char *weightfile, int camindex,
        float thresh, float hier_thresh, char *outpath);

有了這幾個函數,就好辦了,下面用cgo導入相應的庫和頭文件便可:github

// #cgo pkg-config: opencv
// #cgo linux LDFLAGS: -ldarknet -lm -L/usr/local/cuda/lib64 -lcuda -lcudart -lcublas -lcurand -lcudnn
// #cgo darwin LDFLAGS: -ldarknet
// #include "yolo.h"
import "C"

// SetGPU set a gpu device you want
func SetGPU(gpu int) {
    C.set_gpu(C.int(gpu))
}

// ImageDetector recognize a image
func ImageDetector(dc, cf, wf, fn string, t, ht float64, of ...string) {
    ...
}

// VideoDetector recognize a video
func VideoDetector(dc, cf, wf, fn string, t, ht float64, of ...string) {
    ...
}

// CameraDetector recognize a camera stream
func CameraDetector(dc, cf, wf string, i int, t, ht float64, of ...string) {
    ...
}

這樣對Darknet的封裝go-yolo就完成了。算法

下面進入主題,介紹一下kai的實現。後端

kai的設計目標以下:架構

  • 後端基於Darknet(不支持訓練)
  • 提供RESTful接口進行圖片和視頻的檢測
  • 支持Amazon S3下載和上傳
  • 支持Ftp下載和上傳
  • 支持檢測結果持久化到MongoDB

架構圖是這樣的
Kai併發

這裏重點介紹一下Kai的Pipeline機制,這裏的Pipeline包括下載(Download),檢測(Yolo)和上(Upload)傳這一系列流程。
先上個圖:
Pipelineide

這裏的難點在於下載(Download),檢測(Yolo)和上傳(Upload)這三個步驟能夠配置不一樣的Goroutine數量,而這三步之間是一個同步操做。函數

  • 首先須要定義3個buffered channel來進行同步
// KaiServer represents the server for processing all job requests
type KaiServer struct {
    net.Listener
    logger        *logging.Logger
    config        types.ServerConfig
    listenAddr    string
    listenNetwork string
    router        *Router
    server        *http.Server
    db            db.Storage
    // jobDownBuff is the buffered channel for job downloading
    jobDownBuff chan types.Job
    // jobDownBuff is the buffered channel for job todo
    jobTodoBuff chan types.Job
    // jobDownBuff is the buffered channel for job done
    jobDoneBuff chan types.Job
}
  • Pipeline的執行流程以下
// Pipeline contains downloading, processing and uploading a job
func Pipeline(logger *logging.Logger, config types.ServerConfig, dbInstance db.Storage, jobDownBuff chan types.Job,
    jobTodoBuff chan types.Job, jobDoneBuff chan types.Job, job types.Job) {
    logger.Infof("pipeline-job %+v", job)

    // download a job
    setupAndDownloadJob(logger, config.System, dbInstance, job, jobDownBuff)

    // jobDownBuff -> jobTodoBuff -> jobDoneBuff
    yoloJob(logger, config, dbInstance, jobDownBuff, jobTodoBuff, jobDoneBuff)

    // upload a job
    uploadJob(logger, dbInstance, jobDoneBuff)
}
  • 下載(Download)
// setupAndDownloadJob setup and download jobs into jobDownBuff
func setupAndDownloadJob(logger *logging.Logger, config types.SystemConfig,
    dbInstance db.Storage, job types.Job, jobDownBuff chan<- types.Job) {

    go func() {
        logger.Infof("start setup and download a job: %+v", job)
        newJob, err := SetupJob(logger, job.ID, dbInstance, config)
        job = *newJob
        if err != nil {
            logger.Error("setup-job failed", err)
            return
        }

        downloadFunc := downloaders.GetDownloadFunc(job.Source)
        if err := downloadFunc(logger, config, dbInstance, job.ID); err != nil {
            logger.Error("download failed", err)
            job.Status = types.JobError
            job.Details = err.Error()
            dbInstance.UpdateJob(job.ID, job)
            return
        }

        jobDownBuff <- job
    }()
}
  • 檢測(Yolo)
func yoloJob(logger *logging.Logger, config types.ServerConfig, dbInstance db.Storage,
    jobDownBuff <-chan types.Job, jobTodoBuff chan types.Job, jobDoneBuff chan types.Job) {

    go func() {
        job, ok := <-jobDownBuff
        if !ok {
            logger.Info("job download buffer is closed")
            return
        }
        logger.Infof("start a yolo job: %+v", job)
        // limit the number of job in the jobTodoBuff
        jobTodoBuff <- job
        jobTodo, ok := <-jobTodoBuff
        if !ok {
            logger.Info("job todo buffer is closed")
            return
        }

        nGpu := config.System.NGpu
        t := yolo.NewTask(config.Yolo, jobTodo.Media.Cate, nGpu, jobTodo.LocalSource, jobTodo.LocalDestination)
        logger.Debugf("yolo task: %+v", *t)
        yolo.StartTask(t, logger, dbInstance, jobTodo.ID)
        jobDoneBuff <- job
    }()
}
  • 上傳(Upload)
func uploadJob(logger *logging.Logger, dbInstance db.Storage, jobDoneBuff <-chan types.Job) {
    go func() {
        jobDone, ok := <-jobDoneBuff
        if !ok {
            logger.Info("job done buffer is closed")
            return
        }
        logger.Infof("start a upload job: %+v", jobDone)

        uploadFunc := uploaders.GetUploadFunc(jobDone.Destination)
        if err := uploadFunc(logger, dbInstance, jobDone.ID); err != nil {
            logger.Error("upload failed", err)
            jobDone.Status = types.JobError
            jobDone.Details = err.Error()
            dbInstance.UpdateJob(jobDone.ID, jobDone)
            return
        }

        logger.Info("erasing temporary files")
        if err := CleanSwap(dbInstance, jobDone.ID); err != nil {
            logger.Error("erasing temporary files failed", err)
        }

        jobDone.Status = types.JobFinished
        dbInstance.UpdateJob(jobDone.ID, jobDone)

        logger.Infof("end a job: %+v", jobDone)
    }()
}

到此,這個項目主要機制都已經介紹完了,若是你們有興趣的能夠去點擊下面的項目主頁。spa

項目連接:
go-yolo: https://github.com/ZanLabs/go...
kai: https://github.com/ZanLabs/kai

相關文章
相關標籤/搜索