NSQ簡單食用

生產者

package main

import (
	"fmt"
	"github.com/nsqio/go-nsq"
	"log"
)

func main() {
	cfg := nsq.NewConfig()
	prod, err := nsq.NewProducer("127.0.0.1:4150", cfg)
	if err != nil {
		log.Fatal(err)
	}

	for {
		var data string
		fmt.Print("input message:")
		fmt.Scan(&data)
		topic := "hello"
		err = prod.Publish(topic, []byte(data))
		if err != nil {
			log.Fatal(err)
		}
	}

	select {}
}
複製代碼

消費者

package main

import (
	"fmt"
	"github.com/nsqio/go-nsq"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"
)

type MyHandler struct {
	Title string
}

func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
	fmt.Println(m.Title)
	fmt.Println(msg.NSQDAddress)
	fmt.Println(string(msg.Body))
	return
}

func main() {
	cfg := nsq.NewConfig()
	cfg.LookupdPollInterval = 15 * time.Second
	topic := "hello"
	channel := "frist"
	c, err := nsq.NewConsumer(topic, channel, cfg)
	if err != nil {
		log.Fatal(err)
	}

	consumer := &MyHandler{
		Title: "wanghaha",
	}

	c.AddHandler(consumer)

	if err := c.ConnectToNSQLookupd("127.0.0.1:4161"); err != nil {
		log.Fatal(err)
	}

	s := make(chan os.Signal)
	signal.Notify(s, syscall.SIGINT)
	<- s
}
複製代碼
相關文章
相關標籤/搜索