生產者
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"log"
)
// main reads tokens from standard input and publishes each one as a message
// to the "hello" topic on the nsqd instance at 127.0.0.1:4150.
//
// The loop ends when standard input can no longer be read (for example on
// EOF); the producer is then stopped before the program exits.
func main() {
	cfg := nsq.NewConfig()
	prod, err := nsq.NewProducer("127.0.0.1:4150", cfg)
	if err != nil {
		log.Fatal(err)
	}

	const topic = "hello"
	for {
		var data string
		fmt.Print("input message:")
		// fmt.Scan splits on whitespace, so every word becomes its own message.
		// Without this check a closed stdin would make the loop spin forever,
		// publishing empty messages.
		if _, err := fmt.Scan(&data); err != nil {
			log.Printf("reading input: %v", err)
			break
		}
		if err := prod.Publish(topic, []byte(data)); err != nil {
			// log.Fatal exits without running deferred calls, so stop explicitly.
			prod.Stop()
			log.Fatal(err)
		}
	}
	prod.Stop()
}
複製代碼
消費者
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"log"
"os"
"os/signal"
"syscall"
"time"
)
// MyHandler is a message handler that labels its output with Title.
type MyHandler struct {
	// Title is printed before every handled message.
	Title string
}

// HandleMessage prints the handler's title, the address recorded on the
// message (msg.NSQDAddress) and the message body, each on its own line.
// It always returns nil.
func (h *MyHandler) HandleMessage(msg *nsq.Message) error {
	lines := []string{
		h.Title,
		msg.NSQDAddress,
		string(msg.Body),
	}
	for _, line := range lines {
		fmt.Println(line)
	}
	return nil
}
// main subscribes to topic "hello" on channel "frist", discovering nsqd
// instances through the nsqlookupd at 127.0.0.1:4161, and hands every message
// to a MyHandler. It runs until SIGINT or SIGTERM is received, then stops the
// consumer and waits for it to finish before exiting.
func main() {
	cfg := nsq.NewConfig()
	cfg.LookupdPollInterval = 15 * time.Second

	topic := "hello"
	// NOTE(review): "frist" looks like a typo for "first"; it is kept as-is
	// because renaming it would subscribe to a different channel.
	channel := "frist"

	c, err := nsq.NewConsumer(topic, channel, cfg)
	if err != nil {
		log.Fatal(err)
	}

	c.AddHandler(&MyHandler{
		Title: "wanghaha",
	})

	if err := c.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
		log.Fatal(err)
	}

	// signal.Notify never blocks when sending, so the channel needs a buffer;
	// with an unbuffered channel a signal arriving before the receive is lost.
	s := make(chan os.Signal, 1)
	signal.Notify(s, syscall.SIGINT, syscall.SIGTERM)
	<-s

	// Shut the consumer down and wait until it reports that it has stopped.
	c.Stop()
	<-c.StopChan
}
複製代碼