nsq 問題

在nsq 進行topic get的時候都會對map進行枷鎖app

func (n *NSQD) GetTopic(topicName string) *Topic {
    n.Lock()
    t, ok := n.topicMap[topicName]
    if ok {
        n.Unlock()
        return t
    } else {
        t = NewTopic(topicName, &Context{n})
        n.topicMap[topicName] = t

        log.Printf("TOPIC(%s): created", t.name)

        // release our global nsqd lock, and switch to a more granular topic lock while we init our
        // channels from lookupd. This blocks concurrent PutMessages to this topic.
        t.Lock()
        n.Unlock()
        // if using lookupd, make a blocking call to get the topics, and immediately create them.
        // this makes sure that any message received is buffered to the right channels
        if len(n.lookupPeers) > 0 {
            channelNames, _ := lookupd.GetLookupdTopicChannels(t.name, n.lookupHttpAddrs())
            for _, channelName := range channelNames {
                t.getOrCreateChannel(channelName)
            }
        }
        t.Unlock()

        // NOTE: I would prefer for this to only happen in topic.GetChannel() but we're special
        // casing the code above so that we can control the locks such that it is impossible
        // for a message to be written to a (new) topic while we're looking up channels
        // from lookupd...
        //
        // update messagePump state
        select {
        case t.channelUpdateChan <- 1:
        case <-t.exitChan:
        }
    }
    return t
}
相關文章
相關標籤/搜索