切記: channel 有本身的持久化隊列,
topic 也有本身的持久化隊列,兩個是相互獨立的。
給一個 topic put 數據的方法:
// PutMessage hands msg to the topic's incoming message channel and, once the
// send has completed, increments the topic's message counter.
//
// The topic's read lock is held for the whole call. If the exit flag is
// already set, nothing is sent and an "exiting" error is returned; otherwise
// the result is nil.
func (t *Topic) PutMessage(msg *nsq.Message) error {
	t.RLock()
	defer t.RUnlock()

	exiting := atomic.LoadInt32(&t.exitFlag) == 1
	if exiting {
		return errors.New("exiting")
	}

	// The counter is only bumped after the channel send has gone through.
	t.incomingMsgChan <- msg
	atomic.AddUint64(&t.messageCount, 1)
	return nil
}
// incomingMsgChan is created as:
//
//	incomingMsgChan: make(chan *nsq.Message, 1),
//	memoryMsgChan: make(chan *nsq.Message, context.nsqd.options.MemQueueSize),
//
// Initializing a topic runs:
//
//	t.waitGroup.Wrap(func() { t.router() })
//	t.waitGroup.Wrap(func() { t.messagePump() })

// router receives every message from t.incomingMsgChan until that channel is
// closed. Each message is first offered to t.memoryMsgChan with a non-blocking
// send; when that send cannot proceed, the message is handed to
// WriteMessageToBackend together with t.backend instead. A failed backend
// write is only logged. A closing line is logged once the loop ends.
func (t *Topic) router() {
	var msgBuf bytes.Buffer
	for msg := range t.incomingMsgChan { // runs whenever a message arrives
		select {
		case t.memoryMsgChan <- msg: // memoryMsgChan can accept the message
		default:
			err := WriteMessageToBackend(&msgBuf, msg, t.backend) // default case: persist to disk
			if err != nil {
				log.Printf("ERROR: failed to write message to backend - %s", err.Error())
				// theres not really much we can do at this point, you're certainly
				// going to lose messages...
			}
		}
	}
	log.Printf("TOPIC(%s): closing ... router", t.name)
}

// WriteMessageToBackend ends up calling:

// Put sends data on d.writeChan and returns the value subsequently received
// from d.writeResponseChan (in ioLoop below, that value is the result of
// d.writeOne). The read lock is held for the whole call. If d.exitFlag is 1
// nothing is sent and an "exiting" error is returned.
func (d *DiskQueue) Put(data []byte) error {
	d.RLock()
	defer d.RUnlock()
	if d.exitFlag == 1 {
		return errors.New("exiting")
	}
	d.writeChan <- data
	return <-d.writeResponseChan
}

// Creating a queue with NewDiskQueue periodically runs:

// ioLoop is the DiskQueue's processing loop. Each iteration it:
//
//  1. counts the iteration and requests a sync every d.syncEvery iterations;
//  2. calls d.sync() when d.needSync is set, logging any error;
//  3. when the read position is behind the write position, stages one record
//     (reading it with d.readOne if none is staged yet) and enables sends on
//     d.readChan; otherwise disables them by setting r to nil;
//  4. waits in a select for exactly one of: a reader taking the staged record,
//     an empty request, a write request, the sync ticker, or the exit signal.
//
// On exit it logs a closing line, stops the ticker and sends 1 on
// d.exitSyncChan.
//
// NOTE(review): needSync is never cleared in this function; presumably
// d.sync() resets it — confirm against its implementation.
func (d *DiskQueue) ioLoop() {
	var dataRead []byte
	var err error
	var count int64
	var r chan []byte
	syncTicker := time.NewTicker(d.syncTimeout)
	for {
		count++
		// dont sync all the time :)
		if count == d.syncEvery {
			count = 0
			d.needSync = true
		}
		if d.needSync {
			err = d.sync()
			if err != nil {
				log.Printf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err.Error())
			}
		}
		if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
			// Only read a new record when the previously staged one has been
			// consumed (nextReadPos equals readPos).
			if d.nextReadPos == d.readPos {
				dataRead, err = d.readOne()
				if err != nil {
					log.Printf("ERROR: reading from diskqueue(%s) at %d of %s - %s", d.name, d.readPos, d.fileName(d.readFileNum), err.Error())
					d.handleReadError()
					continue
				}
			}
			r = d.readChan
		} else {
			r = nil
		}
		select {
		// the Go channel spec dictates that nil channel operations (read or write)
		// in a select are skipped, we set r to d.readChan only when there is data to read
		case r <- dataRead:
			d.moveForward()
		case <-d.emptyChan:
			d.emptyResponseChan <- d.deleteAllFiles()
		case dataWrite := <-d.writeChan: // writeChan is the channel Put just wrote the data to
			d.writeResponseChan <- d.writeOne(dataWrite)
		case <-syncTicker.C:
			d.needSync = true
		case <-d.exitChan:
			goto exit
		}
	}
exit:
	log.Printf("DISKQUEUE(%s): closing ... ioLoop", d.name)
	syncTicker.Stop()
	d.exitSyncChan <- 1
}