在nsq 進行topic get的時候都會對map進行枷鎖app
func (n *NSQD) GetTopic(topicName string) *Topic {
n.Lock()
t, ok := n.topicMap[topicName]
if ok {
n.Unlock()
return t
} else {
t = NewTopic(topicName, &Context{n})
n.topicMap[topicName] = t
log.Printf("TOPIC(%s): created", t.name)
// release our global nsqd lock, and switch to a more granular topic lock while we init our
// channels from lookupd. This blocks concurrent PutMessages to this topic.
t.Lock()
n.Unlock()
// if using lookupd, make a blocking call to get the topics, and immediately create them.
// this makes sure that any message received is buffered to the right channels
if len(n.lookupPeers) > 0 {
channelNames, _ := lookupd.GetLookupdTopicChannels(t.name, n.lookupHttpAddrs())
for _, channelName := range channelNames {
t.getOrCreateChannel(channelName)
}
}
t.Unlock()
// NOTE: I would prefer for this to only happen in topic.GetChannel() but we're special
// casing the code above so that we can control the locks such that it is impossible
// for a message to be written to a (new) topic while we're looking up channels
// from lookupd...
//
// update messagePump state
select {
case t.channelUpdateChan <- 1:
case <-t.exitChan:
}
}
return t
}