nsqd 源碼,寫入數據

切記: chanel 有有本身的持久化 隊列app

topic 也有本身的持久化隊列,兩個是相互獨立oop

給一個topic put 數據的:this

// PutMessage writes to the appropriate incoming message channel

func (t *Topic) PutMessage(msg *nsq.Message) error {

    t.RLock()

    defer t.RUnlock()

    if atomic.LoadInt32(&t.exitFlag) == 1 {

        return errors.New("exiting")

    }

    t.incomingMsgChan <- msg

    atomic.AddUint64(&t.messageCount, 1)

    return nil

}


incomingMsgChan  是:incomingMsgChan:   make(chan *nsq.Message, 1),
        memoryMsgChan:     make(chan *nsq.Message, context.nsqd.options.MemQueueSize),
        
 初始化topic 會執行:
 
 t.waitGroup.Wrap(func() { t.router() })
 t.waitGroup.Wrap(func() { t.messagePump() })
 
 func (t *Topic) router() {
    var msgBuf bytes.Buffer
    for msg := range t.incomingMsgChan { //當有數據的時候執行
        select {
        case t.memoryMsgChan <- msg://memoryMsgChan 能夠寫入
        default:
            err := WriteMessageToBackend(&msgBuf, msg, t.backend)// 默認持久化到硬盤
            if err != nil {
                log.Printf("ERROR: failed to write message to backend - %s", err.Error())
                // theres not really much we can do at this point, you're certainly
                // going to lose messages...
            }
        }
    }

    log.Printf("TOPIC(%s): closing ... router", t.name)
}
 
 WriteMessageToBackend  會調用:
 func (d *DiskQueue) Put(data []byte) error {
    d.RLock()
    defer d.RUnlock()

    if d.exitFlag == 1 {
        return errors.New("exiting")
    }

    d.writeChan <- data 
    return <-d.writeResponseChan
}
 
 初始化一個:NewDiskQueue 會定時執行:
 func (d *DiskQueue) ioLoop() {
    var dataRead []byte
    var err error
    var count int64
    var r chan []byte

    syncTicker := time.NewTicker(d.syncTimeout)

    for {
        count++
        // dont sync all the time :)
        if count == d.syncEvery {
            count = 0
            d.needSync = true
        }

        if d.needSync {
            err = d.sync()
            if err != nil {
                log.Printf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err.Error())
            }
        }

        if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
            if d.nextReadPos == d.readPos {
                dataRead, err = d.readOne()
                if err != nil {
                    log.Printf("ERROR: reading from diskqueue(%s) at %d of %s - %s",
                        d.name, d.readPos, d.fileName(d.readFileNum), err.Error())
                    d.handleReadError()
                    continue
                }
            }
            r = d.readChan
        } else {
            r = nil
        }

        select {
        // the Go channel spec dictates that nil channel operations (read or write)
        // in a select are skipped, we set r to d.readChan only when there is data to read
        case r <- dataRead:
            d.moveForward()
        case <-d.emptyChan:
            d.emptyResponseChan <- d.deleteAllFiles()
        case dataWrite := <-d.writeChan:  //writeChan   剛剛寫入數據的chan
            d.writeResponseChan <- d.writeOne(dataWrite)
        case <-syncTicker.C:
            d.needSync = true
        case <-d.exitChan:
            goto exit
        }
    }

exit:
    log.Printf("DISKQUEUE(%s): closing ... ioLoop", d.name)
    syncTicker.Stop()
    d.exitSyncChan <- 1
}
相關文章
相關標籤/搜索